You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2010/05/21 20:11:36 UTC

svn commit: r947106 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/

Author: hairong
Date: Fri May 21 18:11:35 2010
New Revision: 947106

URL: http://svn.apache.org/viewvc?rev=947106&view=rev
Log:
HDFS-1112. Edit log buffer should not grow unboundedly. Contributed by Hairong Kuang.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri May 21 18:11:35 2010
@@ -13,9 +13,11 @@ Trunk (unreleased changes)
     HDFS-1061. Memory footprint optimization for INodeFile object. 
     (Bharath Mundlapudi via jghoman)
 
-    HDFS=1079. Throw exceptions as specified by the AbstractFileSystem
+    HDFS-1079. Throw exceptions as specified by the AbstractFileSystem
     in HDFS implemenation and protocols. (suresh)
 
+    HDFS-1112. Edit log buffer should not grow unboundedly. (hairong)
+
   BUG FIXES
 
     HDFS 1021. specify correct server principal for RefreshAuthorizationPolicyProtocol 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri May 21 18:11:35 2010
@@ -41,6 +41,7 @@ class EditLogFileOutputStream extends Ed
   private FileChannel fc; // channel of the file stream for sync
   private DataOutputBuffer bufCurrent; // current buffer for writing
   private DataOutputBuffer bufReady; // buffer ready for flushing
