Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/03/01 00:35:30 UTC

[1/2] hadoop git commit: HDFS-7964. Add support for async edit logging. Contributed by Daryn Sharp.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0fa54d45b -> 215171683


http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
index 0265a4d..87e2523 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
@@ -27,6 +27,8 @@ import static org.mockito.Mockito.spy;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -50,13 +52,38 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Sets;
 
 /**
  * This tests data recovery mode for the NameNode.
  */
+
+@RunWith(Parameterized.class)
 public class TestNameNodeRecovery {
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestNameNodeRecovery(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
+  private static Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   private static final Log LOG = LogFactory.getLog(TestNameNodeRecovery.class);
   private static final StartupOption recoverStartOpt = StartupOption.RECOVER;
   private static final File TEST_DIR = PathUtils.getTestDir(TestNameNodeRecovery.class);
@@ -73,7 +100,7 @@ public class TestNameNodeRecovery {
     EditLogFileOutputStream elfos = null;
     EditLogFileInputStream elfis = null;
     try {
-      elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
+      elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0);
       elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
 
       elts.addTransactionsToLog(elfos, cache);
@@ -525,7 +552,7 @@ public class TestNameNodeRecovery {
     final boolean needRecovery = corruptor.needRecovery(finalize);
 
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     setupRecoveryTestConf(conf);
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;

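[Editorial sketch] The test changes above rely on JUnit 4's Parameterized runner: the class is instantiated once per row returned by the @Parameters method, so every @Test runs first with async edit logging disabled and then enabled. A minimal, self-contained sketch of that mechanism (class and method names here are illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class ParameterizedSketch {
      // one row per run of the whole fixture: {false}, then {true}
      @Parameters
      public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] {{ Boolean.FALSE }, { Boolean.TRUE }});
      }

      private final boolean async;

      public ParameterizedSketch(Boolean async) {
        this.async = async;
      }

      @Test
      public void runsOncePerParameter() {
        System.out.println("async edit logging = " + async);
      }
    }

Note that the patch stores the parameter in a static field because the getConf() helpers are static; that is safe here only because JUnit runs the parameter sets sequentially, not concurrently.
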
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index 5a104ad..30db429 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -40,11 +43,31 @@ import org.apache.hadoop.net.ServerSocketUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Supplier;
 
+@RunWith(Parameterized.class)
 public class TestEditLogTailer {
-  
+  static {
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestEditLogTailer(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
   private static final String DIR_PREFIX = "/dir";
   private static final int DIRS_TO_MAKE = 20;
   static final long SLEEP_TIME = 1000;
@@ -52,13 +75,21 @@ public class TestEditLogTailer {
   
   static {
     GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
     GenericTestUtils.setLogLevel(EditLogTailer.LOG, Level.ALL);
   }
-  
+
+  private static Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   @Test
   public void testTailer() throws IOException, InterruptedException,
       ServiceFailedException {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
 
@@ -122,7 +153,7 @@ public class TestEditLogTailer {
 
   private static void testStandbyTriggersLogRolls(int activeIndex)
       throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = getConf();
     // Roll every 1s
     conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);

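[Editorial sketch] For reference, the configuration the tailer tests assemble, fast log rolls and tailing plus the new flag, looks roughly like this (a sketch using only keys visible in the diff; the class and method names are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class TailerConfSketch {
      static Configuration buildConf(boolean useAsyncEditLog) {
        Configuration conf = new HdfsConfiguration();
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
            useAsyncEditLog);
        conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);   // roll every 1s
        conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); // tail every 1s
        return conf;
      }
    }
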

[2/2] hadoop git commit: HDFS-7964. Add support for async edit logging. Contributed by Daryn Sharp.

Posted by ji...@apache.org.
HDFS-7964. Add support for async edit logging. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21517168
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21517168
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21517168

Branch: refs/heads/trunk
Commit: 2151716832ad14932dd65b1a4e47e64d8d6cd767
Parents: 0fa54d4
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Feb 29 15:34:43 2016 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Feb 29 15:34:43 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../bkjournal/TestBookKeeperAsHASharedDir.java  |  46 ++-
 .../src/test/resources/log4j.properties         |   2 +-
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   6 +-
 .../hadoop/hdfs/server/namenode/BackupNode.java |   4 +
 .../hadoop/hdfs/server/namenode/FSEditLog.java  | 109 ++++---
 .../hdfs/server/namenode/FSEditLogAsync.java    | 322 +++++++++++++++++++
 .../hdfs/server/namenode/FSEditLogOp.java       | 213 ++++++------
 .../hdfs/server/namenode/FSEditLogOpCodes.java  | 108 ++++---
 .../hadoop/hdfs/server/namenode/FSImage.java    |   2 +-
 .../hadoop/hdfs/server/namenode/NameNode.java   |   1 -
 .../namenode/metrics/NameNodeMetrics.java       |   4 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   3 +
 .../hdfs/server/namenode/TestAuditLogs.java     |  15 +-
 .../hdfs/server/namenode/TestEditLog.java       |  59 +++-
 .../server/namenode/TestEditLogAutoroll.java    |  26 ++
 .../namenode/TestEditLogJournalFailures.java    |  35 +-
 .../hdfs/server/namenode/TestEditLogRace.java   | 144 +++++----
 .../server/namenode/TestFSEditLogLoader.java    |  37 ++-
 .../server/namenode/TestNameNodeRecovery.java   |  31 +-
 .../server/namenode/ha/TestEditLogTailer.java   |  39 ++-
 21 files changed, 904 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3d57efa..c3ea5ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1037,6 +1037,8 @@ Release 2.9.0 - UNRELEASED
     HDFS-9754. Avoid unnecessary getBlockCollection calls in BlockManager.
     (jing9)
 
+    HDFS-7964. Add support for async edit logging. (Daryn Sharp)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
index 5611bb8..ff8c00d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
@@ -24,6 +24,9 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
 
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
@@ -56,11 +59,14 @@ import org.apache.commons.logging.LogFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
 
 /**
  * Integration test to ensure that the BookKeeper JournalManager
  * works for HDFS Namenode HA
  */
+@RunWith(Parameterized.class)
 public class TestBookKeeperAsHASharedDir {
   static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class);
 
@@ -69,6 +75,27 @@ public class TestBookKeeperAsHASharedDir {
 
   private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager";
 
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestBookKeeperAsHASharedDir(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
+  private static Configuration getConf() {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   @BeforeClass
   public static void setupBookkeeper() throws Exception {
     bkutil = new BKJMUtil(numBookies);
@@ -92,8 +119,7 @@ public class TestBookKeeperAsHASharedDir {
   public void testFailoverWithBK() throws Exception {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      Configuration conf = getConf();
       conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
                BKJMUtil.createJournalURI("/hotfailover").toString());
       BKJMUtil.addJournalManagerDefinition(conf);
@@ -144,8 +170,7 @@ public class TestBookKeeperAsHASharedDir {
     MiniDFSCluster cluster = null;
 
     try {
-      Configuration conf = new Configuration();
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      Configuration conf = getConf();
       conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
                BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
       conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
@@ -221,8 +246,7 @@ public class TestBookKeeperAsHASharedDir {
 
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      Configuration conf = getConf();
       conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
                BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
       BKJMUtil.addJournalManagerDefinition(conf);
@@ -245,7 +269,9 @@ public class TestBookKeeperAsHASharedDir {
       fs = cluster.getFileSystem(0); // get the older active server.
 
       try {
-        fs.delete(p1, true);
+        System.out.println("DMS: > *************");
+        boolean foo = fs.delete(p1, true);
+        System.out.println("DMS: < ************* "+foo);
         fail("Log update on older active should cause it to exit");
       } catch (RemoteException re) {
         assertTrue(re.getClassName().contains("ExitException"));
@@ -267,9 +293,8 @@ public class TestBookKeeperAsHASharedDir {
   public void testInitializeBKSharedEdits() throws Exception {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
+      Configuration conf = getConf();
       HAUtil.setAllowStandbyReads(conf, true);
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
 
       MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
       cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
@@ -358,8 +383,7 @@ public class TestBookKeeperAsHASharedDir {
   public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new Configuration();
-      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      Configuration conf = getConf();
       conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
           .createJournalURI("/correctEditLogSelection").toString());
       BKJMUtil.addJournalManagerDefinition(conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
index 93c22f7..52aac43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
@@ -26,7 +26,7 @@
 # Format is "<default threshold> (, <appender>)+
 
 # DEFAULT: console appender only
-log4j.rootLogger=OFF, CONSOLE
+log4j.rootLogger=DEBUG, CONSOLE
 
 # Example with rolling log file
 #log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index bfb6203..9c06e29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -291,7 +291,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   
   public static final String  DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
   public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
-  
+
+  public static final String  DFS_NAMENODE_EDITS_ASYNC_LOGGING =
+      "dfs.namenode.edits.asynclogging";
+  public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = false;
+
   public static final String  DFS_LIST_LIMIT = "dfs.ls.limit";
   public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
   public static final String  DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";

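[Editorial sketch] The new key is an opt-in boolean: async edit logging stays off unless dfs.namenode.edits.asynclogging is set to true. A small sketch of setting and reading it (the class name and main() are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class AsyncFlagSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // equivalent to dfs.namenode.edits.asynclogging=true in hdfs-site.xml
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, true);
        boolean async = conf.getBoolean(
            DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
            DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
        System.out.println("async edit logging: " + async);
      }
    }
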
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index 36053f7..d36e0b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -142,6 +142,10 @@ public class BackupNode extends NameNode {
 
   @Override // NameNode
   protected void initialize(Configuration conf) throws IOException {
+    // async edit logs are incompatible with backup node due to race
+    // conditions resulting from laxer synchronization
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, false);
+
     // Trash is disabled in BackupNameNode,
     // but should be turned back on if it ever becomes active.
     conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index c8986dc..809d9e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -78,7 +78,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeFinalizeOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeStartOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
@@ -115,7 +117,7 @@ import com.google.common.collect.Lists;
 @InterfaceStability.Evolving
 public class FSEditLog implements LogsPurgeable {
 
-  static final Log LOG = LogFactory.getLog(FSEditLog.class);
+  public static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
   /**
    * State machine for edit log.
@@ -178,17 +180,11 @@ public class FSEditLog implements LogsPurgeable {
 
   private final NNStorage storage;
   private final Configuration conf;
-  
+
   private final List<URI> editsDirs;
 
-  private final ThreadLocal<OpInstanceCache> cache =
-      new ThreadLocal<OpInstanceCache>() {
-    @Override
-    protected OpInstanceCache initialValue() {
-      return new OpInstanceCache();
-    }
-  };
-  
+  protected final OpInstanceCache cache = new OpInstanceCache();
+
   /**
    * The edit directories that are shared between primary and secondary.
    */
@@ -217,6 +213,17 @@ public class FSEditLog implements LogsPurgeable {
     }
   };
 
+  static FSEditLog newInstance(Configuration conf, NNStorage storage,
+      List<URI> editsDirs) {
+    boolean asyncEditLogging = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
+    LOG.info("Edit logging is async:" + asyncEditLogging);
+    return asyncEditLogging
+        ? new FSEditLogAsync(conf, storage, editsDirs)
+        : new FSEditLog(conf, storage, editsDirs);
+  }
+
   /**
    * Constructor for FSEditLog. Underlying journals are constructed, but 
    * no streams are opened until open() is called.
@@ -423,33 +430,35 @@ public class FSEditLog implements LogsPurgeable {
       
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
-      
-      long start = beginTransaction();
-      op.setTransactionId(txid);
-
-      try {
-        editLogStream.write(op);
-      } catch (IOException ex) {
-        // All journals failed, it is handled in logSync.
-      } finally {
-        op.reset();
-      }
 
-      endTransaction(start);
-      
       // check if it is time to schedule an automatic sync
-      needsSync = shouldForceSync();
+      needsSync = doEditTransaction(op);
       if (needsSync) {
         isAutoSyncScheduled = true;
       }
     }
-    
+
     // Sync the log if an automatic sync is required.
     if (needsSync) {
       logSync();
     }
   }
 
+  synchronized boolean doEditTransaction(final FSEditLogOp op) {
+    long start = beginTransaction();
+    op.setTransactionId(txid);
+
+    try {
+      editLogStream.write(op);
+    } catch (IOException ex) {
+      // All journals failed, it is handled in logSync.
+    } finally {
+      op.reset();
+    }
+    endTransaction(start);
+    return shouldForceSync();
+  }
+
   /**
    * Wait if an automatic sync is scheduled
    */
@@ -544,15 +553,10 @@ public class FSEditLog implements LogsPurgeable {
    * else more operations can start writing while this is in progress.
    */
   void logSyncAll() {
-    // Record the most recent transaction ID as our own id
-    synchronized (this) {
-      TransactionId id = myTransactionId.get();
-      id.txid = txid;
-    }
-    // Then make sure we're synced up to this point
-    logSync();
+    // Make sure we're synced up to the most recent transaction ID.
+    logSync(getLastWrittenTxId());
   }
-  
+
   /**
    * Sync all modifications done by this thread.
    *
@@ -582,12 +586,14 @@ public class FSEditLog implements LogsPurgeable {
    * waitForSyncToFinish() before assuming they are running alone.
    */
   public void logSync() {
-    long syncStart = 0;
+    // Fetch the transactionId of this thread.
+    logSync(myTransactionId.get().txid);
+  }
 
-    // Fetch the transactionId of this thread. 
-    long mytxid = myTransactionId.get().txid;
-    
+  protected void logSync(long mytxid) {
+    long syncStart = 0;
     boolean sync = false;
+    long editsBatchedInSync = 0;
     try {
       EditLogOutputStream logStream = null;
       synchronized (this) {
@@ -606,19 +612,17 @@ public class FSEditLog implements LogsPurgeable {
           // If this transaction was already flushed, then nothing to do
           //
           if (mytxid <= synctxid) {
-            numTransactionsBatchedInSync++;
-            if (metrics != null) {
-              // Metrics is non-null only when used inside name node
-              metrics.incrTransactionsBatchedInSync();
-            }
             return;
           }
-     
-          // now, this thread will do the sync
+
+          // now, this thread will do the sync.  track if other edits were
+          // included in the sync - ie. batched.  if this is the only edit
+          // synced then the batched count is 0
+          editsBatchedInSync = txid - synctxid - 1;
           syncStart = txid;
           isSyncRunning = true;
           sync = true;
-  
+
           // swap buffers
           try {
             if (journalSet.isEmpty()) {
@@ -667,6 +671,8 @@ public class FSEditLog implements LogsPurgeable {
   
       if (metrics != null) { // Metrics non-null only when used inside name node
         metrics.addSync(elapsed);
+        metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
+        numTransactionsBatchedInSync += editsBatchedInSync;
       }
       
     } finally {
@@ -1138,13 +1144,13 @@ public class FSEditLog implements LogsPurgeable {
   }
 
   void logStartRollingUpgrade(long startTime) {
-    RollingUpgradeOp op = RollingUpgradeOp.getStartInstance(cache.get());
+    RollingUpgradeStartOp op = RollingUpgradeStartOp.getInstance(cache.get());
     op.setTime(startTime);
     logEdit(op);
   }
 
   void logFinalizeRollingUpgrade(long finalizeTime) {
-    RollingUpgradeOp op = RollingUpgradeOp.getFinalizeInstance(cache.get());
+    RollingUpgradeOp op = RollingUpgradeFinalizeOp.getInstance(cache.get());
     op.setTime(finalizeTime);
     logEdit(op);
   }
@@ -1313,8 +1319,9 @@ public class FSEditLog implements LogsPurgeable {
     if (writeEndTxn) {
       logEdit(LogSegmentOp.getInstance(cache.get(), 
           FSEditLogOpCodes.OP_END_LOG_SEGMENT));
-      logSync();
     }
+    // always sync to ensure all edits are flushed.
+    logSyncAll();
 
     printStatistics(true);
     
@@ -1701,6 +1708,12 @@ public class FSEditLog implements LogsPurgeable {
     }
   }
 
+  @VisibleForTesting
+  // needed by async impl to restart thread when edit log is replaced by a
+  // spy because a spy is a shallow copy
+  public void restart() {
+  }
+
   /**
    * Return total number of syncs happened on this edit log.
    * @return long - count

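[Editorial sketch] Two structural changes above are worth calling out: FSImage now obtains its log through the newInstance() factory, and the write portion of logEdit() was factored into the synchronized doEditTransaction() so a subclass can drive it from a different thread. A simplified model of that shape, with toy names rather than the real HDFS classes:

    class Log {
      static Log newInstance(boolean async) {
        return async ? new AsyncLog() : new Log();
      }

      // the part a subclass can call from its own thread;
      // returns true when the caller should sync
      synchronized boolean doEditTransaction(String op) {
        System.out.println("write " + op);
        return true; // pretend the output buffer is full
      }

      void logEdit(String op) {
        boolean needsSync = doEditTransaction(op);
        if (needsSync) {
          logSync();
        }
      }

      void logSync() {
        System.out.println("sync");
      }
    }

    class AsyncLog extends Log {
      @Override
      void logEdit(String op) {
        // queue the edit for a background thread instead of writing inline
        // (see the FSEditLogAsync sketch below)
      }
    }

The sync path also changes how batching is counted: instead of incrementing a counter once per waiting thread, each actual sync records txid - synctxid - 1, i.e. the number of edits that rode along with that sync.
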
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
new file mode 100644
index 0000000..c14a310
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.util.ExitUtil;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+class FSEditLogAsync extends FSEditLog implements Runnable {
+  static final Log LOG = LogFactory.getLog(FSEditLog.class);
+
+  // use separate mutex to avoid possible deadlock when stopping the thread.
+  private final Object syncThreadLock = new Object();
+  private Thread syncThread;
+  private static ThreadLocal<Edit> threadEdit = new ThreadLocal<Edit>();
+
+  // requires concurrent access from caller threads and syncing thread.
+  private final BlockingQueue<Edit> editPendingQ =
+      new ArrayBlockingQueue<Edit>(4096);
+
+  // only accessed by syncing thread so no synchronization required.
+  // queue is unbounded because it's effectively limited by the size
+  // of the edit log buffer - ie. a sync will eventually be forced.
+  private final Deque<Edit> syncWaitQ = new ArrayDeque<Edit>();
+
+  FSEditLogAsync(Configuration conf, NNStorage storage, List<URI> editsDirs) {
+    super(conf, storage, editsDirs);
+    // op instances cannot be shared due to queuing for background thread.
+    cache.disableCache();
+  }
+
+  private boolean isSyncThreadAlive() {
+    synchronized(syncThreadLock) {
+      return syncThread != null && syncThread.isAlive();
+    }
+  }
+
+  private void startSyncThread() {
+    synchronized(syncThreadLock) {
+      if (!isSyncThreadAlive()) {
+        syncThread = new Thread(this, this.getClass().getSimpleName());
+        syncThread.start();
+      }
+    }
+  }
+
+  private void stopSyncThread() {
+    synchronized(syncThreadLock) {
+      if (syncThread != null) {
+        try {
+          syncThread.interrupt();
+          syncThread.join();
+        } catch (InterruptedException e) {
+          // we're quitting anyway.
+        } finally {
+          syncThread = null;
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  @Override
+  public void restart() {
+    stopSyncThread();
+    startSyncThread();
+  }
+
+  @Override
+  void openForWrite(int layoutVersion) throws IOException {
+    try {
+      startSyncThread();
+      super.openForWrite(layoutVersion);
+    } catch (IOException ioe) {
+      stopSyncThread();
+      throw ioe;
+    }
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    stopSyncThread();
+  }
+
+  @Override
+  void logEdit(final FSEditLogOp op) {
+    Edit edit = getEditInstance(op);
+    threadEdit.set(edit);
+    enqueueEdit(edit);
+  }
+
+  @Override
+  public void logSync() {
+    Edit edit = threadEdit.get();
+    if (edit != null) {
+      // do NOT remove to avoid expunge & rehash penalties.
+      threadEdit.set(null);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("logSync " + edit);
+      }
+      edit.logSyncWait();
+    }
+  }
+
+  @Override
+  public void logSyncAll() {
+    // doesn't actually log anything, just ensures that the queues are
+    // drained when it returns.
+    Edit edit = new SyncEdit(this, null){
+      @Override
+      public boolean logEdit() {
+        return true;
+      }
+    };
+    enqueueEdit(edit);
+    edit.logSyncWait();
+  }
+
+  private void enqueueEdit(Edit edit) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("logEdit " + edit);
+    }
+    try {
+      if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) {
+        Preconditions.checkState(
+            isSyncThreadAlive(), "sync thread is not alive");
+        editPendingQ.put(edit);
+      }
+    } catch (Throwable t) {
+      // should never happen!  failure to enqueue an edit is fatal
+      terminate(t);
+    }
+  }
+
+  private Edit dequeueEdit() throws InterruptedException {
+    // only block for next edit if no pending syncs.
+    return syncWaitQ.isEmpty() ? editPendingQ.take() : editPendingQ.poll();
+  }
+
+  @Override
+  public void run() {
+    try {
+      while (true) {
+        boolean doSync;
+        Edit edit = dequeueEdit();
+        if (edit != null) {
+          // sync if requested by edit log.
+          doSync = edit.logEdit();
+          syncWaitQ.add(edit);
+        } else {
+          // sync when editq runs dry, but have edits pending a sync.
+          doSync = !syncWaitQ.isEmpty();
+        }
+        if (doSync) {
+          // normally edit log exceptions cause the NN to terminate, but tests
+          // relying on ExitUtil.terminate need to see the exception.
+          RuntimeException syncEx = null;
+          try {
+            logSync(getLastWrittenTxId());
+          } catch (RuntimeException ex) {
+            syncEx = ex;
+          }
+          while ((edit = syncWaitQ.poll()) != null) {
+            edit.logSyncNotify(syncEx);
+          }
+        }
+      }
+    } catch (InterruptedException ie) {
+      LOG.info(Thread.currentThread().getName() + " was interrupted, exiting");
+    } catch (Throwable t) {
+      terminate(t);
+    }
+  }
+
+  private void terminate(Throwable t) {
+    String message = "Exception while edit logging: "+t.getMessage();
+    LOG.fatal(message, t);
+    ExitUtil.terminate(1, message);
+  }
+
+  private Edit getEditInstance(FSEditLogOp op) {
+    final Edit edit;
+    final Server.Call rpcCall = Server.getCurCall().get();
+    // only rpc calls not explicitly sync'ed on the log will be async.
+    if (rpcCall != null && !Thread.holdsLock(this)) {
+      edit = new RpcEdit(this, op, rpcCall);
+    } else {
+      edit = new SyncEdit(this, op);
+    }
+    return edit;
+  }
+
+  private abstract static class Edit {
+    final FSEditLog log;
+    final FSEditLogOp op;
+
+    Edit(FSEditLog log, FSEditLogOp op) {
+      this.log = log;
+      this.op = op;
+    }
+
+    // return whether edit log wants to sync.
+    boolean logEdit() {
+      return log.doEditTransaction(op);
+    }
+
+    // wait for background thread to finish syncing.
+    abstract void logSyncWait();
+    // wake up the thread in logSyncWait.
+    abstract void logSyncNotify(RuntimeException ex);
+  }
+
+  // the calling thread is synchronously waiting for the edit to complete.
+  private static class SyncEdit extends Edit {
+    private final Object lock;
+    private boolean done = false;
+    private RuntimeException syncEx;
+
+    SyncEdit(FSEditLog log, FSEditLogOp op) {
+      super(log, op);
+      // if the log is already sync'ed (ex. log rolling), must wait on it to
+      // avoid deadlock with sync thread.  the fsn lock protects against
+      // logging during a roll.  else lock on this object to avoid sync
+      // contention on edit log.
+      lock = Thread.holdsLock(log) ? log : this;
+    }
+
+    @Override
+    public void logSyncWait() {
+      synchronized(lock) {
+        while (!done) {
+          try {
+            lock.wait(10);
+          } catch (InterruptedException e) {}
+        }
+        // only needed by tests that rely on ExitUtil.terminate() since
+        // normally exceptions terminate the NN.
+        if (syncEx != null) {
+          syncEx.fillInStackTrace();
+          throw syncEx;
+        }
+      }
+    }
+
+    @Override
+    public void logSyncNotify(RuntimeException ex) {
+      synchronized(lock) {
+        done = true;
+        syncEx = ex;
+        lock.notifyAll();
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "["+getClass().getSimpleName()+" op:"+op+"]";
+    }
+  }
+
+  // the calling rpc thread will return immediately from logSync but the
+  // rpc response will not be sent until the edit is durable.
+  private static class RpcEdit extends Edit {
+    private final Server.Call call;
+
+    RpcEdit(FSEditLog log, FSEditLogOp op, Server.Call call) {
+      super(log, op);
+      this.call = call;
+      call.postponeResponse();
+    }
+
+    @Override
+    public void logSyncWait() {
+      // logSync is a no-op to immediately free up rpc handlers.  the
+      // response is sent when the sync thread calls syncNotify.
+    }
+
+    @Override
+    public void logSyncNotify(RuntimeException syncEx) {
+      try {
+        if (syncEx == null) {
+          call.sendResponse();
+        } else {
+          call.abortResponse(syncEx);
+        }
+      } catch (Exception e) {} // don't care if not sent.
+    }
+
+    @Override
+    public String toString() {
+      return "["+getClass().getSimpleName()+" op:"+op+" call:"+call+"]";
+    }
+  }
+}

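[Editorial sketch] The control flow of FSEditLogAsync in miniature: callers enqueue edits onto a bounded queue; a single background thread writes each edit, and when the queue runs dry it issues one sync covering everything written since the last one, then wakes all waiters. A toy, self-contained model of that producer/consumer pattern (names and the CountDownLatch are illustrative; the real class uses per-Edit wait/notify):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;

    public class AsyncLogModel {
      static class Edit {
        final String op;
        final CountDownLatch synced = new CountDownLatch(1);
        Edit(String op) { this.op = op; }
      }

      private final BlockingQueue<Edit> pending = new ArrayBlockingQueue<>(4096);
      private final Deque<Edit> awaitingSync = new ArrayDeque<>();

      void logEdit(String op) throws InterruptedException {
        Edit e = new Edit(op);
        pending.put(e);
        e.synced.await();                            // like SyncEdit.logSyncWait()
      }

      void runSyncThread() throws InterruptedException {
        while (true) {
          // block for the next edit only when no sync is outstanding,
          // mirroring dequeueEdit()
          Edit e = awaitingSync.isEmpty() ? pending.take() : pending.poll();
          if (e != null) {
            System.out.println("write " + e.op);     // doEditTransaction()
            awaitingSync.add(e);
          } else {
            // queue ran dry: one sync covers every edit written above
            System.out.println("sync " + awaitingSync.size() + " edit(s)");
            Edit done;
            while ((done = awaitingSync.poll()) != null) {
              done.synced.countDown();               // like logSyncNotify()
            }
          }
        }
      }

      public static void main(String[] args) throws Exception {
        AsyncLogModel log = new AsyncLogModel();
        Thread t = new Thread(() -> {
          try { log.runSyncThread(); } catch (InterruptedException ignored) {}
        });
        t.setDaemon(true);
        t.start();
        log.logEdit("mkdir /a");  // returns once the background sync completes
      }
    }

The RpcEdit subtype skips the wait entirely: its constructor calls postponeResponse() on the RPC call, logSync() becomes a no-op for that thread, and the response is sent (or aborted) from logSyncNotify() once the edit is durable, freeing RPC handlers instead of parking them.
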
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index a8389f0..c4e1a78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -147,6 +147,55 @@ public abstract class FSEditLogOp {
   byte[] rpcClientId;
   int rpcCallId;
 
+  public static class OpInstanceCache {
+    private static ThreadLocal<OpInstanceCacheMap> cache =
+        new ThreadLocal<OpInstanceCacheMap>() {
+      @Override
+      protected OpInstanceCacheMap initialValue() {
+        return new OpInstanceCacheMap();
+      }
+    };
+
+    @SuppressWarnings("serial")
+    static final class OpInstanceCacheMap extends
+        EnumMap<FSEditLogOpCodes, FSEditLogOp> {
+      OpInstanceCacheMap() {
+        super(FSEditLogOpCodes.class);
+        for (FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
+          put(opCode, newInstance(opCode));
+        }
+      }
+    }
+
+    private boolean useCache = true;
+
+    void disableCache() {
+      useCache = false;
+    }
+
+    public OpInstanceCache get() {
+      return this;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T extends FSEditLogOp> T get(FSEditLogOpCodes opCode) {
+      return useCache ? (T)cache.get().get(opCode) : (T)newInstance(opCode);
+    }
+
+    private static FSEditLogOp newInstance(FSEditLogOpCodes opCode) {
+      FSEditLogOp instance = null;
+      Class<? extends FSEditLogOp> clazz = opCode.getOpClass();
+      if (clazz != null) {
+        try {
+          instance = clazz.newInstance();
+        } catch (Exception ex) {
+          throw new RuntimeException("Failed to instantiate "+opCode, ex);
+        }
+      }
+      return instance;
+    }
+  }
+
   final void reset() {
     txid = HdfsServerConstants.INVALID_TXID;
     rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
@@ -156,70 +205,6 @@ public abstract class FSEditLogOp {
 
   abstract void resetSubFields();
 
-  final public static class OpInstanceCache {
-    private final EnumMap<FSEditLogOpCodes, FSEditLogOp> inst =
-        new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
-    
-    public OpInstanceCache() {
-      inst.put(OP_ADD, new AddOp());
-      inst.put(OP_CLOSE, new CloseOp());
-      inst.put(OP_SET_REPLICATION, new SetReplicationOp());
-      inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
-      inst.put(OP_RENAME_OLD, new RenameOldOp());
-      inst.put(OP_DELETE, new DeleteOp());
-      inst.put(OP_MKDIR, new MkdirOp());
-      inst.put(OP_SET_GENSTAMP_V1, new SetGenstampV1Op());
-      inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
-      inst.put(OP_SET_OWNER, new SetOwnerOp());
-      inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
-      inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
-      inst.put(OP_SET_QUOTA, new SetQuotaOp());
-      inst.put(OP_TIMES, new TimesOp());
-      inst.put(OP_SYMLINK, new SymlinkOp());
-      inst.put(OP_RENAME, new RenameOp());
-      inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
-      inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
-      inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
-      inst.put(OP_CANCEL_DELEGATION_TOKEN, new CancelDelegationTokenOp());
-      inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
-      inst.put(OP_START_LOG_SEGMENT, new LogSegmentOp(OP_START_LOG_SEGMENT));
-      inst.put(OP_END_LOG_SEGMENT, new LogSegmentOp(OP_END_LOG_SEGMENT));
-      inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
-      inst.put(OP_TRUNCATE, new TruncateOp());
-
-      inst.put(OP_ALLOW_SNAPSHOT, new AllowSnapshotOp());
-      inst.put(OP_DISALLOW_SNAPSHOT, new DisallowSnapshotOp());
-      inst.put(OP_CREATE_SNAPSHOT, new CreateSnapshotOp());
-      inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp());
-      inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
-      inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
-      inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
-
-      inst.put(OP_ADD_CACHE_DIRECTIVE, new AddCacheDirectiveInfoOp());
-      inst.put(OP_MODIFY_CACHE_DIRECTIVE, new ModifyCacheDirectiveInfoOp());
-      inst.put(OP_REMOVE_CACHE_DIRECTIVE, new RemoveCacheDirectiveInfoOp());
-      inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
-      inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
-      inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
-
-      inst.put(OP_ADD_BLOCK, new AddBlockOp());
-      inst.put(OP_SET_ACL, new SetAclOp());
-      inst.put(OP_ROLLING_UPGRADE_START, new RollingUpgradeOp(
-          OP_ROLLING_UPGRADE_START, "start"));
-      inst.put(OP_ROLLING_UPGRADE_FINALIZE, new RollingUpgradeOp(
-          OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
-      inst.put(OP_SET_XATTR, new SetXAttrOp());
-      inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
-      inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
-      inst.put(OP_APPEND, new AppendOp());
-      inst.put(OP_SET_QUOTA_BY_STORAGETYPE, new SetQuotaByStorageTypeOp());
-    }
-    
-    public FSEditLogOp get(FSEditLogOpCodes opcode) {
-      return inst.get(opcode);
-    }
-  }
-
   private static ImmutableMap<String, FsAction> fsActionMap() {
     ImmutableMap.Builder<String, FsAction> b = ImmutableMap.builder();
     for (FsAction v : FsAction.values())
@@ -774,7 +759,7 @@ public abstract class FSEditLogOp {
    * {@link ClientProtocol#append}
    */
   static class AddOp extends AddCloseOp {
-    private AddOp() {
+    AddOp() {
       super(OP_ADD);
     }
 
@@ -802,7 +787,7 @@ public abstract class FSEditLogOp {
    * finally log an AddOp.
    */
   static class CloseOp extends AddCloseOp {
-    private CloseOp() {
+    CloseOp() {
       super(OP_CLOSE);
     }
 
@@ -830,7 +815,7 @@ public abstract class FSEditLogOp {
     String clientMachine;
     boolean newBlock;
 
-    private AppendOp() {
+    AppendOp() {
       super(OP_APPEND);
     }
 
@@ -920,7 +905,7 @@ public abstract class FSEditLogOp {
     private Block penultimateBlock;
     private Block lastBlock;
     
-    private AddBlockOp() {
+    AddBlockOp() {
       super(OP_ADD_BLOCK);
     }
     
@@ -1032,7 +1017,7 @@ public abstract class FSEditLogOp {
     String path;
     Block[] blocks;
     
-    private UpdateBlocksOp() {
+    UpdateBlocksOp() {
       super(OP_UPDATE_BLOCKS);
     }
     
@@ -1125,7 +1110,7 @@ public abstract class FSEditLogOp {
     String path;
     short replication;
 
-    private SetReplicationOp() {
+    SetReplicationOp() {
       super(OP_SET_REPLICATION);
     }
 
@@ -1204,7 +1189,7 @@ public abstract class FSEditLogOp {
     long timestamp;
     final static public int MAX_CONCAT_SRC = 1024 * 1024;
 
-    private ConcatDeleteOp() {
+    ConcatDeleteOp() {
       super(OP_CONCAT_DELETE);
     }
 
@@ -1362,7 +1347,7 @@ public abstract class FSEditLogOp {
     String dst;
     long timestamp;
 
-    private RenameOldOp() {
+    RenameOldOp() {
       super(OP_RENAME_OLD);
     }
 
@@ -1474,7 +1459,7 @@ public abstract class FSEditLogOp {
     String path;
     long timestamp;
 
-    private DeleteOp() {
+    DeleteOp() {
       super(OP_DELETE);
     }
 
@@ -1575,7 +1560,7 @@ public abstract class FSEditLogOp {
     List<AclEntry> aclEntries;
     List<XAttr> xAttrs;
 
-    private MkdirOp() {
+    MkdirOp() {
       super(OP_MKDIR);
     }
     
@@ -1748,7 +1733,7 @@ public abstract class FSEditLogOp {
   static class SetGenstampV1Op extends FSEditLogOp {
     long genStampV1;
 
-    private SetGenstampV1Op() {
+    SetGenstampV1Op() {
       super(OP_SET_GENSTAMP_V1);
     }
 
@@ -1806,7 +1791,7 @@ public abstract class FSEditLogOp {
   static class SetGenstampV2Op extends FSEditLogOp {
     long genStampV2;
 
-    private SetGenstampV2Op() {
+    SetGenstampV2Op() {
       super(OP_SET_GENSTAMP_V2);
     }
 
@@ -1864,7 +1849,7 @@ public abstract class FSEditLogOp {
   static class AllocateBlockIdOp extends FSEditLogOp {
     long blockId;
 
-    private AllocateBlockIdOp() {
+    AllocateBlockIdOp() {
       super(OP_ALLOCATE_BLOCK_ID);
     }
 
@@ -1923,7 +1908,7 @@ public abstract class FSEditLogOp {
     String src;
     FsPermission permissions;
 
-    private SetPermissionsOp() {
+    SetPermissionsOp() {
       super(OP_SET_PERMISSIONS);
     }
 
@@ -1996,7 +1981,7 @@ public abstract class FSEditLogOp {
     String username;
     String groupname;
 
-    private SetOwnerOp() {
+    SetOwnerOp() {
       super(OP_SET_OWNER);
     }
 
@@ -2083,7 +2068,7 @@ public abstract class FSEditLogOp {
     String src;
     long nsQuota;
 
-    private SetNSQuotaOp() {
+    SetNSQuotaOp() {
       super(OP_SET_NS_QUOTA);
     }
 
@@ -2141,7 +2126,7 @@ public abstract class FSEditLogOp {
   static class ClearNSQuotaOp extends FSEditLogOp {
     String src;
 
-    private ClearNSQuotaOp() {
+    ClearNSQuotaOp() {
       super(OP_CLEAR_NS_QUOTA);
     }
 
@@ -2195,7 +2180,7 @@ public abstract class FSEditLogOp {
     long nsQuota;
     long dsQuota;
 
-    private SetQuotaOp() {
+    SetQuotaOp() {
       super(OP_SET_QUOTA);
     }
 
@@ -2280,7 +2265,7 @@ public abstract class FSEditLogOp {
     long dsQuota;
     StorageType type;
 
-    private SetQuotaByStorageTypeOp() {
+    SetQuotaByStorageTypeOp() {
       super(OP_SET_QUOTA_BY_STORAGETYPE);
     }
 
@@ -2363,7 +2348,7 @@ public abstract class FSEditLogOp {
     long mtime;
     long atime;
 
-    private TimesOp() {
+    TimesOp() {
       super(OP_TIMES);
     }
 
@@ -2472,7 +2457,7 @@ public abstract class FSEditLogOp {
     long atime;
     PermissionStatus permissionStatus;
 
-    private SymlinkOp() {
+    SymlinkOp() {
       super(OP_SYMLINK);
     }
 
@@ -2631,7 +2616,7 @@ public abstract class FSEditLogOp {
     long timestamp;
     Rename[] options;
 
-    private RenameOp() {
+    RenameOp() {
       super(OP_RENAME);
     }
 
@@ -2796,7 +2781,7 @@ public abstract class FSEditLogOp {
     long timestamp;
     Block truncateBlock;
 
-    private TruncateOp() {
+    TruncateOp() {
       super(OP_TRUNCATE);
     }
 
@@ -2929,7 +2914,7 @@ public abstract class FSEditLogOp {
     String path;
     String newHolder;
 
-    private ReassignLeaseOp() {
+    ReassignLeaseOp() {
       super(OP_REASSIGN_LEASE);
     }
 
@@ -3011,7 +2996,7 @@ public abstract class FSEditLogOp {
     DelegationTokenIdentifier token;
     long expiryTime;
 
-    private GetDelegationTokenOp() {
+    GetDelegationTokenOp() {
       super(OP_GET_DELEGATION_TOKEN);
     }
 
@@ -3090,7 +3075,7 @@ public abstract class FSEditLogOp {
     DelegationTokenIdentifier token;
     long expiryTime;
 
-    private RenewDelegationTokenOp() {
+    RenewDelegationTokenOp() {
       super(OP_RENEW_DELEGATION_TOKEN);
     }
 
@@ -3168,7 +3153,7 @@ public abstract class FSEditLogOp {
   static class CancelDelegationTokenOp extends FSEditLogOp {
     DelegationTokenIdentifier token;
 
-    private CancelDelegationTokenOp() {
+    CancelDelegationTokenOp() {
       super(OP_CANCEL_DELEGATION_TOKEN);
     }
 
@@ -3227,7 +3212,7 @@ public abstract class FSEditLogOp {
   static class UpdateMasterKeyOp extends FSEditLogOp {
     DelegationKey key;
 
-    private UpdateMasterKeyOp() {
+    UpdateMasterKeyOp() {
       super(OP_UPDATE_MASTER_KEY);
     }
 
@@ -3332,8 +3317,20 @@ public abstract class FSEditLogOp {
     }
   }
 
+  static class StartLogSegmentOp extends LogSegmentOp {
+    StartLogSegmentOp() {
+      super(OP_START_LOG_SEGMENT);
+    }
+  }
+
+  static class EndLogSegmentOp extends LogSegmentOp {
+    EndLogSegmentOp() {
+      super(OP_END_LOG_SEGMENT);
+    }
+  }
+
   static class InvalidOp extends FSEditLogOp {
-    private InvalidOp() {
+    InvalidOp() {
       super(OP_INVALID);
     }
 
@@ -4144,7 +4141,7 @@ public abstract class FSEditLogOp {
     List<XAttr> xAttrs;
     String src;
     
-    private RemoveXAttrOp() {
+    RemoveXAttrOp() {
       super(OP_REMOVE_XATTR);
     }
     
@@ -4197,7 +4194,7 @@ public abstract class FSEditLogOp {
     List<XAttr> xAttrs;
     String src;
     
-    private SetXAttrOp() {
+    SetXAttrOp() {
       super(OP_SET_XATTR);
     }
     
@@ -4250,7 +4247,7 @@ public abstract class FSEditLogOp {
     List<AclEntry> aclEntries = Lists.newArrayList();
     String src;
 
-    private SetAclOp() {
+    SetAclOp() {
       super(OP_SET_ACL);
     }
 
@@ -4347,7 +4344,7 @@ public abstract class FSEditLogOp {
   /**
    * Operation corresponding to upgrade
    */
-  static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
+  abstract static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
     private final String name;
     private long time;
 
@@ -4414,7 +4411,7 @@ public abstract class FSEditLogOp {
     String path;
     byte policyId;
 
-    private SetStoragePolicyOp() {
+    SetStoragePolicyOp() {
       super(OP_SET_STORAGE_POLICY);
     }
 
@@ -4480,6 +4477,26 @@ public abstract class FSEditLogOp {
     }
   }  
 
+  static class RollingUpgradeStartOp extends RollingUpgradeOp {
+    RollingUpgradeStartOp() {
+      super(OP_ROLLING_UPGRADE_START, "start");
+    }
+
+    static RollingUpgradeStartOp getInstance(OpInstanceCache cache) {
+      return (RollingUpgradeStartOp) cache.get(OP_ROLLING_UPGRADE_START);
+    }
+  }
+
+  static class RollingUpgradeFinalizeOp extends RollingUpgradeOp {
+    RollingUpgradeFinalizeOp() {
+      super(OP_ROLLING_UPGRADE_FINALIZE, "finalize");
+    }
+
+    static RollingUpgradeFinalizeOp getInstance(OpInstanceCache cache) {
+      return (RollingUpgradeFinalizeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE);
+    }
+  }
+
   /**
    * Class for writing editlog ops
    */

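[Editorial sketch] The OpInstanceCache rewrite keeps the old behavior, one reusable op instance per opcode, but moves the per-thread scoping inside the cache (a static ThreadLocal of an EnumMap) and adds a switch to turn reuse off, since FSEditLogAsync queues ops for a background thread and a shared mutable instance could be reset before it is serialized. A toy version of the idea, with hypothetical names:

    import java.util.EnumMap;

    public class InstanceCacheSketch {
      enum OpCode { MKDIR, DELETE }

      static class Op {
        final OpCode code;
        Op(OpCode code) { this.code = code; }
      }

      // one pre-built map of reusable instances per thread
      private static final ThreadLocal<EnumMap<OpCode, Op>> CACHE =
          new ThreadLocal<EnumMap<OpCode, Op>>() {
        @Override
        protected EnumMap<OpCode, Op> initialValue() {
          EnumMap<OpCode, Op> map = new EnumMap<OpCode, Op>(OpCode.class);
          for (OpCode code : OpCode.values()) {
            map.put(code, new Op(code));
          }
          return map;
        }
      };

      private boolean useCache = true;

      void disableCache() { useCache = false; }  // async mode: fresh instances

      Op get(OpCode code) {
        return useCache ? CACHE.get().get(code) : new Op(code);
      }
    }
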
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
index 1a0a296..3f8feba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 /**
  * Op codes for edits file
@@ -27,60 +28,64 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public enum FSEditLogOpCodes {
   // last op code in file
-  OP_ADD                        ((byte)  0),
-  OP_RENAME_OLD                 ((byte)  1), // deprecated operation
-  OP_DELETE                     ((byte)  2),
-  OP_MKDIR                      ((byte)  3),
-  OP_SET_REPLICATION            ((byte)  4),
+  OP_ADD                        ((byte)  0, AddOp.class),
+  // deprecated operation
+  OP_RENAME_OLD                 ((byte)  1, RenameOldOp.class),
+  OP_DELETE                     ((byte)  2, DeleteOp.class),
+  OP_MKDIR                      ((byte)  3, MkdirOp.class),
+  OP_SET_REPLICATION            ((byte)  4, SetReplicationOp.class),
   @Deprecated OP_DATANODE_ADD   ((byte)  5), // obsolete
   @Deprecated OP_DATANODE_REMOVE((byte)  6), // obsolete
-  OP_SET_PERMISSIONS            ((byte)  7),
-  OP_SET_OWNER                  ((byte)  8),
-  OP_CLOSE                      ((byte)  9),
-  OP_SET_GENSTAMP_V1            ((byte) 10),
-  OP_SET_NS_QUOTA               ((byte) 11), // obsolete
-  OP_CLEAR_NS_QUOTA             ((byte) 12), // obsolete
-  OP_TIMES                      ((byte) 13), // set atime, mtime
-  OP_SET_QUOTA                  ((byte) 14),
-  OP_RENAME                     ((byte) 15), // filecontext rename
-  OP_CONCAT_DELETE              ((byte) 16), // concat files
-  OP_SYMLINK                    ((byte) 17),
-  OP_GET_DELEGATION_TOKEN       ((byte) 18),
-  OP_RENEW_DELEGATION_TOKEN     ((byte) 19),
-  OP_CANCEL_DELEGATION_TOKEN    ((byte) 20),
-  OP_UPDATE_MASTER_KEY          ((byte) 21),
-  OP_REASSIGN_LEASE             ((byte) 22),
-  OP_END_LOG_SEGMENT            ((byte) 23),
-  OP_START_LOG_SEGMENT          ((byte) 24),
-  OP_UPDATE_BLOCKS              ((byte) 25),
-  OP_CREATE_SNAPSHOT            ((byte) 26),
-  OP_DELETE_SNAPSHOT            ((byte) 27),
-  OP_RENAME_SNAPSHOT            ((byte) 28),
-  OP_ALLOW_SNAPSHOT             ((byte) 29),
-  OP_DISALLOW_SNAPSHOT          ((byte) 30),
-  OP_SET_GENSTAMP_V2            ((byte) 31),
-  OP_ALLOCATE_BLOCK_ID          ((byte) 32),
-  OP_ADD_BLOCK                  ((byte) 33),
-  OP_ADD_CACHE_DIRECTIVE       ((byte) 34),
-  OP_REMOVE_CACHE_DIRECTIVE    ((byte) 35),
-  OP_ADD_CACHE_POOL                       ((byte) 36),
-  OP_MODIFY_CACHE_POOL                    ((byte) 37),
-  OP_REMOVE_CACHE_POOL                    ((byte) 38),
-  OP_MODIFY_CACHE_DIRECTIVE     ((byte) 39),
-  OP_SET_ACL                    ((byte) 40),
-  OP_ROLLING_UPGRADE_START      ((byte) 41),
-  OP_ROLLING_UPGRADE_FINALIZE   ((byte) 42),
-  OP_SET_XATTR                  ((byte) 43),
-  OP_REMOVE_XATTR               ((byte) 44),
-  OP_SET_STORAGE_POLICY         ((byte) 45),
-  OP_TRUNCATE                   ((byte) 46),
-  OP_APPEND                     ((byte) 47),
-  OP_SET_QUOTA_BY_STORAGETYPE   ((byte) 48),
+  OP_SET_PERMISSIONS            ((byte)  7, SetPermissionsOp.class),
+  OP_SET_OWNER                  ((byte)  8, SetOwnerOp.class),
+  OP_CLOSE                      ((byte)  9, CloseOp.class),
+  OP_SET_GENSTAMP_V1            ((byte) 10, SetGenstampV1Op.class),
+  OP_SET_NS_QUOTA               ((byte) 11, SetNSQuotaOp.class), // obsolete
+  OP_CLEAR_NS_QUOTA             ((byte) 12, ClearNSQuotaOp.class), // obsolete
+  OP_TIMES                      ((byte) 13, TimesOp.class), // set atime, mtime
+  OP_SET_QUOTA                  ((byte) 14, SetQuotaOp.class),
+  // filecontext rename
+  OP_RENAME                     ((byte) 15, RenameOp.class),
+  // concat files
+  OP_CONCAT_DELETE              ((byte) 16, ConcatDeleteOp.class),
+  OP_SYMLINK                    ((byte) 17, SymlinkOp.class),
+  OP_GET_DELEGATION_TOKEN       ((byte) 18, GetDelegationTokenOp.class),
+  OP_RENEW_DELEGATION_TOKEN     ((byte) 19, RenewDelegationTokenOp.class),
+  OP_CANCEL_DELEGATION_TOKEN    ((byte) 20, CancelDelegationTokenOp.class),
+  OP_UPDATE_MASTER_KEY          ((byte) 21, UpdateMasterKeyOp.class),
+  OP_REASSIGN_LEASE             ((byte) 22, ReassignLeaseOp.class),
+  OP_END_LOG_SEGMENT            ((byte) 23, EndLogSegmentOp.class),
+  OP_START_LOG_SEGMENT          ((byte) 24, StartLogSegmentOp.class),
+  OP_UPDATE_BLOCKS              ((byte) 25, UpdateBlocksOp.class),
+  OP_CREATE_SNAPSHOT            ((byte) 26, CreateSnapshotOp.class),
+  OP_DELETE_SNAPSHOT            ((byte) 27, DeleteSnapshotOp.class),
+  OP_RENAME_SNAPSHOT            ((byte) 28, RenameSnapshotOp.class),
+  OP_ALLOW_SNAPSHOT             ((byte) 29, AllowSnapshotOp.class),
+  OP_DISALLOW_SNAPSHOT          ((byte) 30, DisallowSnapshotOp.class),
+  OP_SET_GENSTAMP_V2            ((byte) 31, SetGenstampV2Op.class),
+  OP_ALLOCATE_BLOCK_ID          ((byte) 32, AllocateBlockIdOp.class),
+  OP_ADD_BLOCK                  ((byte) 33, AddBlockOp.class),
+  OP_ADD_CACHE_DIRECTIVE        ((byte) 34, AddCacheDirectiveInfoOp.class),
+  OP_REMOVE_CACHE_DIRECTIVE     ((byte) 35, RemoveCacheDirectiveInfoOp.class),
+  OP_ADD_CACHE_POOL             ((byte) 36, AddCachePoolOp.class),
+  OP_MODIFY_CACHE_POOL          ((byte) 37, ModifyCachePoolOp.class),
+  OP_REMOVE_CACHE_POOL          ((byte) 38, RemoveCachePoolOp.class),
+  OP_MODIFY_CACHE_DIRECTIVE     ((byte) 39, ModifyCacheDirectiveInfoOp.class),
+  OP_SET_ACL                    ((byte) 40, SetAclOp.class),
+  OP_ROLLING_UPGRADE_START      ((byte) 41, RollingUpgradeStartOp.class),
+  OP_ROLLING_UPGRADE_FINALIZE   ((byte) 42, RollingUpgradeFinalizeOp.class),
+  OP_SET_XATTR                  ((byte) 43, SetXAttrOp.class),
+  OP_REMOVE_XATTR               ((byte) 44, RemoveXAttrOp.class),
+  OP_SET_STORAGE_POLICY         ((byte) 45, SetStoragePolicyOp.class),
+  OP_TRUNCATE                   ((byte) 46, TruncateOp.class),
+  OP_APPEND                     ((byte) 47, AppendOp.class),
+  OP_SET_QUOTA_BY_STORAGETYPE   ((byte) 48, SetQuotaByStorageTypeOp.class),
 
   // Note that the current range of the valid OP code is 0~127
   OP_INVALID                    ((byte) -1);
 
   private final byte opCode;
+  private final Class<? extends FSEditLogOp> opClass;
 
   /**
    * Constructor
@@ -88,7 +93,12 @@ public enum FSEditLogOpCodes {
    * @param opCode byte value of constructed enum
    */
   FSEditLogOpCodes(byte opCode) {
+    this(opCode, null);
+  }
+
+  FSEditLogOpCodes(byte opCode, Class<? extends FSEditLogOp> opClass) {
     this.opCode = opCode;
+    this.opClass = opClass;
   }
 
   /**
@@ -100,6 +110,10 @@ public enum FSEditLogOpCodes {
     return opCode;
   }
 
+  public Class<? extends FSEditLogOp> getOpClass() {
+    return opClass;
+  }
+
   private static final FSEditLogOpCodes[] VALUES;
   
   static {

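
Carrying the op class on each enum constant lets edit log readers map a raw
opcode byte to an op instance directly, instead of maintaining a parallel
switch statement. A minimal sketch of how the new accessor could be consumed
(the reflective construction is illustrative; the actual OpInstanceCache
wiring in FSEditLogOp may differ):

  // Hypothetical lookup: resolve an opcode byte to a fresh op instance.
  FSEditLogOpCodes code = FSEditLogOpCodes.fromByte(opCodeByte);
  Class<? extends FSEditLogOp> opClass = code.getOpClass();
  if (opClass != null) {
    Constructor<? extends FSEditLogOp> ctor = opClass.getDeclaredConstructor();
    ctor.setAccessible(true); // op constructors are typically non-public
    FSEditLogOp op = ctor.newInstance();
  }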
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 084f82a..b637105 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -140,7 +140,7 @@ public class FSImage implements Closeable {
       storage.setRestoreFailedStorage(true);
     }
 
-    this.editLog = new FSEditLog(conf, storage, editsDirs);
+    this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
     archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
   }
  

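
Swapping direct construction for a factory lets the configuration choose the
async implementation without touching FSImage's callers. A plausible shape
for that factory, assuming FSEditLogAsync extends FSEditLog and a matching
*_DEFAULT key exists (both assumptions, not quoted from the patch):

  static FSEditLog newInstance(Configuration conf, NNStorage storage,
      List<URI> editsDirs) {
    boolean asyncEditLogging = conf.getBoolean(
        DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
        DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
    return asyncEditLogging
        ? new FSEditLogAsync(conf, storage, editsDirs)
        : new FSEditLog(conf, storage, editsDirs);
  }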
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index f376901..e8900ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -1263,7 +1263,6 @@ public class NameNode implements NameNodeStatusMXBean {
           newSharedEditLog.logEdit(op);
 
           if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
-            newSharedEditLog.logSync();
             newSharedEditLog.endCurrentLogSegment(false);
             LOG.debug("ending log segment because of END_LOG_SEGMENT op in "
                 + stream);

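
Dropping the explicit logSync() here is safe because endCurrentLogSegment()
performs its own sync of everything logged so far. A paraphrased sketch of
that method (from FSEditLog's general shape, not the verbatim source):

  public synchronized void endCurrentLogSegment(boolean writeEndTxn) {
    if (writeEndTxn) {
      logEdit(LogSegmentOp.getInstance(cache.get(),
          FSEditLogOpCodes.OP_END_LOG_SEGMENT));
    }
    // flushes all outstanding edits, including the end-segment op above
    logSyncAll();
    // ... then closes streams and finalizes the segment ...
  }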
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 54b5c6e..ce0b050 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -283,8 +283,8 @@ public class NameNodeMetrics {
     transactions.add(latency);
   }
 
-  public void incrTransactionsBatchedInSync() {
-    transactionsBatchedInSync.incr();
+  public void incrTransactionsBatchedInSync(long count) {
+    transactionsBatchedInSync.incr(count);
   }
 
   public void addSync(long elapsed) {

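
Widening the metric to take a count lets one fsync report every transaction
it covered in a single call, which matters for the async log where large
batches are the norm. An illustrative caller (variable names are assumptions):

  // after a single fsync() has flushed `syncedTxns` pending transactions:
  long batched = syncedTxns - 1; // everything except the triggering txn
  if (batched > 0) {
    metrics.incrTransactionsBatchedInSync(batched);
  }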
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index dc1853a..0b08996 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -267,6 +267,9 @@ public class DFSTestUtil {
   }
 
   public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) {
+    // Spies are shallow copies, so the async edit log must restart its
+    // sync thread to pick up the new (spied) copy.
+    newLog.restart();
     Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog);
     Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog);
   }

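
The restart() call exists because Mockito spies are shallow copies: without
it, the async edit log's background sync thread would keep draining the
original instance while tests queue edits on the spy. Typical call site,
sketched (spy() is org.mockito.Mockito.spy):

  FSEditLog spyLog = spy(fsn.getFSImage().getEditLog());
  // setEditLogForTesting() restarts the async sync thread so it
  // services the spied copy rather than the original
  DFSTestUtil.setEditLogForTesting(fsn, spyLog);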
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
index c79e0c2..9b42cac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
@@ -69,17 +69,21 @@ import org.junit.runners.Parameterized.Parameters;
 public class TestAuditLogs {
   static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log";
   final boolean useAsyncLog;
-  
+  final boolean useAsyncEdits;
+
   @Parameters
   public static Collection<Object[]> data() {
     Collection<Object[]> params = new ArrayList<Object[]>();
-    params.add(new Object[]{new Boolean(false)});
-    params.add(new Object[]{new Boolean(true)});
+    params.add(new Object[]{Boolean.FALSE, Boolean.FALSE});
+    params.add(new Object[]{Boolean.TRUE,  Boolean.FALSE});
+    params.add(new Object[]{Boolean.FALSE, Boolean.TRUE});
+    params.add(new Object[]{Boolean.TRUE,  Boolean.TRUE});
     return params;
   }
-  
-  public TestAuditLogs(boolean useAsyncLog) {
+
+  public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) {
     this.useAsyncLog = useAsyncLog;
+    this.useAsyncEdits = useAsyncEdits;
   }
 
   // Pattern for: 
@@ -116,6 +120,7 @@ public class TestAuditLogs {
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision);
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, useAsyncEdits);
     util = new DFSTestUtil.Builder().setName("TestAuditAllowed").
         setNumFiles(20).build();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();

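
The parameter list is now a full 2x2 matrix, so JUnit runs every test in the
class under all four combinations of async audit logging and async edit
logging. For readable run names, JUnit 4.11+ can label each expansion (an
optional refinement, not part of this patch):

  @Parameters(name = "asyncAuditLog={0}, asyncEdits={1}")
  public static Collection<Object[]> data() {
    // ... same four Object[] rows as above ...
  }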
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 14240e0..1eb377a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -88,6 +88,9 @@ import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.spi.LoggingEvent;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.Mockito;
 import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
@@ -98,12 +101,33 @@ import com.google.common.collect.Lists;
 /**
  * This class tests the creation and validation of a checkpoint.
  */
+@RunWith(Parameterized.class)
 public class TestEditLog {
-  
+
   static {
     GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
   }
 
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestEditLog(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
+  public static Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   /**
    * A garbage mkdir op which is used for testing
    * {@link EditLogFileInputStream#scanEditLog(File)}
@@ -225,11 +249,12 @@ public class TestEditLog {
    * @param storage Storage object used by namenode
    */
   private static FSEditLog getFSEditLog(NNStorage storage) throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = getConf();
     // Make sure the edits dirs are set in the provided configuration object.
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
         StringUtils.join(",", storage.getEditsDirectories()));
-    FSEditLog log = new FSEditLog(conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
+    FSEditLog log = FSEditLog.newInstance(
+        conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
     return log;
   }
 
@@ -252,7 +277,7 @@ public class TestEditLog {
    */
   @Test
   public void testPreTxidEditLogWithEdits() throws Exception {
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
 
     try {
@@ -282,7 +307,7 @@ public class TestEditLog {
   @Test
   public void testSimpleEditLog() throws IOException {
     // start a cluster 
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     try {
@@ -351,7 +376,7 @@ public class TestEditLog {
   private void testEditLog(int initialSize) throws IOException {
 
     // start a cluster 
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
 
@@ -482,8 +507,12 @@ public class TestEditLog {
 
   @Test
   public void testSyncBatching() throws Exception {
-    // start a cluster 
-    Configuration conf = new HdfsConfiguration();
+    if (useAsyncEditLog) {
+      // semantics are completely different since edits will be auto-synced
+      return;
+    }
+    // start a cluster
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     ExecutorService threadA = Executors.newSingleThreadExecutor();
@@ -546,7 +575,7 @@ public class TestEditLog {
   @Test
   public void testBatchedSyncWithClosedLogs() throws Exception {
     // start a cluster 
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     ExecutorService threadA = Executors.newSingleThreadExecutor();
@@ -586,7 +615,7 @@ public class TestEditLog {
   @Test
   public void testEditChecksum() throws Exception {
     // start a cluster 
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
@@ -658,7 +687,7 @@ public class TestEditLog {
    */
   private void testCrashRecovery(int numTransactions) throws Exception {
     MiniDFSCluster cluster = null;
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
         CHECKPOINT_ON_STARTUP_MIN_TXNS);
     
@@ -803,7 +832,7 @@ public class TestEditLog {
       boolean updateTransactionIdFile, boolean shouldSucceed)
       throws Exception {
     // start a cluster 
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(NUM_DATA_NODES).build();
@@ -1134,7 +1163,7 @@ public class TestEditLog {
   public static NNStorage setupEdits(List<URI> editUris, int numrolls,
       boolean closeOnFinish, AbortSpec... abortAtRolls) throws IOException {
     List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls));
-    NNStorage storage = new NNStorage(new Configuration(),
+    NNStorage storage = new NNStorage(getConf(),
                                       Collections.<URI>emptyList(),
                                       editUris);
     storage.format(new NamespaceInfo());
@@ -1296,7 +1325,7 @@ public class TestEditLog {
     EditLogFileOutputStream elfos = null;
     EditLogFileInputStream elfis = null;
     try {
-      elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
+      elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0);
       elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
       elfos.writeRaw(garbage, 0, garbage.length);
       elfos.setReadyToFlush();
@@ -1474,7 +1503,7 @@ public class TestEditLog {
   public void testManyEditLogSegments() throws IOException {
     final int NUM_EDIT_LOG_ROLLS = 1000;
     // start a cluster
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java
index f22ee2f..c60d79f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Random;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
@@ -30,18 +32,40 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NameNodeEditLogRoller;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Supplier;
 
+@RunWith(Parameterized.class)
 public class TestEditLogAutoroll {
+  static {
+    GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestEditLogAutoroll(Boolean async) {
+    useAsyncEditLog = async;
+  }
 
   private Configuration conf;
   private MiniDFSCluster cluster;
@@ -61,6 +85,8 @@ public class TestEditLogAutoroll {
     // Make it autoroll after 10 edits
     conf.setFloat(DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD, 0.5f);
     conf.setInt(DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS, 100);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
 
     int retryCount = 0;
     while (true) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
index 51dfc3e..28169bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
@@ -21,12 +21,13 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -43,13 +44,37 @@ import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.Mockito;
 
+@RunWith(Parameterized.class)
 public class TestEditLogJournalFailures {
 
   private int editsPerformed = 0;
   private MiniDFSCluster cluster;
   private FileSystem fs;
+  private boolean useAsyncEdits;
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{Boolean.FALSE});
+    params.add(new Object[]{Boolean.TRUE});
+    return params;
+  }
+
+  public TestEditLogJournalFailures(boolean useAsyncEdits) {
+    this.useAsyncEdits = useAsyncEdits;
+  }
+
+  private Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEdits);
+    return conf;
+  }
 
   /**
    * Create the mini cluster for testing and sub in a custom runtime so that
@@ -57,9 +82,9 @@ public class TestEditLogJournalFailures {
    */
   @Before
   public void setUpMiniCluster() throws IOException {
-    setUpMiniCluster(new HdfsConfiguration(), true);
+    setUpMiniCluster(getConf(), true);
   }
-  
+
   public void setUpMiniCluster(Configuration conf, boolean manageNameDfsDirs)
       throws IOException {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
@@ -153,7 +178,7 @@ public class TestEditLogJournalFailures {
     String[] editsDirs = cluster.getConfiguration(0).getTrimmedStrings(
         DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
     shutDownMiniCluster();
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, editsDirs[0]);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 0);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0);
@@ -193,7 +218,7 @@ public class TestEditLogJournalFailures {
       throws IOException {
     // Set up 4 name/edits dirs.
     shutDownMiniCluster();
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     String[] nameDirs = new String[4];
     for (int i = 0; i < nameDirs.length; i++) {
       File nameDir = new File(PathUtils.getTestDir(getClass()), "name-dir" + i);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index fcffbc3..195ce5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -26,14 +26,17 @@ import static org.mockito.Mockito.spy;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -46,10 +49,14 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -57,15 +64,27 @@ import org.mockito.stubbing.Answer;
  * This class tests various synchronization bugs in FSEditLog rolling
  * and namespace saving.
  */
+@RunWith(Parameterized.class)
 public class TestEditLogRace {
   static {
     GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
   }
 
-  private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ false });
+    params.add(new Object[]{ true });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
 
-  private static final String NAME_DIR =
-    MiniDFSCluster.getBaseDirectory() + "name1";
+  public TestEditLogRace(boolean useAsyncEditLog) {
+    TestEditLogRace.useAsyncEditLog = useAsyncEditLog;
+  }
+
+  private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
 
   // This test creates NUM_THREADS threads and each thread continuously writes
   // transactions
@@ -94,21 +113,29 @@ public class TestEditLogRace {
    * This value needs to be significantly longer than the average
    * time for an fsync() or enterSafeMode().
    */
-  private static final int BLOCK_TIME = 10;
-  
+  private static final int BLOCK_TIME = 4; // 4 seconds is fairly generous
+
   //
   // an object that does a bunch of transactions
   //
   static class Transactions implements Runnable {
     final NamenodeProtocols nn;
+    final MiniDFSCluster cluster;
+    FileSystem fs;
     short replication = 3;
     long blockSize = 64;
     volatile boolean stopped = false;
     volatile Thread thr;
     final AtomicReference<Throwable> caught;
 
-    Transactions(NamenodeProtocols ns, AtomicReference<Throwable> caught) {
-      nn = ns;
+    Transactions(MiniDFSCluster cluster, AtomicReference<Throwable> caught) {
+      this.cluster = cluster;
+      this.nn = cluster.getNameNodeRpc();
+      try {
+        this.fs = cluster.getFileSystem();
+      } catch (IOException e) {
+        caught.set(e);
+      }
       this.caught = caught;
     }
 
@@ -122,11 +149,23 @@ public class TestEditLogRace {
       while (!stopped) {
         try {
           String dirname = "/thr-" + thr.getId() + "-dir-" + i; 
-          nn.mkdirs(dirname, p, true);
-          nn.delete(dirname, true);
+          if (i % 2 == 0) {
+            Path dirnamePath = new Path(dirname);
+            fs.mkdirs(dirnamePath);
+            fs.delete(dirnamePath, true);
+          } else {
+            nn.mkdirs(dirname, p, true);
+            nn.delete(dirname, true);
+          }
         } catch (SafeModeException sme) {
           // This is OK - the tests will bring NN in and out of safemode
         } catch (Throwable e) {
+          // Also OK - over RPC a SafeModeException arrives wrapped in a
+          // RemoteException as the tests cycle the NN through safemode
+          if (e instanceof RemoteException &&
+              ((RemoteException)e).getClassName()
+              .contains("SafeModeException")) {
+            return;
+          }
           LOG.warn("Got error in transaction thread", e);
           caught.compareAndSet(null, e);
           break;
@@ -144,11 +183,11 @@ public class TestEditLogRace {
     }
   }
 
-  private void startTransactionWorkers(NamenodeProtocols namesystem,
+  private void startTransactionWorkers(MiniDFSCluster cluster,
                                        AtomicReference<Throwable> caughtErr) {
     // Create threads and make them run transactions concurrently.
     for (int i = 0; i < NUM_THREADS; i++) {
-      Transactions trans = new Transactions(namesystem, caughtErr);
+      Transactions trans = new Transactions(cluster, caughtErr);
       new Thread(trans, "TransactionThread-" + i).start();
       workers.add(trans);
     }
@@ -174,21 +213,21 @@ public class TestEditLogRace {
   @Test
   public void testEditLogRolling() throws Exception {
     // start a cluster 
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = null;
+    Configuration conf = getConf();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
     FileSystem fileSys = null;
 
 
     AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
     try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
       cluster.waitActive();
       fileSys = cluster.getFileSystem();
       final NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
       FSImage fsimage = cluster.getNamesystem().getFSImage();
       StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
 
-      startTransactionWorkers(nn, caughtErr);
+      startTransactionWorkers(cluster, caughtErr);
 
       long previousLogTxId = 1;
 
@@ -256,7 +295,7 @@ public class TestEditLogRace {
   @Test
   public void testSaveNamespace() throws Exception {
     // start a cluster 
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
 
@@ -266,12 +305,11 @@ public class TestEditLogRace {
       cluster.waitActive();
       fileSys = cluster.getFileSystem();
       final FSNamesystem namesystem = cluster.getNamesystem();
-      final NamenodeProtocols nn = cluster.getNameNodeRpc();
 
       FSImage fsimage = namesystem.getFSImage();
       FSEditLog editLog = fsimage.getEditLog();
 
-      startTransactionWorkers(nn, caughtErr);
+      startTransactionWorkers(cluster, caughtErr);
 
       for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
         try {
@@ -321,11 +359,13 @@ public class TestEditLogRace {
  
   private Configuration getConf() {
     Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
-    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
-    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); 
+    conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
     return conf;
   }
 
@@ -389,7 +429,7 @@ public class TestEditLogRace {
         @Override
         public Void answer(InvocationOnMock invocation) throws Throwable {
           LOG.info("Flush called");
-          if (Thread.currentThread() == doAnEditThread) {
+          if (useAsyncEditLog || Thread.currentThread() == doAnEditThread) {
             LOG.info("edit thread: Telling main thread we made it to flush section...");
             // Signal to main thread that the edit thread is in the racy section
             waitToEnterFlush.countDown();
@@ -457,62 +497,52 @@ public class TestEditLogRace {
 
     try {
       FSImage fsimage = namesystem.getFSImage();
-      FSEditLog editLog = spy(fsimage.getEditLog());
-      DFSTestUtil.setEditLogForTesting(namesystem, editLog);
+      final FSEditLog editLog = fsimage.getEditLog();
 
       final AtomicReference<Throwable> deferredException =
           new AtomicReference<Throwable>();
-      final CountDownLatch waitToEnterSync = new CountDownLatch(1);
-      
+      final CountDownLatch sleepingBeforeSync = new CountDownLatch(1);
+
       final Thread doAnEditThread = new Thread() {
         @Override
         public void run() {
           try {
-            LOG.info("Starting mkdirs");
-            namesystem.mkdirs("/test",
-                new PermissionStatus("test","test", new FsPermission((short)00755)),
-                true);
-            LOG.info("mkdirs complete");
+            LOG.info("Starting setOwner");
+            namesystem.writeLock();
+            try {
+              editLog.logSetOwner("/","test","test");
+            } finally {
+              namesystem.writeUnlock();
+            }
+            sleepingBeforeSync.countDown();
+            LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
+            Thread.sleep(BLOCK_TIME*1000);
+            editLog.logSync();
+            LOG.info("edit thread: logSync complete");
           } catch (Throwable ioe) {
             LOG.fatal("Got exception", ioe);
             deferredException.set(ioe);
-            waitToEnterSync.countDown();
-          }
-        }
-      };
-      
-      Answer<Void> blockingSync = new Answer<Void>() {
-        @Override
-        public Void answer(InvocationOnMock invocation) throws Throwable {
-          LOG.info("logSync called");
-          if (Thread.currentThread() == doAnEditThread) {
-            LOG.info("edit thread: Telling main thread we made it just before logSync...");
-            waitToEnterSync.countDown();
-            LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
-            Thread.sleep(BLOCK_TIME*1000);
-            LOG.info("Going through to logSync. This will allow the main thread to continue.");
+            sleepingBeforeSync.countDown();
           }
-          invocation.callRealMethod();
-          LOG.info("logSync complete");
-          return null;
         }
       };
-      doAnswer(blockingSync).when(editLog).logSync();
-      
+      doAnEditThread.setDaemon(true);
       doAnEditThread.start();
       LOG.info("Main thread: waiting to just before logSync...");
-      waitToEnterSync.await();
+      sleepingBeforeSync.await(200, TimeUnit.MILLISECONDS);
       assertNull(deferredException.get());
       LOG.info("Main thread: detected that logSync about to be called.");
       LOG.info("Trying to enter safe mode.");
-      LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits");
-      
+
       long st = Time.now();
       namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
       long et = Time.now();
-      LOG.info("Entered safe mode");
-      // Make sure we really waited for the flush to complete!
-      assertTrue(et - st > (BLOCK_TIME - 1)*1000);
+      LOG.info("Entered safe mode after "+(et-st)+"ms");
+
+      // Make sure we didn't wait for the thread that did a logEdit but
+      // not logSync.  Going into safemode does a logSyncAll that will flush
+      // its edit.
+      assertTrue(et - st < (BLOCK_TIME/2)*1000);
 
       // Once we're in safe mode, save namespace.
       namesystem.saveNamespace(0, 0);

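
The rewritten assertion inverts the old one: entering safe mode must no
longer wait out the sleeping edit thread, because setSafeMode(SAFEMODE_ENTER)
syncs all logged transactions itself. The invariant the test now pins down,
restated over the calls it uses:

  namesystem.writeLock();
  try {
    editLog.logSetOwner("/", "test", "test"); // logged, not yet synced
  } finally {
    namesystem.writeUnlock();
  }
  // entering safemode triggers logSyncAll(), which flushes the pending
  // txn, so this returns quickly instead of blocking on the edit thread
  namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);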
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index b0e5704..fe29e1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -31,6 +31,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
@@ -59,28 +61,51 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 
+@RunWith(Parameterized.class)
 public class TestFSEditLogLoader {
-  
+  @Parameters
+  public static Collection<Object[]> data() {
+    Collection<Object[]> params = new ArrayList<Object[]>();
+    params.add(new Object[]{ Boolean.FALSE });
+    params.add(new Object[]{ Boolean.TRUE });
+    return params;
+  }
+
+  private static boolean useAsyncEditLog;
+  public TestFSEditLogLoader(Boolean async) {
+    useAsyncEditLog = async;
+  }
+
+  private static Configuration getConf() {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+        useAsyncEditLog);
+    return conf;
+  }
+
   static {
     GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL);
     GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.ALL);
   }
-  
+
   private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
 
   private static final int NUM_DATA_NODES = 0;
 
   private static final ErasureCodingPolicy testECPolicy
       = StripedFileTestUtil.TEST_EC_POLICY;
-  
+
   @Test
   public void testDisplayRecentEditLogOpCodes() throws IOException {
-    // start a cluster 
-    Configuration conf = new HdfsConfiguration();
+    // start a cluster
+    Configuration conf = getConf();
     MiniDFSCluster cluster = null;
     FileSystem fileSys = null;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
@@ -130,7 +155,7 @@ public class TestFSEditLogLoader {
   @Test
   public void testReplicationAdjusted() throws Exception {
     // start a cluster 
-    Configuration conf = new HdfsConfiguration();
+    Configuration conf = getConf();
     // Replicate and heartbeat fast to shave a few seconds off test
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);