You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/03/27 22:17:26 UTC

svn commit: r523062 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/

Author: cutting
Date: Tue Mar 27 13:17:25 2007
New Revision: 523062

URL: http://svn.apache.org/viewvc?view=rev&rev=523062
Log:
HADOOP-1153.  Fix HDFS daemons to correctly stop their threads.  Contributed by Konstantin Shvachko.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Mar 27 13:17:25 2007
@@ -45,6 +45,9 @@
     by probing for free ports, improving test reliability.
     (Arun C Murthy via cutting)
 
+14. HADOOP-1153.  Fix HDFS daemons to correctly stop their threads.
+    (Konstantin Shvachko via cutting)
+
 
 Release 0.12.2 - 2007-23-17
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Tue Mar 27 13:17:25 2007
@@ -110,7 +110,7 @@
     FSDataset data;
     DatanodeRegistration dnRegistration;
     private String networkLoc;
-    boolean shouldRun = true;
+    volatile boolean shouldRun = true;
     Vector receivedBlockList = new Vector();
     int xmitsInProgress = 0;
     Daemon dataXceiveServer = null;
@@ -318,7 +318,7 @@
      * @throws IOException
      */
     private void register() throws IOException {
-      while( true ) {
+      while( shouldRun ) {
         try {
           // reset name to machineName. Mainly for web interface.
           dnRegistration.name = machineName + ":" + dnRegistration.getPort();
@@ -342,15 +342,22 @@
      * Returns only after shutdown is complete.
      */
     public void shutdown() {
-        try {
-          infoServer.stop();
-        } catch (Exception e) {
+        if (infoServer != null) {
+          try {
+            infoServer.stop();
+          } catch (Exception e) {
+          }
         }
         this.shouldRun = false;
-        ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
-        try {
-          this.storage.closeAll();
-        } catch (IOException ie) {
+        if (dataXceiveServer != null) {
+          ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
+          this.dataXceiveServer.interrupt();
+        }
+        if (storage != null) {
+          try {
+            this.storage.closeAll();
+          } catch (IOException ie) {
+          }
         }
     }
 
@@ -539,7 +546,6 @@
      * Hadoop IPC mechanism.
      */
     class DataXceiveServer implements Runnable {
-        boolean shouldListen = true;
         ServerSocket ss;
         public DataXceiveServer(ServerSocket ss) {
             this.ss = ss;
@@ -549,7 +555,7 @@
          */
         public void run() {
             try {
-                while (shouldListen) {
+                while (shouldRun) {
                     Socket s = ss.accept();
                     //s.setSoTimeout(READ_TIMEOUT);
                     data.checkDataDir();
@@ -566,7 +572,8 @@
             }
         }
         public void kill() {
-            this.shouldListen = false;
+            assert shouldRun == false :
+              "shoudRun should be set to false before killing";
             try {
                 this.ss.close();
             } catch (IOException iex) {
@@ -1071,16 +1078,16 @@
         LOG.info("Finishing DataNode in: "+data);
     }
 
-    private static ArrayList dataNodeList = new ArrayList();
-    private static ArrayList dataNodeThreadList = new ArrayList();
+    private static ArrayList<DataNode> dataNodeList = new ArrayList<DataNode>();
+    private static ArrayList<Thread> dataNodeThreadList = new ArrayList<Thread>();
     
     /** Start datanode daemon.
      */
     public static void run(Configuration conf, String networkLoc) throws IOException {
         String[] dataDirs = conf.getStrings("dfs.data.dir");
         DataNode dn = makeInstance(networkLoc, dataDirs, conf);
-        dataNodeList.add(dn);
         if (dn != null) {
+          dataNodeList.add(dn);
           Thread t = new Thread(dn, "DataNode: [" +
               StringUtils.arrayToString(dataDirs) + "]");
           t.setDaemon(true); // needed for JUnit testing
@@ -1090,15 +1097,14 @@
     }
     
     /**
-     * Shut down all datanodes that where started via the run(conf) method.
+     * Shut down all datanodes that were started via the
+     * run(conf,networkLoc) method.
      * Returns only after shutdown is complete.
      */
     public static void shutdownAll(){
-      if(!dataNodeList.isEmpty()){
-        for (Iterator iterator = dataNodeList.iterator(); iterator.hasNext();) {
-          DataNode dataNode = (DataNode) iterator.next();
-          dataNode.shutdown();
-        }
+      while (!dataNodeList.isEmpty()) {
+        dataNodeList.remove(0).shutdown();
+        dataNodeThreadList.remove(0).interrupt();
       }
     }
 
@@ -1113,12 +1119,7 @@
       Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
       try {
         t.join();
-      } catch (InterruptedException e) {
-        if (Thread.currentThread().isInterrupted()) {
-          // did someone knock?
-          return;
-        }
-      }
+      } catch (InterruptedException e) {}
     }
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Mar 27 13:17:25 2007
@@ -174,7 +174,7 @@
     Daemon lmthread = null;   // LeaseMonitor thread
     Daemon smmthread = null;  // SafeModeMonitor thread
     Daemon replthread = null;  // Replication thread
-    boolean fsRunning = true;
+    volatile boolean fsRunning = true;
     long systemStart = 0;
 
     //  The maximum number of replicates we should allow for a single block
@@ -302,20 +302,22 @@
      * them to finish, but a short timeout returns control back to caller.
      */
     public void close() {
-      synchronized (this) {
         fsRunning = false;
-      }
         try {
-            pendingReplications.stop();
-            infoServer.stop();
-            hbthread.join(3000);
-            replthread.join(3000);
-            dnthread.join(3000);
+          if (pendingReplications != null) pendingReplications.stop();
+          if (infoServer != null) infoServer.stop();
+          if (hbthread != null) hbthread.interrupt();
+          if (replthread != null) replthread.interrupt();
+          if (dnthread != null) dnthread.interrupt();
+          if (smmthread != null) smmthread.interrupt();
         } catch (InterruptedException ie) {
         } finally {
           // using finally to ensure we also wait for lease daemon
           try {
-            lmthread.join(3000);
+            if (lmthread != null) {
+              lmthread.interrupt();
+              lmthread.join(3000);
+            }
           } catch (InterruptedException ie) {
           } finally {
               try {
@@ -3710,7 +3712,7 @@
       /**
        */
       public void run() {
-        while( ! safeMode.canLeave() ) {
+        while (fsRunning && !safeMode.canLeave()) {
           try {
             Thread.sleep(recheckInterval);
           } catch (InterruptedException ie) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/PendingReplicationBlocks.java Tue Mar 27 13:17:25 2007
@@ -38,7 +38,7 @@
   private Map<Block, PendingBlockInfo> pendingReplications;
   private ArrayList<Block> timedOutItems;
   Daemon timerThread = null;
-  private boolean fsRunning = true;
+  private volatile boolean fsRunning = true;
 
   //
   // It might take anywhere between 5 to 10 minutes before

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java?view=diff&rev=523062&r1=523061&r2=523062
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDecommission.java Tue Mar 27 13:17:25 2007
@@ -250,13 +250,7 @@
     writeConfigFile(localFileSys, excludeFile, null);
 
     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
-    // Now wait for 15 seconds to give datanodes chance to register
-    // themselves and to report heartbeat
-    try {
-      Thread.sleep(15000L);
-    } catch (InterruptedException e) {
-      // nothing
-    }
+    cluster.waitActive();
     InetSocketAddress addr = new InetSocketAddress("localhost", 
                                              cluster.getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);