+  final private int initBufferSize; // initial buffer size
   static ByteBuffer fill = ByteBuffer.allocateDirect(512); // preallocation
 
   /**
@@ -55,6 +56,7 @@ class EditLogFileOutputStream extends Ed
   EditLogFileOutputStream(File name, int size) throws IOException {
     super();
     file = name;
+    initBufferSize = size;
     bufCurrent = new DataOutputBuffer(size);
     bufReady = new DataOutputBuffer(size);
     RandomAccessFile rp = new RandomAccessFile(name, "rw");
@@ -146,6 +148,14 @@ class EditLogFileOutputStream extends Ed
   }
 
   /**
+   * @return true if the amount of buffered data is at least the initial buffer size
+   */
+  @Override
+  public boolean shouldForceSync() {
+    return bufReady.size() >= initBufferSize;
+  }
+  
+  /**
    * Return the size of the current edit log including buffered data.
    */
   @Override

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri May 21 18:11:35 2010
@@ -91,6 +91,17 @@ implements JournalStream {
    */
   abstract long length() throws IOException;
 
+  /**
+   * Implement the policy for when to automatically sync the buffered edit log.
+   * The buffered edits can be flushed when the buffer becomes full or
+   * a certain period of time is elapsed.
+   * 
+   * @return true if the buffered data should be automatically synced to disk
+   */
+  public boolean shouldForceSync() {
+    return false;
+  }
+  
   boolean isOperationSupported(byte op) {
     return true;
   }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri May 21 18:11:35 2010
@@ -115,6 +115,8 @@ public class FSEditLog {
   // is a sync currently running?
   private volatile boolean isSyncRunning;
 
+  // is an automatic sync scheduled?
+  private volatile boolean isAutoSyncScheduled = false;
 
   // these are statistics counters.
   private long numTransactions;        // number of transactions
@@ -846,31 +848,84 @@ public class FSEditLog {
   }
 
   /**
-   * Write an operation to the edit log. Do not sync to persistent
-   * store yet.
+   * Write an operation to the edit log. 
+   * Automatically sync buffered edits to persistent store if it is time
+   * to sync.
    */
-  synchronized void logEdit(byte op, Writable ... writables) {
-    if(getNumEditStreams() == 0)
-      throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-    ArrayList<EditLogOutputStream> errorStreams = null;
-    long start = FSNamesystem.now();
-    for(EditLogOutputStream eStream : editStreams) {
-      FSImage.LOG.debug("loggin edits into " + eStream.getName()  + " stream");
-      if(!eStream.isOperationSupported(op))
-        continue;
-      try {
-        eStream.write(op, writables);
-      } catch (IOException ie) {
-        FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
-        if(errorStreams == null)
-          errorStreams = new ArrayList<EditLogOutputStream>(1);
-        errorStreams.add(eStream);
+  void logEdit(byte op, Writable ... writables) {
+    synchronized (this) {
+      // wait if an automatic sync is scheduled
+      waitIfAutoSyncScheduled();
+      
+      if(getNumEditStreams() == 0)
+        throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
+      ArrayList<EditLogOutputStream> errorStreams = null;
+      long start = FSNamesystem.now();
+      for(EditLogOutputStream eStream : editStreams) {
+        FSImage.LOG.debug("loggin edits into " + eStream.getName()  + " stream");
+        if(!eStream.isOperationSupported(op))
+          continue;
+        try {
+          eStream.write(op, writables);
+        } catch (IOException ie) {
+          FSImage.LOG.warn("logEdit: removing "+ eStream.getName(), ie);
+          if(errorStreams == null)
+            errorStreams = new ArrayList<EditLogOutputStream>(1);
+          errorStreams.add(eStream);
+        }
       }
+      processIOError(errorStreams, true);
+      recordTransaction(start);
+      
+      // check if it is time to schedule an automatic sync
+      if (!shouldForceSync()) {
+        return;
+      }
+      isAutoSyncScheduled = true;
     }
-    processIOError(errorStreams, true);
-    recordTransaction(start);
+    
+    // sync buffered edit log entries to persistent store
+    logSync();
   }
 
+  /**
+   * Wait if an automatic sync is scheduled. An InterruptedException ends
+   * the wait early and is swallowed rather than rethrown.
+   */
+  synchronized void waitIfAutoSyncScheduled() {
+    try {
+      while (isAutoSyncScheduled) {
+        this.wait(1000);
+      }
+    } catch (InterruptedException e) {
+    }
+  }
+  
+  /**
+   * Signal that the scheduled automatic sync is done, if one was scheduled
+   */
+  synchronized void doneWithAutoSyncScheduling() {
+    if (isAutoSyncScheduled) {
+      isAutoSyncScheduled = false;
+      notifyAll();
+    }
+  }
+  
+  /**
+   * Check whether buffered edits should be automatically synced to
+   * persistent store
+   * 
+   * @return true if any of the edit streams says that it should sync
+   */
+  private boolean shouldForceSync() {
+    for (EditLogOutputStream eStream : editStreams) {
+      if (eStream.shouldForceSync()) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
   private void recordTransaction(long start) {
     // get a new transactionId
     txid++;
@@ -935,16 +990,17 @@ public class FSEditLog {
    * concurrency with sync() should be synchronized and also call
    * waitForSyncToFinish() before assuming they are running alone.
    */
-  public void logSync() throws IOException {
+  public void logSync() {
     ArrayList<EditLogOutputStream> errorStreams = null;
     long syncStart = 0;
 
     // Fetch the transactionId of this thread. 
     long mytxid = myTransactionId.get().txid;
-    EditLogOutputStream streams[] = null;
+    ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
     boolean sync = false;
     try {
       synchronized (this) {
+        try {
         assert editStreams.size() > 0 : "no editlog streams";
         printStatistics(false);
   
@@ -973,16 +1029,29 @@ public class FSEditLog {
   
         // swap buffers
         for(EditLogOutputStream eStream : editStreams) {
-          eStream.setReadyToFlush();
+          try {
+            eStream.setReadyToFlush();
+            streams.add(eStream);
+          } catch (IOException ie) {
+            FSNamesystem.LOG.error("Unable to get ready to flush.", ie);
+            //
+            // remember the streams that encountered an error.
+            //
+            if (errorStreams == null) {
+              errorStreams = new ArrayList<EditLogOutputStream>(1);
+            }
+            errorStreams.add(eStream);
+          }
+        }
+        } finally {
+          // Prevent RuntimeException from blocking other log edit write 
+          doneWithAutoSyncScheduling();
         }
-        streams = 
-          editStreams.toArray(new EditLogOutputStream[editStreams.size()]);
       }
   
       // do the sync
       long start = FSNamesystem.now();
-      for (int idx = 0; idx < streams.length; idx++) {
-        EditLogOutputStream eStream = streams[idx];
+      for (EditLogOutputStream eStream : streams) {
         try {
           eStream.flush();
         } catch (IOException ie) {
@@ -1002,6 +1071,7 @@ public class FSEditLog {
       if (metrics != null) // Metrics non-null only when used inside name node
         metrics.syncs.inc(elapsed);
     } finally {
+      // Prevent RuntimeException from blocking other log edit sync 
       synchronized (this) {
         synctxid = syncStart;
         if (sync) {

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java Fri May 21 18:11:35 2010
@@ -79,31 +79,25 @@ public class CreateEditsLog {
          blocks[iB].setBlockId(currentBlockId++);
       }
 
-      try {
-
-        INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
-                      null, replication, 0, blockSize, blocks, p, "", "", null);
-        // Append path to filename with information about blockIDs 
-        String path = "_" + iF + "_B" + blocks[0].getBlockId() + 
-                      "_to_B" + blocks[blocksPerFile-1].getBlockId() + "_";
-        String filePath = nameGenerator.getNextFileName("");
-        filePath = filePath + path;
-        // Log the new sub directory in edits
-        if ((iF % nameGenerator.getFilesPerDirectory())  == 0) {
-          String currentDir = nameGenerator.getCurrentDir();
-          dirInode = new INodeDirectory(p, 0L);
-          editLog.logMkDir(currentDir, dirInode);
-        }
-        editLog.logOpenFile(filePath, inode);
-        editLog.logCloseFile(filePath, inode);
+      INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
+                    null, replication, 0, blockSize, blocks, p, "", "", null);
+      // Append path to filename with information about blockIDs 
+      String path = "_" + iF + "_B" + blocks[0].getBlockId() + 
+                    "_to_B" + blocks[blocksPerFile-1].getBlockId() + "_";
+      String filePath = nameGenerator.getNextFileName("");
+      filePath = filePath + path;
+      // Log the new sub directory in edits
+      if ((iF % nameGenerator.getFilesPerDirectory())  == 0) {
+        String currentDir = nameGenerator.getCurrentDir();
+        dirInode = new INodeDirectory(p, 0L);
+        editLog.logMkDir(currentDir, dirInode);
+      }
+      editLog.logOpenFile(filePath, inode);
+      editLog.logCloseFile(filePath, inode);
 
-        if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks
-          editLog.logSync();
-          bidAtSync = currentBlockId;
-        }
-      } catch (IOException e) {
-        System.out.println("Creating trascation for file " + iF +
-            " encountered exception " + e);
+      if (currentBlockId - bidAtSync >= 2000) { // sync every 2K blocks
+        editLog.logSync();
+        bidAtSync = currentBlockId;
       }
     }
     System.out.println("Created edits log in directory " + edits_dir);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=947106&r1=947105&r2=947106&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri May 21 18:11:35 2010
@@ -64,16 +64,11 @@ public class TestEditLog extends TestCas
       FSEditLog editLog = namesystem.getEditLog();
 
       for (int i = 0; i < numTransactions; i++) {
-        try {
-          INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
-                              p, replication, blockSize, 0, "", "", null);
-          editLog.logOpenFile("/filename" + i, inode);
-          editLog.logCloseFile("/filename" + i, inode);
-          editLog.logSync();
-        } catch (IOException e) {
-          System.out.println("Transaction " + i + " encountered exception " +
-                             e);
-        }
+        INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
+                            p, replication, blockSize, 0, "", "", null);
+        editLog.logOpenFile("/filename" + i, inode);
+        editLog.logCloseFile("/filename" + i, inode);
+        editLog.logSync();
       }
     }
   }
@@ -82,6 +77,18 @@ public class TestEditLog extends TestCas
    * Tests transaction logging in dfs.
    */
   public void testEditLog() throws IOException {
+    testEditLog(2048);
+    // force the edit buffer to sync automatically each time an entry is logged
+    testEditLog(1);
+  }
+  
+  /**
+   * Test edit log with different initial buffer size
+   * 
+   * @param initialSize initial edit log buffer size
+   * @throws IOException
+   */
+  private void testEditLog(int initialSize) throws IOException {
 
     // start a cluster 
     Configuration conf = new HdfsConfiguration();
@@ -103,7 +110,7 @@ public class TestEditLog extends TestCas
       FSEditLog editLog = fsimage.getEditLog();
   
       // set small size of flush buffer
-      editLog.setBufferCapacity(2048);
+      editLog.setBufferCapacity(initialSize);
       editLog.close();
       editLog.open();