You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2009/07/31 00:33:23 UTC

svn commit: r799461 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

Author: shv
Date: Thu Jul 30 22:33:22 2009
New Revision: 799461

URL: http://svn.apache.org/viewvc?rev=799461&view=rev
Log:
HDFS-119. Fix a bug in logSync(), which causes the NameNode to block forever. Contributed by Suresh Srinivas.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=799461&r1=799460&r2=799461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Thu Jul 30 22:33:22 2009
@@ -113,6 +113,9 @@
     HDFS-490. Eliminate the deprecated warnings introduced by H-5438.
     (He Yongqiang via szetszwo)
 
+    HDFS-119. Fix a bug in logSync(), which causes NameNode block forever.
+    (Suresh Srinivas via shv)
+
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=799461&r1=799460&r2=799461&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Jul 30 22:33:22 2009
@@ -320,7 +320,7 @@
    * @param es - stream to remove
    * @return the matching stream
    */
-  public StorageDirectory getStorage(EditLogOutputStream es) {
+  StorageDirectory getStorage(EditLogOutputStream es) {
     String parentStorageDir = ((EditLogFileOutputStream)es).getFile()
     .getParentFile().getParentFile().getAbsolutePath();
 
@@ -339,7 +339,7 @@
    * @param sd
    * @return the matching stream
    */
-  public EditLogOutputStream getEditsStream(StorageDirectory sd) {
+  synchronized EditLogOutputStream getEditsStream(StorageDirectory sd) {
 	for (EditLogOutputStream es : editStreams) {
 	  File parentStorageDir = ((EditLogFileOutputStream)es).getFile()
 	  .getParentFile().getParentFile();
@@ -780,68 +780,76 @@
 
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
-
-    synchronized (this) {
-      assert editStreams.size() > 0 : "no editlog streams";
-      printStatistics(false);
-
-      // if somebody is already syncing, then wait
-      while (mytxid > synctxid && isSyncRunning) {
-        try {
-          wait(1000);
-        } catch (InterruptedException ie) { 
+    EditLogOutputStream streams[] = null;
+    boolean sync = false;
+    try {
+      synchronized (this) {
+        assert editStreams.size() > 0 : "no editlog streams";
+        printStatistics(false);
+  
+        // if somebody is already syncing, then wait
+        while (mytxid > synctxid && isSyncRunning) {
+          try {
+            wait(1000);
+          } catch (InterruptedException ie) { 
+          }
         }
-      }
-
-      //
-      // If this transaction was already flushed, then nothing to do
-      //
-      if (mytxid <= synctxid) {
-        numTransactionsBatchedInSync++;
-        if (metrics != null) // Metrics is non-null only when used inside name node
-          metrics.transactionsBatchedInSync.inc();
-        return;
-      }
-   
-      // now, this thread will do the sync
-      syncStart = txid;
-      isSyncRunning = true;   
-
-      // swap buffers
-      for(EditLogOutputStream eStream : editStreams) {
-        eStream.setReadyToFlush();
-      }
-    }
-
-    // do the sync
-    long start = FSNamesystem.now();
-    for (int idx = 0; idx < editStreams.size(); idx++) {
-      EditLogOutputStream eStream = editStreams.get(idx);
-      try {
-        eStream.flush();
-      } catch (IOException ie) {
+  
         //
-        // remember the streams that encountered an error.
+        // If this transaction was already flushed, then nothing to do
         //
-        if (errorStreams == null) {
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
+        if (mytxid <= synctxid) {
+          numTransactionsBatchedInSync++;
+          if (metrics != null) // Metrics is non-null only when used inside name node
+            metrics.transactionsBatchedInSync.inc();
+          return;
         }
-        errorStreams.add(eStream);
-        FSNamesystem.LOG.error("Unable to sync edit log. " +
-                               "Fatal Error.");
+     
+        // now, this thread will do the sync
+        syncStart = txid;
+        isSyncRunning = true;
+        sync = true;
+  
+        // swap buffers
+        for(EditLogOutputStream eStream : editStreams) {
+          eStream.setReadyToFlush();
+        }
+        streams = 
+          editStreams.toArray(new EditLogOutputStream[editStreams.size()]);
       }
+  
+      // do the sync
+      long start = FSNamesystem.now();
+      for (int idx = 0; idx < streams.length; idx++) {
+        EditLogOutputStream eStream = streams[idx];
+        try {
+          eStream.flush();
+        } catch (IOException ie) {
+          //
+          // remember the streams that encountered an error.
+          //
+          if (errorStreams == null) {
+            errorStreams = new ArrayList<EditLogOutputStream>(1);
+          }
+          errorStreams.add(eStream);
+          FSNamesystem.LOG.error("Unable to sync edit log. " +
+                                 "Fatal Error.");
+        }
+      }
+      long elapsed = FSNamesystem.now() - start;
+      processIOError(errorStreams, true);
+  
+      if (metrics != null) // Metrics non-null only when used inside name node
+        metrics.syncs.inc(elapsed);
+    } finally {
+      synchronized (this) {
+        synctxid = syncStart;
+        if (sync) {
+          isSyncRunning = false;
+        }
+        this.notifyAll();
+     }
     }
-    long elapsed = FSNamesystem.now() - start;
-
-    synchronized (this) {
-       processIOError(errorStreams, true);
-       synctxid = syncStart;
-       isSyncRunning = false;
-       this.notifyAll();
-    }
-
-    if (metrics != null) // Metrics is non-null only when used inside name node
-      metrics.syncs.inc(elapsed);
   }
 
   //
@@ -1030,14 +1038,6 @@
     return size;
   }
   
-  public String listEditsStreams() {
-    StringBuffer buf = new StringBuffer();
-    for (EditLogOutputStream os : editStreams) {
-      buf.append(os.getName()  + ";");
-    }
-    return buf.toString();
-  }
-
   /**
    * Closes the current edit log and opens edits.new. 
    */
@@ -1272,7 +1272,7 @@
    * @param nnReg this (active) name-node registration.
    * @throws IOException
    */
-  void logJSpoolStart(NamenodeRegistration bnReg, // backup node
+  synchronized void logJSpoolStart(NamenodeRegistration bnReg, // backup node
                       NamenodeRegistration nnReg) // active name-node
   throws IOException {
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
@@ -1331,22 +1331,27 @@
     }
 
     public boolean hasNext() {
-      if(editStreams == null || 
-         editStreams.isEmpty() || nextIndex >= editStreams.size())
-        return false;
-      while(nextIndex < editStreams.size()
-            && !editStreams.get(nextIndex).getType().isOfType(type))
-        nextIndex++;
-      return nextIndex < editStreams.size();
+      synchronized(FSEditLog.this) {
+        if(editStreams == null || 
+           editStreams.isEmpty() || nextIndex >= editStreams.size())
+          return false;
+        while(nextIndex < editStreams.size()
+              && !editStreams.get(nextIndex).getType().isOfType(type))
+          nextIndex++;
+        return nextIndex < editStreams.size();
+      }
     }
 
     public EditLogOutputStream next() {
-      EditLogOutputStream stream = editStreams.get(nextIndex);
-      prevIndex = nextIndex;
-      nextIndex++;
-      while(nextIndex < editStreams.size()
-          && !editStreams.get(nextIndex).getType().isOfType(type))
-      nextIndex++;
+      EditLogOutputStream stream = null;
+      synchronized(FSEditLog.this) {
+        stream = editStreams.get(nextIndex);
+        prevIndex = nextIndex;
+        nextIndex++;
+        while(nextIndex < editStreams.size()
+            && !editStreams.get(nextIndex).getType().isOfType(type))
+        nextIndex++;
+      }
       return stream;
     }
 
@@ -1357,9 +1362,11 @@
     }
 
     void replace(EditLogOutputStream newStream) {
-      assert 0 <= prevIndex && prevIndex < editStreams.size() :
-                                                        "Index out of bound.";
-      editStreams.set(prevIndex, newStream);
+      synchronized (FSEditLog.this) {
+        assert 0 <= prevIndex && prevIndex < editStreams.size() :
+                                                          "Index out of bound.";
+        editStreams.set(prevIndex, newStream);
+      }
     }
   }