You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/03/01 00:35:30 UTC
[1/2] hadoop git commit: HDFS-7964. Add support for async edit
logging. Contributed by Daryn Sharp.
Repository: hadoop
Updated Branches:
refs/heads/trunk 0fa54d45b -> 215171683
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
index 0265a4d..87e2523 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
@@ -27,6 +27,8 @@ import static org.mockito.Mockito.spy;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@@ -50,13 +52,38 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Sets;
/**
* This tests data recovery mode for the NameNode.
*/
+
+@RunWith(Parameterized.class)
public class TestNameNodeRecovery {
+ @Parameters
+ public static Collection<Object[]> data() {
+ Collection<Object[]> params = new ArrayList<Object[]>();
+ params.add(new Object[]{ Boolean.FALSE });
+ params.add(new Object[]{ Boolean.TRUE });
+ return params;
+ }
+
+ private static boolean useAsyncEditLog;
+ public TestNameNodeRecovery(Boolean async) {
+ useAsyncEditLog = async;
+ }
+
+ private static Configuration getConf() {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+ useAsyncEditLog);
+ return conf;
+ }
+
private static final Log LOG = LogFactory.getLog(TestNameNodeRecovery.class);
private static final StartupOption recoverStartOpt = StartupOption.RECOVER;
private static final File TEST_DIR = PathUtils.getTestDir(TestNameNodeRecovery.class);
@@ -73,7 +100,7 @@ public class TestNameNodeRecovery {
EditLogFileOutputStream elfos = null;
EditLogFileInputStream elfis = null;
try {
- elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
+ elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0);
elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
elts.addTransactionsToLog(elfos, cache);
@@ -525,7 +552,7 @@ public class TestNameNodeRecovery {
final boolean needRecovery = corruptor.needRecovery(finalize);
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
setupRecoveryTestConf(conf);
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index 5a104ad..30db429 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -40,11 +43,31 @@ import org.apache.hadoop.net.ServerSocketUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.base.Supplier;
+@RunWith(Parameterized.class)
public class TestEditLogTailer {
-
+ static {
+ GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ Collection<Object[]> params = new ArrayList<Object[]>();
+ params.add(new Object[]{ Boolean.FALSE });
+ params.add(new Object[]{ Boolean.TRUE });
+ return params;
+ }
+
+ private static boolean useAsyncEditLog;
+ public TestEditLogTailer(Boolean async) {
+ useAsyncEditLog = async;
+ }
+
private static final String DIR_PREFIX = "/dir";
private static final int DIRS_TO_MAKE = 20;
static final long SLEEP_TIME = 1000;
@@ -52,13 +75,21 @@ public class TestEditLogTailer {
static {
GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
GenericTestUtils.setLogLevel(EditLogTailer.LOG, Level.ALL);
}
-
+
+ private static Configuration getConf() {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+ useAsyncEditLog);
+ return conf;
+ }
+
@Test
public void testTailer() throws IOException, InterruptedException,
ServiceFailedException {
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
@@ -122,7 +153,7 @@ public class TestEditLogTailer {
private static void testStandbyTriggersLogRolls(int activeIndex)
throws Exception {
- Configuration conf = new Configuration();
+ Configuration conf = getConf();
// Roll every 1s
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
[2/2] hadoop git commit: HDFS-7964. Add support for async edit
logging. Contributed by Daryn Sharp.
Posted by ji...@apache.org.
HDFS-7964. Add support for async edit logging. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21517168
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21517168
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21517168
Branch: refs/heads/trunk
Commit: 2151716832ad14932dd65b1a4e47e64d8d6cd767
Parents: 0fa54d4
Author: Jing Zhao <ji...@apache.org>
Authored: Mon Feb 29 15:34:43 2016 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Feb 29 15:34:43 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../bkjournal/TestBookKeeperAsHASharedDir.java | 46 ++-
.../src/test/resources/log4j.properties | 2 +-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 +-
.../hadoop/hdfs/server/namenode/BackupNode.java | 4 +
.../hadoop/hdfs/server/namenode/FSEditLog.java | 109 ++++---
.../hdfs/server/namenode/FSEditLogAsync.java | 322 +++++++++++++++++++
.../hdfs/server/namenode/FSEditLogOp.java | 213 ++++++------
.../hdfs/server/namenode/FSEditLogOpCodes.java | 108 ++++---
.../hadoop/hdfs/server/namenode/FSImage.java | 2 +-
.../hadoop/hdfs/server/namenode/NameNode.java | 1 -
.../namenode/metrics/NameNodeMetrics.java | 4 +-
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 3 +
.../hdfs/server/namenode/TestAuditLogs.java | 15 +-
.../hdfs/server/namenode/TestEditLog.java | 59 +++-
.../server/namenode/TestEditLogAutoroll.java | 26 ++
.../namenode/TestEditLogJournalFailures.java | 35 +-
.../hdfs/server/namenode/TestEditLogRace.java | 144 +++++----
.../server/namenode/TestFSEditLogLoader.java | 37 ++-
.../server/namenode/TestNameNodeRecovery.java | 31 +-
.../server/namenode/ha/TestEditLogTailer.java | 39 ++-
21 files changed, 904 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3d57efa..c3ea5ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1037,6 +1037,8 @@ Release 2.9.0 - UNRELEASED
HDFS-9754. Avoid unnecessary getBlockCollection calls in BlockManager.
(jing9)
+ HDFS-7964. Add support for async edit logging. (Daryn Sharp)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
index 5611bb8..ff8c00d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
@@ -24,6 +24,9 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.AfterClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
@@ -56,11 +59,14 @@ import org.apache.commons.logging.LogFactory;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
/**
* Integration test to ensure that the BookKeeper JournalManager
* works for HDFS Namenode HA
*/
+@RunWith(Parameterized.class)
public class TestBookKeeperAsHASharedDir {
static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class);
@@ -69,6 +75,27 @@ public class TestBookKeeperAsHASharedDir {
private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager";
+ @Parameters
+ public static Collection<Object[]> data() {
+ Collection<Object[]> params = new ArrayList<Object[]>();
+ params.add(new Object[]{ Boolean.FALSE });
+ params.add(new Object[]{ Boolean.TRUE });
+ return params;
+ }
+
+ private static boolean useAsyncEditLog;
+ public TestBookKeeperAsHASharedDir(Boolean async) {
+ useAsyncEditLog = async;
+ }
+
+ private static Configuration getConf() {
+ Configuration conf = new Configuration();
+ conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+ useAsyncEditLog);
+ return conf;
+ }
+
@BeforeClass
public static void setupBookkeeper() throws Exception {
bkutil = new BKJMUtil(numBookies);
@@ -92,8 +119,7 @@ public class TestBookKeeperAsHASharedDir {
public void testFailoverWithBK() throws Exception {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ Configuration conf = getConf();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailover").toString());
BKJMUtil.addJournalManagerDefinition(conf);
@@ -144,8 +170,7 @@ public class TestBookKeeperAsHASharedDir {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ Configuration conf = getConf();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailoverWithFail").toString());
conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
@@ -221,8 +246,7 @@ public class TestBookKeeperAsHASharedDir {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ Configuration conf = getConf();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
BKJMUtil.createJournalURI("/hotfailoverMultiple").toString());
BKJMUtil.addJournalManagerDefinition(conf);
@@ -245,7 +269,9 @@ public class TestBookKeeperAsHASharedDir {
fs = cluster.getFileSystem(0); // get the older active server.
try {
- fs.delete(p1, true);
+ System.out.println("DMS: > *************");
+ boolean foo = fs.delete(p1, true);
+ System.out.println("DMS: < ************* "+foo);
fail("Log update on older active should cause it to exit");
} catch (RemoteException re) {
assertTrue(re.getClassName().contains("ExitException"));
@@ -267,9 +293,8 @@ public class TestBookKeeperAsHASharedDir {
public void testInitializeBKSharedEdits() throws Exception {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
+ Configuration conf = getConf();
HAUtil.setAllowStandbyReads(conf, true);
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
@@ -358,8 +383,7 @@ public class TestBookKeeperAsHASharedDir {
public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception {
MiniDFSCluster cluster = null;
try {
- Configuration conf = new Configuration();
- conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+ Configuration conf = getConf();
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
.createJournalURI("/correctEditLogSelection").toString());
BKJMUtil.addJournalManagerDefinition(conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
index 93c22f7..52aac43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties
@@ -26,7 +26,7 @@
# Format is "<default threshold> (, <appender>)+
# DEFAULT: console appender only
-log4j.rootLogger=OFF, CONSOLE
+log4j.rootLogger=DEBUG, CONSOLE
# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index bfb6203..9c06e29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -291,7 +291,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
-
+
+ public static final String DFS_NAMENODE_EDITS_ASYNC_LOGGING =
+ "dfs.namenode.edits.asynclogging";
+ public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = false;
+
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index 36053f7..d36e0b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -142,6 +142,10 @@ public class BackupNode extends NameNode {
@Override // NameNode
protected void initialize(Configuration conf) throws IOException {
+ // async edit logs are incompatible with backup node due to race
+ // conditions resulting from laxer synchronization
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, false);
+
// Trash is disabled in BackupNameNode,
// but should be turned back on if it ever becomes active.
conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index c8986dc..809d9e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -78,7 +78,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeFinalizeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeStartOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
@@ -115,7 +117,7 @@ import com.google.common.collect.Lists;
@InterfaceStability.Evolving
public class FSEditLog implements LogsPurgeable {
- static final Log LOG = LogFactory.getLog(FSEditLog.class);
+ public static final Log LOG = LogFactory.getLog(FSEditLog.class);
/**
* State machine for edit log.
@@ -178,17 +180,11 @@ public class FSEditLog implements LogsPurgeable {
private final NNStorage storage;
private final Configuration conf;
-
+
private final List<URI> editsDirs;
- private final ThreadLocal<OpInstanceCache> cache =
- new ThreadLocal<OpInstanceCache>() {
- @Override
- protected OpInstanceCache initialValue() {
- return new OpInstanceCache();
- }
- };
-
+ protected final OpInstanceCache cache = new OpInstanceCache();
+
/**
* The edit directories that are shared between primary and secondary.
*/
@@ -217,6 +213,17 @@ public class FSEditLog implements LogsPurgeable {
}
};
+ static FSEditLog newInstance(Configuration conf, NNStorage storage,
+ List<URI> editsDirs) {
+ boolean asyncEditLogging = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+ DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT);
+ LOG.info("Edit logging is async:" + asyncEditLogging);
+ return asyncEditLogging
+ ? new FSEditLogAsync(conf, storage, editsDirs)
+ : new FSEditLog(conf, storage, editsDirs);
+ }
+
/**
* Constructor for FSEditLog. Underlying journals are constructed, but
* no streams are opened until open() is called.
@@ -423,33 +430,35 @@ public class FSEditLog implements LogsPurgeable {
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();
-
- long start = beginTransaction();
- op.setTransactionId(txid);
-
- try {
- editLogStream.write(op);
- } catch (IOException ex) {
- // All journals failed, it is handled in logSync.
- } finally {
- op.reset();
- }
- endTransaction(start);
-
// check if it is time to schedule an automatic sync
- needsSync = shouldForceSync();
+ needsSync = doEditTransaction(op);
if (needsSync) {
isAutoSyncScheduled = true;
}
}
-
+
// Sync the log if an automatic sync is required.
if (needsSync) {
logSync();
}
}
+ synchronized boolean doEditTransaction(final FSEditLogOp op) {
+ long start = beginTransaction();
+ op.setTransactionId(txid);
+
+ try {
+ editLogStream.write(op);
+ } catch (IOException ex) {
+ // All journals failed, it is handled in logSync.
+ } finally {
+ op.reset();
+ }
+ endTransaction(start);
+ return shouldForceSync();
+ }
+
/**
* Wait if an automatic sync is scheduled
*/
@@ -544,15 +553,10 @@ public class FSEditLog implements LogsPurgeable {
* else more operations can start writing while this is in progress.
*/
void logSyncAll() {
- // Record the most recent transaction ID as our own id
- synchronized (this) {
- TransactionId id = myTransactionId.get();
- id.txid = txid;
- }
- // Then make sure we're synced up to this point
- logSync();
+ // Make sure we're synced up to the most recent transaction ID.
+ logSync(getLastWrittenTxId());
}
-
+
/**
* Sync all modifications done by this thread.
*
@@ -582,12 +586,14 @@ public class FSEditLog implements LogsPurgeable {
* waitForSyncToFinish() before assuming they are running alone.
*/
public void logSync() {
- long syncStart = 0;
+ // Fetch the transactionId of this thread.
+ logSync(myTransactionId.get().txid);
+ }
- // Fetch the transactionId of this thread.
- long mytxid = myTransactionId.get().txid;
-
+ protected void logSync(long mytxid) {
+ long syncStart = 0;
boolean sync = false;
+ long editsBatchedInSync = 0;
try {
EditLogOutputStream logStream = null;
synchronized (this) {
@@ -606,19 +612,17 @@ public class FSEditLog implements LogsPurgeable {
// If this transaction was already flushed, then nothing to do
//
if (mytxid <= synctxid) {
- numTransactionsBatchedInSync++;
- if (metrics != null) {
- // Metrics is non-null only when used inside name node
- metrics.incrTransactionsBatchedInSync();
- }
return;
}
-
- // now, this thread will do the sync
+
+ // now, this thread will do the sync. track if other edits were
+ // included in the sync - ie. batched. if this is the only edit
+ // synced then the batched count is 0
+ editsBatchedInSync = txid - synctxid - 1;
syncStart = txid;
isSyncRunning = true;
sync = true;
-
+
// swap buffers
try {
if (journalSet.isEmpty()) {
@@ -667,6 +671,8 @@ public class FSEditLog implements LogsPurgeable {
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
+ metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
+ numTransactionsBatchedInSync += editsBatchedInSync;
}
} finally {
@@ -1138,13 +1144,13 @@ public class FSEditLog implements LogsPurgeable {
}
void logStartRollingUpgrade(long startTime) {
- RollingUpgradeOp op = RollingUpgradeOp.getStartInstance(cache.get());
+ RollingUpgradeStartOp op = RollingUpgradeStartOp.getInstance(cache.get());
op.setTime(startTime);
logEdit(op);
}
void logFinalizeRollingUpgrade(long finalizeTime) {
- RollingUpgradeOp op = RollingUpgradeOp.getFinalizeInstance(cache.get());
+ RollingUpgradeOp op = RollingUpgradeFinalizeOp.getInstance(cache.get());
op.setTime(finalizeTime);
logEdit(op);
}
@@ -1313,8 +1319,9 @@ public class FSEditLog implements LogsPurgeable {
if (writeEndTxn) {
logEdit(LogSegmentOp.getInstance(cache.get(),
FSEditLogOpCodes.OP_END_LOG_SEGMENT));
- logSync();
}
+ // always sync to ensure all edits are flushed.
+ logSyncAll();
printStatistics(true);
@@ -1701,6 +1708,12 @@ public class FSEditLog implements LogsPurgeable {
}
}
+ @VisibleForTesting
+ // needed by async impl to restart thread when edit log is replaced by a
+ // spy because a spy is a shallow copy
+ public void restart() {
+ }
+
/**
* Return total number of syncs happened on this edit log.
* @return long - count
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
new file mode 100644
index 0000000..c14a310
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.util.ExitUtil;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+class FSEditLogAsync extends FSEditLog implements Runnable {
+ static final Log LOG = LogFactory.getLog(FSEditLog.class);
+
+ // use separate mutex to avoid possible deadlock when stopping the thread.
+ private final Object syncThreadLock = new Object();
+ private Thread syncThread;
+ private static ThreadLocal<Edit> threadEdit = new ThreadLocal<Edit>();
+
+ // requires concurrent access from caller threads and syncing thread.
+ private final BlockingQueue<Edit> editPendingQ =
+ new ArrayBlockingQueue<Edit>(4096);
+
+ // only accessed by syncing thread so no synchronization required.
+ // queue is unbounded because it's effectively limited by the size
+ // of the edit log buffer - ie. a sync will eventually be forced.
+ private final Deque<Edit> syncWaitQ = new ArrayDeque<Edit>();
+
+ FSEditLogAsync(Configuration conf, NNStorage storage, List<URI> editsDirs) {
+ super(conf, storage, editsDirs);
+ // op instances cannot be shared due to queuing for background thread.
+ cache.disableCache();
+ }
+
+ private boolean isSyncThreadAlive() {
+ synchronized(syncThreadLock) {
+ return syncThread != null && syncThread.isAlive();
+ }
+ }
+
+ private void startSyncThread() {
+ synchronized(syncThreadLock) {
+ if (!isSyncThreadAlive()) {
+ syncThread = new Thread(this, this.getClass().getSimpleName());
+ syncThread.start();
+ }
+ }
+ }
+
+ private void stopSyncThread() {
+ synchronized(syncThreadLock) {
+ if (syncThread != null) {
+ try {
+ syncThread.interrupt();
+ syncThread.join();
+ } catch (InterruptedException e) {
+ // we're quitting anyway.
+ } finally {
+ syncThread = null;
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ @Override
+ public void restart() {
+ stopSyncThread();
+ startSyncThread();
+ }
+
+ @Override
+ void openForWrite(int layoutVersion) throws IOException {
+ try {
+ startSyncThread();
+ super.openForWrite(layoutVersion);
+ } catch (IOException ioe) {
+ stopSyncThread();
+ throw ioe;
+ }
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ stopSyncThread();
+ }
+
+ @Override
+ void logEdit(final FSEditLogOp op) {
+ Edit edit = getEditInstance(op);
+ threadEdit.set(edit);
+ enqueueEdit(edit);
+ }
+
+ @Override
+ public void logSync() {
+ Edit edit = threadEdit.get();
+ if (edit != null) {
+ // do NOT remove to avoid expunge & rehash penalties.
+ threadEdit.set(null);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("logSync " + edit);
+ }
+ edit.logSyncWait();
+ }
+ }
+
+ @Override
+ public void logSyncAll() {
+ // doesn't actually log anything, just ensures that the queues are
+ // drained when it returns.
+ Edit edit = new SyncEdit(this, null){
+ @Override
+ public boolean logEdit() {
+ return true;
+ }
+ };
+ enqueueEdit(edit);
+ edit.logSyncWait();
+ }
+
+ private void enqueueEdit(Edit edit) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("logEdit " + edit);
+ }
+ try {
+ if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) {
+ Preconditions.checkState(
+ isSyncThreadAlive(), "sync thread is not alive");
+ editPendingQ.put(edit);
+ }
+ } catch (Throwable t) {
+ // should never happen! failure to enqueue an edit is fatal
+ terminate(t);
+ }
+ }
+
+ private Edit dequeueEdit() throws InterruptedException {
+ // only block for next edit if no pending syncs.
+ return syncWaitQ.isEmpty() ? editPendingQ.take() : editPendingQ.poll();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ boolean doSync;
+ Edit edit = dequeueEdit();
+ if (edit != null) {
+ // sync if requested by edit log.
+ doSync = edit.logEdit();
+ syncWaitQ.add(edit);
+ } else {
+ // sync when editq runs dry, but have edits pending a sync.
+ doSync = !syncWaitQ.isEmpty();
+ }
+ if (doSync) {
+ // normally edit log exceptions cause the NN to terminate, but tests
+ // relying on ExitUtil.terminate need to see the exception.
+ RuntimeException syncEx = null;
+ try {
+ logSync(getLastWrittenTxId());
+ } catch (RuntimeException ex) {
+ syncEx = ex;
+ }
+ while ((edit = syncWaitQ.poll()) != null) {
+ edit.logSyncNotify(syncEx);
+ }
+ }
+ }
+ } catch (InterruptedException ie) {
+ LOG.info(Thread.currentThread().getName() + " was interrupted, exiting");
+ } catch (Throwable t) {
+ terminate(t);
+ }
+ }
+
+ private void terminate(Throwable t) {
+ String message = "Exception while edit logging: "+t.getMessage();
+ LOG.fatal(message, t);
+ ExitUtil.terminate(1, message);
+ }
+
+ private Edit getEditInstance(FSEditLogOp op) {
+ final Edit edit;
+ final Server.Call rpcCall = Server.getCurCall().get();
+ // only rpc calls not explicitly sync'ed on the log will be async.
+ if (rpcCall != null && !Thread.holdsLock(this)) {
+ edit = new RpcEdit(this, op, rpcCall);
+ } else {
+ edit = new SyncEdit(this, op);
+ }
+ return edit;
+ }
+
+ private abstract static class Edit {
+ final FSEditLog log;
+ final FSEditLogOp op;
+
+ Edit(FSEditLog log, FSEditLogOp op) {
+ this.log = log;
+ this.op = op;
+ }
+
+ // return whether edit log wants to sync.
+ boolean logEdit() {
+ return log.doEditTransaction(op);
+ }
+
+ // wait for background thread to finish syncing.
+ abstract void logSyncWait();
+ // wake up the thread in logSyncWait.
+ abstract void logSyncNotify(RuntimeException ex);
+ }
+
+ // the calling thread is synchronously waiting for the edit to complete.
+ private static class SyncEdit extends Edit {
+ private final Object lock;
+ private boolean done = false;
+ private RuntimeException syncEx;
+
+ SyncEdit(FSEditLog log, FSEditLogOp op) {
+ super(log, op);
+ // if the log is already sync'ed (ex. log rolling), must wait on it to
+ // avoid deadlock with sync thread. the fsn lock protects against
+ // logging during a roll. else lock on this object to avoid sync
+ // contention on edit log.
+ lock = Thread.holdsLock(log) ? log : this;
+ }
+
+ @Override
+ public void logSyncWait() {
+ synchronized(lock) {
+ while (!done) {
+ try {
+ lock.wait(10);
+ } catch (InterruptedException e) {}
+ }
+ // only needed by tests that rely on ExitUtil.terminate() since
+ // normally exceptions terminate the NN.
+ if (syncEx != null) {
+ syncEx.fillInStackTrace();
+ throw syncEx;
+ }
+ }
+ }
+
+ @Override
+ public void logSyncNotify(RuntimeException ex) {
+ synchronized(lock) {
+ done = true;
+ syncEx = ex;
+ lock.notifyAll();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "["+getClass().getSimpleName()+" op:"+op+"]";
+ }
+ }
+
+ // the calling rpc thread will return immediately from logSync but the
+ // rpc response will not be sent until the edit is durable.
+ private static class RpcEdit extends Edit {
+ private final Server.Call call;
+
+ RpcEdit(FSEditLog log, FSEditLogOp op, Server.Call call) {
+ super(log, op);
+ this.call = call;
+ call.postponeResponse();
+ }
+
+ @Override
+ public void logSyncWait() {
+ // logSync is a no-op to immediately free up rpc handlers. the
+ // response is sent when the sync thread calls syncNotify.
+ }
+
+ @Override
+ public void logSyncNotify(RuntimeException syncEx) {
+ try {
+ if (syncEx == null) {
+ call.sendResponse();
+ } else {
+ call.abortResponse(syncEx);
+ }
+ } catch (Exception e) {} // don't care if not sent.
+ }
+
+ @Override
+ public String toString() {
+ return "["+getClass().getSimpleName()+" op:"+op+" call:"+call+"]";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index a8389f0..c4e1a78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -147,6 +147,55 @@ public abstract class FSEditLogOp {
byte[] rpcClientId;
int rpcCallId;
+ public static class OpInstanceCache {
+ private static ThreadLocal<OpInstanceCacheMap> cache =
+ new ThreadLocal<OpInstanceCacheMap>() {
+ @Override
+ protected OpInstanceCacheMap initialValue() {
+ return new OpInstanceCacheMap();
+ }
+ };
+
+ @SuppressWarnings("serial")
+ static final class OpInstanceCacheMap extends
+ EnumMap<FSEditLogOpCodes, FSEditLogOp> {
+ OpInstanceCacheMap() {
+ super(FSEditLogOpCodes.class);
+ for (FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
+ put(opCode, newInstance(opCode));
+ }
+ }
+ }
+
+ private boolean useCache = true;
+
+ void disableCache() {
+ useCache = false;
+ }
+
+ public OpInstanceCache get() {
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends FSEditLogOp> T get(FSEditLogOpCodes opCode) {
+ return useCache ? (T)cache.get().get(opCode) : (T)newInstance(opCode);
+ }
+
+ private static FSEditLogOp newInstance(FSEditLogOpCodes opCode) {
+ FSEditLogOp instance = null;
+ Class<? extends FSEditLogOp> clazz = opCode.getOpClass();
+ if (clazz != null) {
+ try {
+ instance = clazz.newInstance();
+ } catch (Exception ex) {
+ throw new RuntimeException("Failed to instantiate "+opCode, ex);
+ }
+ }
+ return instance;
+ }
+ }
+
final void reset() {
txid = HdfsServerConstants.INVALID_TXID;
rpcClientId = RpcConstants.DUMMY_CLIENT_ID;
@@ -156,70 +205,6 @@ public abstract class FSEditLogOp {
abstract void resetSubFields();
- final public static class OpInstanceCache {
- private final EnumMap<FSEditLogOpCodes, FSEditLogOp> inst =
- new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
-
- public OpInstanceCache() {
- inst.put(OP_ADD, new AddOp());
- inst.put(OP_CLOSE, new CloseOp());
- inst.put(OP_SET_REPLICATION, new SetReplicationOp());
- inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
- inst.put(OP_RENAME_OLD, new RenameOldOp());
- inst.put(OP_DELETE, new DeleteOp());
- inst.put(OP_MKDIR, new MkdirOp());
- inst.put(OP_SET_GENSTAMP_V1, new SetGenstampV1Op());
- inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
- inst.put(OP_SET_OWNER, new SetOwnerOp());
- inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
- inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
- inst.put(OP_SET_QUOTA, new SetQuotaOp());
- inst.put(OP_TIMES, new TimesOp());
- inst.put(OP_SYMLINK, new SymlinkOp());
- inst.put(OP_RENAME, new RenameOp());
- inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
- inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
- inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
- inst.put(OP_CANCEL_DELEGATION_TOKEN, new CancelDelegationTokenOp());
- inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
- inst.put(OP_START_LOG_SEGMENT, new LogSegmentOp(OP_START_LOG_SEGMENT));
- inst.put(OP_END_LOG_SEGMENT, new LogSegmentOp(OP_END_LOG_SEGMENT));
- inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
- inst.put(OP_TRUNCATE, new TruncateOp());
-
- inst.put(OP_ALLOW_SNAPSHOT, new AllowSnapshotOp());
- inst.put(OP_DISALLOW_SNAPSHOT, new DisallowSnapshotOp());
- inst.put(OP_CREATE_SNAPSHOT, new CreateSnapshotOp());
- inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp());
- inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
- inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
- inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
-
- inst.put(OP_ADD_CACHE_DIRECTIVE, new AddCacheDirectiveInfoOp());
- inst.put(OP_MODIFY_CACHE_DIRECTIVE, new ModifyCacheDirectiveInfoOp());
- inst.put(OP_REMOVE_CACHE_DIRECTIVE, new RemoveCacheDirectiveInfoOp());
- inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
- inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
- inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
-
- inst.put(OP_ADD_BLOCK, new AddBlockOp());
- inst.put(OP_SET_ACL, new SetAclOp());
- inst.put(OP_ROLLING_UPGRADE_START, new RollingUpgradeOp(
- OP_ROLLING_UPGRADE_START, "start"));
- inst.put(OP_ROLLING_UPGRADE_FINALIZE, new RollingUpgradeOp(
- OP_ROLLING_UPGRADE_FINALIZE, "finalize"));
- inst.put(OP_SET_XATTR, new SetXAttrOp());
- inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
- inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
- inst.put(OP_APPEND, new AppendOp());
- inst.put(OP_SET_QUOTA_BY_STORAGETYPE, new SetQuotaByStorageTypeOp());
- }
-
- public FSEditLogOp get(FSEditLogOpCodes opcode) {
- return inst.get(opcode);
- }
- }
-
private static ImmutableMap<String, FsAction> fsActionMap() {
ImmutableMap.Builder<String, FsAction> b = ImmutableMap.builder();
for (FsAction v : FsAction.values())
@@ -774,7 +759,7 @@ public abstract class FSEditLogOp {
* {@link ClientProtocol#append}
*/
static class AddOp extends AddCloseOp {
- private AddOp() {
+ AddOp() {
super(OP_ADD);
}
@@ -802,7 +787,7 @@ public abstract class FSEditLogOp {
* finally log an AddOp.
*/
static class CloseOp extends AddCloseOp {
- private CloseOp() {
+ CloseOp() {
super(OP_CLOSE);
}
@@ -830,7 +815,7 @@ public abstract class FSEditLogOp {
String clientMachine;
boolean newBlock;
- private AppendOp() {
+ AppendOp() {
super(OP_APPEND);
}
@@ -920,7 +905,7 @@ public abstract class FSEditLogOp {
private Block penultimateBlock;
private Block lastBlock;
- private AddBlockOp() {
+ AddBlockOp() {
super(OP_ADD_BLOCK);
}
@@ -1032,7 +1017,7 @@ public abstract class FSEditLogOp {
String path;
Block[] blocks;
- private UpdateBlocksOp() {
+ UpdateBlocksOp() {
super(OP_UPDATE_BLOCKS);
}
@@ -1125,7 +1110,7 @@ public abstract class FSEditLogOp {
String path;
short replication;
- private SetReplicationOp() {
+ SetReplicationOp() {
super(OP_SET_REPLICATION);
}
@@ -1204,7 +1189,7 @@ public abstract class FSEditLogOp {
long timestamp;
final static public int MAX_CONCAT_SRC = 1024 * 1024;
- private ConcatDeleteOp() {
+ ConcatDeleteOp() {
super(OP_CONCAT_DELETE);
}
@@ -1362,7 +1347,7 @@ public abstract class FSEditLogOp {
String dst;
long timestamp;
- private RenameOldOp() {
+ RenameOldOp() {
super(OP_RENAME_OLD);
}
@@ -1474,7 +1459,7 @@ public abstract class FSEditLogOp {
String path;
long timestamp;
- private DeleteOp() {
+ DeleteOp() {
super(OP_DELETE);
}
@@ -1575,7 +1560,7 @@ public abstract class FSEditLogOp {
List<AclEntry> aclEntries;
List<XAttr> xAttrs;
- private MkdirOp() {
+ MkdirOp() {
super(OP_MKDIR);
}
@@ -1748,7 +1733,7 @@ public abstract class FSEditLogOp {
static class SetGenstampV1Op extends FSEditLogOp {
long genStampV1;
- private SetGenstampV1Op() {
+ SetGenstampV1Op() {
super(OP_SET_GENSTAMP_V1);
}
@@ -1806,7 +1791,7 @@ public abstract class FSEditLogOp {
static class SetGenstampV2Op extends FSEditLogOp {
long genStampV2;
- private SetGenstampV2Op() {
+ SetGenstampV2Op() {
super(OP_SET_GENSTAMP_V2);
}
@@ -1864,7 +1849,7 @@ public abstract class FSEditLogOp {
static class AllocateBlockIdOp extends FSEditLogOp {
long blockId;
- private AllocateBlockIdOp() {
+ AllocateBlockIdOp() {
super(OP_ALLOCATE_BLOCK_ID);
}
@@ -1923,7 +1908,7 @@ public abstract class FSEditLogOp {
String src;
FsPermission permissions;
- private SetPermissionsOp() {
+ SetPermissionsOp() {
super(OP_SET_PERMISSIONS);
}
@@ -1996,7 +1981,7 @@ public abstract class FSEditLogOp {
String username;
String groupname;
- private SetOwnerOp() {
+ SetOwnerOp() {
super(OP_SET_OWNER);
}
@@ -2083,7 +2068,7 @@ public abstract class FSEditLogOp {
String src;
long nsQuota;
- private SetNSQuotaOp() {
+ SetNSQuotaOp() {
super(OP_SET_NS_QUOTA);
}
@@ -2141,7 +2126,7 @@ public abstract class FSEditLogOp {
static class ClearNSQuotaOp extends FSEditLogOp {
String src;
- private ClearNSQuotaOp() {
+ ClearNSQuotaOp() {
super(OP_CLEAR_NS_QUOTA);
}
@@ -2195,7 +2180,7 @@ public abstract class FSEditLogOp {
long nsQuota;
long dsQuota;
- private SetQuotaOp() {
+ SetQuotaOp() {
super(OP_SET_QUOTA);
}
@@ -2280,7 +2265,7 @@ public abstract class FSEditLogOp {
long dsQuota;
StorageType type;
- private SetQuotaByStorageTypeOp() {
+ SetQuotaByStorageTypeOp() {
super(OP_SET_QUOTA_BY_STORAGETYPE);
}
@@ -2363,7 +2348,7 @@ public abstract class FSEditLogOp {
long mtime;
long atime;
- private TimesOp() {
+ TimesOp() {
super(OP_TIMES);
}
@@ -2472,7 +2457,7 @@ public abstract class FSEditLogOp {
long atime;
PermissionStatus permissionStatus;
- private SymlinkOp() {
+ SymlinkOp() {
super(OP_SYMLINK);
}
@@ -2631,7 +2616,7 @@ public abstract class FSEditLogOp {
long timestamp;
Rename[] options;
- private RenameOp() {
+ RenameOp() {
super(OP_RENAME);
}
@@ -2796,7 +2781,7 @@ public abstract class FSEditLogOp {
long timestamp;
Block truncateBlock;
- private TruncateOp() {
+ TruncateOp() {
super(OP_TRUNCATE);
}
@@ -2929,7 +2914,7 @@ public abstract class FSEditLogOp {
String path;
String newHolder;
- private ReassignLeaseOp() {
+ ReassignLeaseOp() {
super(OP_REASSIGN_LEASE);
}
@@ -3011,7 +2996,7 @@ public abstract class FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;
- private GetDelegationTokenOp() {
+ GetDelegationTokenOp() {
super(OP_GET_DELEGATION_TOKEN);
}
@@ -3090,7 +3075,7 @@ public abstract class FSEditLogOp {
DelegationTokenIdentifier token;
long expiryTime;
- private RenewDelegationTokenOp() {
+ RenewDelegationTokenOp() {
super(OP_RENEW_DELEGATION_TOKEN);
}
@@ -3168,7 +3153,7 @@ public abstract class FSEditLogOp {
static class CancelDelegationTokenOp extends FSEditLogOp {
DelegationTokenIdentifier token;
- private CancelDelegationTokenOp() {
+ CancelDelegationTokenOp() {
super(OP_CANCEL_DELEGATION_TOKEN);
}
@@ -3227,7 +3212,7 @@ public abstract class FSEditLogOp {
static class UpdateMasterKeyOp extends FSEditLogOp {
DelegationKey key;
- private UpdateMasterKeyOp() {
+ UpdateMasterKeyOp() {
super(OP_UPDATE_MASTER_KEY);
}
@@ -3332,8 +3317,20 @@ public abstract class FSEditLogOp {
}
}
+ static class StartLogSegmentOp extends LogSegmentOp {
+ StartLogSegmentOp() {
+ super(OP_START_LOG_SEGMENT);
+ }
+ }
+
+ static class EndLogSegmentOp extends LogSegmentOp {
+ EndLogSegmentOp() {
+ super(OP_END_LOG_SEGMENT);
+ }
+ }
+
static class InvalidOp extends FSEditLogOp {
- private InvalidOp() {
+ InvalidOp() {
super(OP_INVALID);
}
@@ -4144,7 +4141,7 @@ public abstract class FSEditLogOp {
List<XAttr> xAttrs;
String src;
- private RemoveXAttrOp() {
+ RemoveXAttrOp() {
super(OP_REMOVE_XATTR);
}
@@ -4197,7 +4194,7 @@ public abstract class FSEditLogOp {
List<XAttr> xAttrs;
String src;
- private SetXAttrOp() {
+ SetXAttrOp() {
super(OP_SET_XATTR);
}
@@ -4250,7 +4247,7 @@ public abstract class FSEditLogOp {
List<AclEntry> aclEntries = Lists.newArrayList();
String src;
- private SetAclOp() {
+ SetAclOp() {
super(OP_SET_ACL);
}
@@ -4347,7 +4344,7 @@ public abstract class FSEditLogOp {
/**
* Operation corresponding to upgrade
*/
- static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
+ abstract static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent
private final String name;
private long time;
@@ -4414,7 +4411,7 @@ public abstract class FSEditLogOp {
String path;
byte policyId;
- private SetStoragePolicyOp() {
+ SetStoragePolicyOp() {
super(OP_SET_STORAGE_POLICY);
}
@@ -4480,6 +4477,26 @@ public abstract class FSEditLogOp {
}
}
+ static class RollingUpgradeStartOp extends RollingUpgradeOp {
+ RollingUpgradeStartOp() {
+ super(OP_ROLLING_UPGRADE_START, "start");
+ }
+
+ static RollingUpgradeStartOp getInstance(OpInstanceCache cache) {
+ return (RollingUpgradeStartOp) cache.get(OP_ROLLING_UPGRADE_START);
+ }
+ }
+
+ static class RollingUpgradeFinalizeOp extends RollingUpgradeOp {
+ RollingUpgradeFinalizeOp() {
+ super(OP_ROLLING_UPGRADE_FINALIZE, "finalize");
+ }
+
+ static RollingUpgradeFinalizeOp getInstance(OpInstanceCache cache) {
+ return (RollingUpgradeFinalizeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE);
+ }
+ }
+
/**
* Class for writing editlog ops
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
index 1a0a296..3f8feba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
/**
* Op codes for edits file
@@ -27,60 +28,64 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
public enum FSEditLogOpCodes {
// last op code in file
- OP_ADD ((byte) 0),
- OP_RENAME_OLD ((byte) 1), // deprecated operation
- OP_DELETE ((byte) 2),
- OP_MKDIR ((byte) 3),
- OP_SET_REPLICATION ((byte) 4),
+ OP_ADD ((byte) 0, AddOp.class),
+ // deprecated operation
+ OP_RENAME_OLD ((byte) 1, RenameOldOp.class),
+ OP_DELETE ((byte) 2, DeleteOp.class),
+ OP_MKDIR ((byte) 3, MkdirOp.class),
+ OP_SET_REPLICATION ((byte) 4, SetReplicationOp.class),
@Deprecated OP_DATANODE_ADD ((byte) 5), // obsolete
@Deprecated OP_DATANODE_REMOVE((byte) 6), // obsolete
- OP_SET_PERMISSIONS ((byte) 7),
- OP_SET_OWNER ((byte) 8),
- OP_CLOSE ((byte) 9),
- OP_SET_GENSTAMP_V1 ((byte) 10),
- OP_SET_NS_QUOTA ((byte) 11), // obsolete
- OP_CLEAR_NS_QUOTA ((byte) 12), // obsolete
- OP_TIMES ((byte) 13), // set atime, mtime
- OP_SET_QUOTA ((byte) 14),
- OP_RENAME ((byte) 15), // filecontext rename
- OP_CONCAT_DELETE ((byte) 16), // concat files
- OP_SYMLINK ((byte) 17),
- OP_GET_DELEGATION_TOKEN ((byte) 18),
- OP_RENEW_DELEGATION_TOKEN ((byte) 19),
- OP_CANCEL_DELEGATION_TOKEN ((byte) 20),
- OP_UPDATE_MASTER_KEY ((byte) 21),
- OP_REASSIGN_LEASE ((byte) 22),
- OP_END_LOG_SEGMENT ((byte) 23),
- OP_START_LOG_SEGMENT ((byte) 24),
- OP_UPDATE_BLOCKS ((byte) 25),
- OP_CREATE_SNAPSHOT ((byte) 26),
- OP_DELETE_SNAPSHOT ((byte) 27),
- OP_RENAME_SNAPSHOT ((byte) 28),
- OP_ALLOW_SNAPSHOT ((byte) 29),
- OP_DISALLOW_SNAPSHOT ((byte) 30),
- OP_SET_GENSTAMP_V2 ((byte) 31),
- OP_ALLOCATE_BLOCK_ID ((byte) 32),
- OP_ADD_BLOCK ((byte) 33),
- OP_ADD_CACHE_DIRECTIVE ((byte) 34),
- OP_REMOVE_CACHE_DIRECTIVE ((byte) 35),
- OP_ADD_CACHE_POOL ((byte) 36),
- OP_MODIFY_CACHE_POOL ((byte) 37),
- OP_REMOVE_CACHE_POOL ((byte) 38),
- OP_MODIFY_CACHE_DIRECTIVE ((byte) 39),
- OP_SET_ACL ((byte) 40),
- OP_ROLLING_UPGRADE_START ((byte) 41),
- OP_ROLLING_UPGRADE_FINALIZE ((byte) 42),
- OP_SET_XATTR ((byte) 43),
- OP_REMOVE_XATTR ((byte) 44),
- OP_SET_STORAGE_POLICY ((byte) 45),
- OP_TRUNCATE ((byte) 46),
- OP_APPEND ((byte) 47),
- OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48),
+ OP_SET_PERMISSIONS ((byte) 7, SetPermissionsOp.class),
+ OP_SET_OWNER ((byte) 8, SetOwnerOp.class),
+ OP_CLOSE ((byte) 9, CloseOp.class),
+ OP_SET_GENSTAMP_V1 ((byte) 10, SetGenstampV1Op.class),
+ OP_SET_NS_QUOTA ((byte) 11, SetNSQuotaOp.class), // obsolete
+ OP_CLEAR_NS_QUOTA ((byte) 12, ClearNSQuotaOp.class), // obsolete
+ OP_TIMES ((byte) 13, TimesOp.class), // set atime, mtime
+ OP_SET_QUOTA ((byte) 14, SetQuotaOp.class),
+ // filecontext rename
+ OP_RENAME ((byte) 15, RenameOp.class),
+ // concat files
+ OP_CONCAT_DELETE ((byte) 16, ConcatDeleteOp.class),
+ OP_SYMLINK ((byte) 17, SymlinkOp.class),
+ OP_GET_DELEGATION_TOKEN ((byte) 18, GetDelegationTokenOp.class),
+ OP_RENEW_DELEGATION_TOKEN ((byte) 19, RenewDelegationTokenOp.class),
+ OP_CANCEL_DELEGATION_TOKEN ((byte) 20, CancelDelegationTokenOp.class),
+ OP_UPDATE_MASTER_KEY ((byte) 21, UpdateMasterKeyOp.class),
+ OP_REASSIGN_LEASE ((byte) 22, ReassignLeaseOp.class),
+ OP_END_LOG_SEGMENT ((byte) 23, EndLogSegmentOp.class),
+ OP_START_LOG_SEGMENT ((byte) 24, StartLogSegmentOp.class),
+ OP_UPDATE_BLOCKS ((byte) 25, UpdateBlocksOp.class),
+ OP_CREATE_SNAPSHOT ((byte) 26, CreateSnapshotOp.class),
+ OP_DELETE_SNAPSHOT ((byte) 27, DeleteSnapshotOp.class),
+ OP_RENAME_SNAPSHOT ((byte) 28, RenameSnapshotOp.class),
+ OP_ALLOW_SNAPSHOT ((byte) 29, AllowSnapshotOp.class),
+ OP_DISALLOW_SNAPSHOT ((byte) 30, DisallowSnapshotOp.class),
+ OP_SET_GENSTAMP_V2 ((byte) 31, SetGenstampV2Op.class),
+ OP_ALLOCATE_BLOCK_ID ((byte) 32, AllocateBlockIdOp.class),
+ OP_ADD_BLOCK ((byte) 33, AddBlockOp.class),
+ OP_ADD_CACHE_DIRECTIVE ((byte) 34, AddCacheDirectiveInfoOp.class),
+ OP_REMOVE_CACHE_DIRECTIVE ((byte) 35, RemoveCacheDirectiveInfoOp.class),
+ OP_ADD_CACHE_POOL ((byte) 36, AddCachePoolOp.class),
+ OP_MODIFY_CACHE_POOL ((byte) 37, ModifyCachePoolOp.class),
+ OP_REMOVE_CACHE_POOL ((byte) 38, RemoveCachePoolOp.class),
+ OP_MODIFY_CACHE_DIRECTIVE ((byte) 39, ModifyCacheDirectiveInfoOp.class),
+ OP_SET_ACL ((byte) 40, SetAclOp.class),
+ OP_ROLLING_UPGRADE_START ((byte) 41, RollingUpgradeStartOp.class),
+ OP_ROLLING_UPGRADE_FINALIZE ((byte) 42, RollingUpgradeFinalizeOp.class),
+ OP_SET_XATTR ((byte) 43, SetXAttrOp.class),
+ OP_REMOVE_XATTR ((byte) 44, RemoveXAttrOp.class),
+ OP_SET_STORAGE_POLICY ((byte) 45, SetStoragePolicyOp.class),
+ OP_TRUNCATE ((byte) 46, TruncateOp.class),
+ OP_APPEND ((byte) 47, AppendOp.class),
+ OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48, SetQuotaByStorageTypeOp.class),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);
private final byte opCode;
+ private final Class<? extends FSEditLogOp> opClass;
/**
* Constructor
@@ -88,7 +93,12 @@ public enum FSEditLogOpCodes {
* @param opCode byte value of constructed enum
*/
FSEditLogOpCodes(byte opCode) {
+ this(opCode, null);
+ }
+
+ FSEditLogOpCodes(byte opCode, Class<? extends FSEditLogOp> opClass) {
this.opCode = opCode;
+ this.opClass = opClass;
}
/**
@@ -100,6 +110,10 @@ public enum FSEditLogOpCodes {
return opCode;
}
+ public Class<? extends FSEditLogOp> getOpClass() {
+ return opClass;
+ }
+
private static final FSEditLogOpCodes[] VALUES;
static {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 084f82a..b637105 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -140,7 +140,7 @@ public class FSImage implements Closeable {
storage.setRestoreFailedStorage(true);
}
- this.editLog = new FSEditLog(conf, storage, editsDirs);
+ this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index f376901..e8900ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -1263,7 +1263,6 @@ public class NameNode implements NameNodeStatusMXBean {
newSharedEditLog.logEdit(op);
if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
- newSharedEditLog.logSync();
newSharedEditLog.endCurrentLogSegment(false);
LOG.debug("ending log segment because of END_LOG_SEGMENT op in "
+ stream);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 54b5c6e..ce0b050 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -283,8 +283,8 @@ public class NameNodeMetrics {
transactions.add(latency);
}
- public void incrTransactionsBatchedInSync() {
- transactionsBatchedInSync.incr();
+ public void incrTransactionsBatchedInSync(long count) {
+ transactionsBatchedInSync.incr(count);
}
public void addSync(long elapsed) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index dc1853a..0b08996 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -267,6 +267,9 @@ public class DFSTestUtil {
}
public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) {
+ // spies are shallow copies, must allow async log to restart its thread
+ // so it has the new copy
+ newLog.restart();
Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog);
Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
index c79e0c2..9b42cac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
@@ -69,17 +69,21 @@ import org.junit.runners.Parameterized.Parameters;
public class TestAuditLogs {
static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log";
final boolean useAsyncLog;
-
+ final boolean useAsyncEdits;
+
@Parameters
public static Collection<Object[]> data() {
Collection<Object[]> params = new ArrayList<Object[]>();
- params.add(new Object[]{new Boolean(false)});
- params.add(new Object[]{new Boolean(true)});
+ params.add(new Object[]{Boolean.FALSE, Boolean.FALSE});
+ params.add(new Object[]{Boolean.TRUE, Boolean.FALSE});
+ params.add(new Object[]{Boolean.FALSE, Boolean.TRUE});
+ params.add(new Object[]{Boolean.TRUE, Boolean.TRUE});
return params;
}
-
- public TestAuditLogs(boolean useAsyncLog) {
+
+ public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) {
this.useAsyncLog = useAsyncLog;
+ this.useAsyncEdits = useAsyncEdits;
}
// Pattern for:
@@ -116,6 +120,7 @@ public class TestAuditLogs {
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog);
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, useAsyncEdits);
util = new DFSTestUtil.Builder().setName("TestAuditAllowed").
setNumFiles(20).build();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 14240e0..1eb377a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -88,6 +88,9 @@ import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.LogManager;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
@@ -98,12 +101,33 @@ import com.google.common.collect.Lists;
/**
* This class tests the creation and validation of a checkpoint.
*/
+@RunWith(Parameterized.class)
public class TestEditLog {
-
+
static {
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
}
+ @Parameters
+ public static Collection<Object[]> data() {
+ Collection<Object[]> params = new ArrayList<Object[]>();
+ params.add(new Object[]{ Boolean.FALSE });
+ params.add(new Object[]{ Boolean.TRUE });
+ return params;
+ }
+
+ private static boolean useAsyncEditLog;
+ public TestEditLog(Boolean async) {
+ useAsyncEditLog = async;
+ }
+
+ public static Configuration getConf() {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+ useAsyncEditLog);
+ return conf;
+ }
+
/**
* A garbage mkdir op which is used for testing
* {@link EditLogFileInputStream#scanEditLog(File)}
@@ -225,11 +249,12 @@ public class TestEditLog {
* @param storage Storage object used by namenode
*/
private static FSEditLog getFSEditLog(NNStorage storage) throws IOException {
- Configuration conf = new Configuration();
+ Configuration conf = getConf();
// Make sure the edits dirs are set in the provided configuration object.
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
StringUtils.join(",", storage.getEditsDirectories()));
- FSEditLog log = new FSEditLog(conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
+ FSEditLog log = FSEditLog.newInstance(
+ conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
return log;
}
@@ -252,7 +277,7 @@ public class TestEditLog {
*/
@Test
public void testPreTxidEditLogWithEdits() throws Exception {
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
try {
@@ -282,7 +307,7 @@ public class TestEditLog {
@Test
public void testSimpleEditLog() throws IOException {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
try {
@@ -351,7 +376,7 @@ public class TestEditLog {
private void testEditLog(int initialSize) throws IOException {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
@@ -482,8 +507,12 @@ public class TestEditLog {
@Test
public void testSyncBatching() throws Exception {
- // start a cluster
- Configuration conf = new HdfsConfiguration();
+ if (useAsyncEditLog) {
+ // semantics are completely differently since edits will be auto-synced
+ return;
+ }
+ // start a cluster
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
ExecutorService threadA = Executors.newSingleThreadExecutor();
@@ -546,7 +575,7 @@ public class TestEditLog {
@Test
public void testBatchedSyncWithClosedLogs() throws Exception {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
ExecutorService threadA = Executors.newSingleThreadExecutor();
@@ -586,7 +615,7 @@ public class TestEditLog {
@Test
public void testEditChecksum() throws Exception {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
@@ -658,7 +687,7 @@ public class TestEditLog {
*/
private void testCrashRecovery(int numTransactions) throws Exception {
MiniDFSCluster cluster = null;
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
CHECKPOINT_ON_STARTUP_MIN_TXNS);
@@ -803,7 +832,7 @@ public class TestEditLog {
boolean updateTransactionIdFile, boolean shouldSucceed)
throws Exception {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATA_NODES).build();
@@ -1134,7 +1163,7 @@ public class TestEditLog {
public static NNStorage setupEdits(List<URI> editUris, int numrolls,
boolean closeOnFinish, AbortSpec... abortAtRolls) throws IOException {
List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls));
- NNStorage storage = new NNStorage(new Configuration(),
+ NNStorage storage = new NNStorage(getConf(),
Collections.<URI>emptyList(),
editUris);
storage.format(new NamespaceInfo());
@@ -1296,7 +1325,7 @@ public class TestEditLog {
EditLogFileOutputStream elfos = null;
EditLogFileInputStream elfis = null;
try {
- elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0);
+ elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0);
elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
elfos.writeRaw(garbage, 0, garbage.length);
elfos.setReadyToFlush();
@@ -1474,7 +1503,7 @@ public class TestEditLog {
public void testManyEditLogSegments() throws IOException {
final int NUM_EDIT_LOG_ROLLS = 1000;
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java
index f22ee2f..c60d79f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Random;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
@@ -30,18 +32,40 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NameNodeEditLogRoller;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.base.Supplier;
+@RunWith(Parameterized.class)
public class TestEditLogAutoroll {
+ static {
+ GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ Collection<Object[]> params = new ArrayList<Object[]>();
+ params.add(new Object[]{ Boolean.FALSE });
+ params.add(new Object[]{ Boolean.TRUE });
+ return params;
+ }
+
+ private static boolean useAsyncEditLog;
+ public TestEditLogAutoroll(Boolean async) {
+ useAsyncEditLog = async;
+ }
private Configuration conf;
private MiniDFSCluster cluster;
@@ -61,6 +85,8 @@ public class TestEditLogAutoroll {
// Make it autoroll after 10 edits
conf.setFloat(DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD, 0.5f);
conf.setInt(DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS, 100);
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+ useAsyncEditLog);
int retryCount = 0;
while (true) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
index 51dfc3e..28169bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
@@ -21,12 +21,13 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -43,13 +44,37 @@ import org.apache.hadoop.util.ExitUtil.ExitException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
+@RunWith(Parameterized.class)
public class TestEditLogJournalFailures {
private int editsPerformed = 0;
private MiniDFSCluster cluster;
private FileSystem fs;
+ private boolean useAsyncEdits;
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ Collection<Object[]> params = new ArrayList<Object[]>();
+ params.add(new Object[]{Boolean.FALSE});
+ params.add(new Object[]{Boolean.TRUE});
+ return params;
+ }
+
+ public TestEditLogJournalFailures(boolean useAsyncEdits) {
+ this.useAsyncEdits = useAsyncEdits;
+ }
+
+ private Configuration getConf() {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+ useAsyncEdits);
+ return conf;
+ }
/**
* Create the mini cluster for testing and sub in a custom runtime so that
@@ -57,9 +82,9 @@ public class TestEditLogJournalFailures {
*/
@Before
public void setUpMiniCluster() throws IOException {
- setUpMiniCluster(new HdfsConfiguration(), true);
+ setUpMiniCluster(getConf(), true);
}
-
+
public void setUpMiniCluster(Configuration conf, boolean manageNameDfsDirs)
throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
@@ -153,7 +178,7 @@ public class TestEditLogJournalFailures {
String[] editsDirs = cluster.getConfiguration(0).getTrimmedStrings(
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
shutDownMiniCluster();
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, editsDirs[0]);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0);
@@ -193,7 +218,7 @@ public class TestEditLogJournalFailures {
throws IOException {
// Set up 4 name/edits dirs.
shutDownMiniCluster();
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
String[] nameDirs = new String[4];
for (int i = 0; i < nameDirs.length; i++) {
File nameDir = new File(PathUtils.getTestDir(getClass()), "name-dir" + i);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index fcffbc3..195ce5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -26,14 +26,17 @@ import static org.mockito.Mockito.spy;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -46,10 +49,14 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -57,15 +64,27 @@ import org.mockito.stubbing.Answer;
* This class tests various synchronization bugs in FSEditLog rolling
* and namespace saving.
*/
+@RunWith(Parameterized.class)
public class TestEditLogRace {
static {
GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL);
}
- private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
+ @Parameters
+ public static Collection<Object[]> data() {
+ Collection<Object[]> params = new ArrayList<Object[]>();
+ params.add(new Object[]{ false });
+ params.add(new Object[]{ true });
+ return params;
+ }
+
+ private static boolean useAsyncEditLog;
- private static final String NAME_DIR =
- MiniDFSCluster.getBaseDirectory() + "name1";
+ public TestEditLogRace(boolean useAsyncEditLog) {
+ TestEditLogRace.useAsyncEditLog = useAsyncEditLog;
+ }
+
+ private static final Log LOG = LogFactory.getLog(TestEditLogRace.class);
// This test creates NUM_THREADS threads and each thread continuously writes
// transactions
@@ -94,21 +113,29 @@ public class TestEditLogRace {
* This value needs to be significantly longer than the average
* time for an fsync() or enterSafeMode().
*/
- private static final int BLOCK_TIME = 10;
-
+ private static final int BLOCK_TIME = 4; // 4 sec pretty generous
+
//
// an object that does a bunch of transactions
//
static class Transactions implements Runnable {
final NamenodeProtocols nn;
+ final MiniDFSCluster cluster;
+ FileSystem fs;
short replication = 3;
long blockSize = 64;
volatile boolean stopped = false;
volatile Thread thr;
final AtomicReference<Throwable> caught;
- Transactions(NamenodeProtocols ns, AtomicReference<Throwable> caught) {
- nn = ns;
+ Transactions(MiniDFSCluster cluster, AtomicReference<Throwable> caught) {
+ this.cluster = cluster;
+ this.nn = cluster.getNameNodeRpc();
+ try {
+ this.fs = cluster.getFileSystem();
+ } catch (IOException e) {
+ caught.set(e);
+ }
this.caught = caught;
}
@@ -122,11 +149,23 @@ public class TestEditLogRace {
while (!stopped) {
try {
String dirname = "/thr-" + thr.getId() + "-dir-" + i;
- nn.mkdirs(dirname, p, true);
- nn.delete(dirname, true);
+ if (i % 2 == 0) {
+ Path dirnamePath = new Path(dirname);
+ fs.mkdirs(dirnamePath);
+ fs.delete(dirnamePath, true);
+ } else {
+ nn.mkdirs(dirname, p, true);
+ nn.delete(dirname, true);
+ }
} catch (SafeModeException sme) {
// This is OK - the tests will bring NN in and out of safemode
} catch (Throwable e) {
+ // This is OK - the tests will bring NN in and out of safemode
+ if (e instanceof RemoteException &&
+ ((RemoteException)e).getClassName()
+ .contains("SafeModeException")) {
+ return;
+ }
LOG.warn("Got error in transaction thread", e);
caught.compareAndSet(null, e);
break;
@@ -144,11 +183,11 @@ public class TestEditLogRace {
}
}
- private void startTransactionWorkers(NamenodeProtocols namesystem,
+ private void startTransactionWorkers(MiniDFSCluster cluster,
AtomicReference<Throwable> caughtErr) {
// Create threads and make them run transactions concurrently.
for (int i = 0; i < NUM_THREADS; i++) {
- Transactions trans = new Transactions(namesystem, caughtErr);
+ Transactions trans = new Transactions(cluster, caughtErr);
new Thread(trans, "TransactionThread-" + i).start();
workers.add(trans);
}
@@ -174,21 +213,21 @@ public class TestEditLogRace {
@Test
public void testEditLogRolling() throws Exception {
// start a cluster
- Configuration conf = new HdfsConfiguration();
- MiniDFSCluster cluster = null;
+ Configuration conf = getConf();
+ final MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
FileSystem fileSys = null;
AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>();
try {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
cluster.waitActive();
fileSys = cluster.getFileSystem();
final NamenodeProtocols nn = cluster.getNameNode().getRpcServer();
FSImage fsimage = cluster.getNamesystem().getFSImage();
StorageDirectory sd = fsimage.getStorage().getStorageDir(0);
- startTransactionWorkers(nn, caughtErr);
+ startTransactionWorkers(cluster, caughtErr);
long previousLogTxId = 1;
@@ -256,7 +295,7 @@ public class TestEditLogRace {
@Test
public void testSaveNamespace() throws Exception {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
@@ -266,12 +305,11 @@ public class TestEditLogRace {
cluster.waitActive();
fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
- final NamenodeProtocols nn = cluster.getNameNodeRpc();
FSImage fsimage = namesystem.getFSImage();
FSEditLog editLog = fsimage.getEditLog();
- startTransactionWorkers(nn, caughtErr);
+ startTransactionWorkers(cluster, caughtErr);
for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) {
try {
@@ -321,11 +359,13 @@ public class TestEditLogRace {
private Configuration getConf() {
Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+ useAsyncEditLog);
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
- conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
- conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
- conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
+ //conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR);
+ //conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR);
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
return conf;
}
@@ -389,7 +429,7 @@ public class TestEditLogRace {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
LOG.info("Flush called");
- if (Thread.currentThread() == doAnEditThread) {
+ if (useAsyncEditLog || Thread.currentThread() == doAnEditThread) {
LOG.info("edit thread: Telling main thread we made it to flush section...");
// Signal to main thread that the edit thread is in the racy section
waitToEnterFlush.countDown();
@@ -457,62 +497,52 @@ public class TestEditLogRace {
try {
FSImage fsimage = namesystem.getFSImage();
- FSEditLog editLog = spy(fsimage.getEditLog());
- DFSTestUtil.setEditLogForTesting(namesystem, editLog);
+ final FSEditLog editLog = fsimage.getEditLog();
final AtomicReference<Throwable> deferredException =
new AtomicReference<Throwable>();
- final CountDownLatch waitToEnterSync = new CountDownLatch(1);
-
+ final CountDownLatch sleepingBeforeSync = new CountDownLatch(1);
+
final Thread doAnEditThread = new Thread() {
@Override
public void run() {
try {
- LOG.info("Starting mkdirs");
- namesystem.mkdirs("/test",
- new PermissionStatus("test","test", new FsPermission((short)00755)),
- true);
- LOG.info("mkdirs complete");
+ LOG.info("Starting setOwner");
+ namesystem.writeLock();
+ try {
+ editLog.logSetOwner("/","test","test");
+ } finally {
+ namesystem.writeUnlock();
+ }
+ sleepingBeforeSync.countDown();
+ LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
+ Thread.sleep(BLOCK_TIME*1000);
+ editLog.logSync();
+ LOG.info("edit thread: logSync complete");
} catch (Throwable ioe) {
LOG.fatal("Got exception", ioe);
deferredException.set(ioe);
- waitToEnterSync.countDown();
- }
- }
- };
-
- Answer<Void> blockingSync = new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- LOG.info("logSync called");
- if (Thread.currentThread() == doAnEditThread) {
- LOG.info("edit thread: Telling main thread we made it just before logSync...");
- waitToEnterSync.countDown();
- LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs");
- Thread.sleep(BLOCK_TIME*1000);
- LOG.info("Going through to logSync. This will allow the main thread to continue.");
+ sleepingBeforeSync.countDown();
}
- invocation.callRealMethod();
- LOG.info("logSync complete");
- return null;
}
};
- doAnswer(blockingSync).when(editLog).logSync();
-
+ doAnEditThread.setDaemon(true);
doAnEditThread.start();
LOG.info("Main thread: waiting to just before logSync...");
- waitToEnterSync.await();
+ sleepingBeforeSync.await(200, TimeUnit.MILLISECONDS);
assertNull(deferredException.get());
LOG.info("Main thread: detected that logSync about to be called.");
LOG.info("Trying to enter safe mode.");
- LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits");
-
+
long st = Time.now();
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
long et = Time.now();
- LOG.info("Entered safe mode");
- // Make sure we really waited for the flush to complete!
- assertTrue(et - st > (BLOCK_TIME - 1)*1000);
+ LOG.info("Entered safe mode after "+(et-st)+"ms");
+
+ // Make sure we didn't wait for the thread that did a logEdit but
+ // not logSync. Going into safemode does a logSyncAll that will flush
+ // its edit.
+ assertTrue(et - st < (BLOCK_TIME/2)*1000);
// Once we're in safe mode, save namespace.
namesystem.saveNamespace(0, 0);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
index b0e5704..fe29e1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
@@ -31,6 +31,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
@@ -59,28 +61,51 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.log4j.Level;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+@RunWith(Parameterized.class)
public class TestFSEditLogLoader {
-
+ @Parameters
+ public static Collection<Object[]> data() {
+ Collection<Object[]> params = new ArrayList<Object[]>();
+ params.add(new Object[]{ Boolean.FALSE });
+ params.add(new Object[]{ Boolean.TRUE });
+ return params;
+ }
+
+ private static boolean useAsyncEditLog;
+ public TestFSEditLogLoader(Boolean async) {
+ useAsyncEditLog = async;
+ }
+
+ private static Configuration getConf() {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING,
+ useAsyncEditLog);
+ return conf;
+ }
+
static {
GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL);
GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.ALL);
}
-
+
private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
private static final int NUM_DATA_NODES = 0;
private static final ErasureCodingPolicy testECPolicy
= StripedFileTestUtil.TEST_EC_POLICY;
-
+
@Test
public void testDisplayRecentEditLogOpCodes() throws IOException {
- // start a cluster
- Configuration conf = new HdfsConfiguration();
+ // start a cluster
+ Configuration conf = getConf();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
@@ -130,7 +155,7 @@ public class TestFSEditLogLoader {
@Test
public void testReplicationAdjusted() throws Exception {
// start a cluster
- Configuration conf = new HdfsConfiguration();
+ Configuration conf = getConf();
// Replicate and heartbeat fast to shave a few seconds off test
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);