Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2021/05/26 22:08:10 UTC
[hadoop] branch trunk updated: HDFS-15915. Race condition with async edits logging due to updating txId outside of the namesystem log. Contributed by Konstantin V Shvachko.
This is an automated email from the ASF dual-hosted git repository.
shv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1abd03d HDFS-15915. Race condition with async edits logging due to updating txId outside of the namesystem log. Contributed by Konstantin V Shvachko.
1abd03d is described below
commit 1abd03d68f4f236674ce929164cc460037730abb
Author: Konstantin V Shvachko <sh...@apache.org>
AuthorDate: Wed May 26 12:07:13 2021 -0700
HDFS-15915. Race condition with async edits logging due to updating txId outside of the namesystem log. Contributed by Konstantin V Shvachko.
---
.../hdfs/server/namenode/EditLogOutputStream.java | 11 ++
.../hadoop/hdfs/server/namenode/FSEditLog.java | 39 +++++--
.../hdfs/server/namenode/FSEditLogAsync.java | 7 +-
.../hadoop/hdfs/server/namenode/JournalSet.java | 19 +++-
.../hdfs/server/namenode/NameNodeAdapter.java | 36 ++++++-
.../hdfs/server/namenode/TestEditLogRace.java | 66 +++++++-----
.../hadoop/hdfs/server/namenode/ha/HATestUtil.java | 12 +++
.../hdfs/server/namenode/ha/TestObserverNode.java | 118 +++++++++++++++++++++
8 files changed, 270 insertions(+), 38 deletions(-)
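For context, the bug boils down to an ordering problem: FSEditLogAsync used to enqueue an op and let the sync thread assign its txId later, outside the namesystem log's monitor, so a concurrent RPC could observe the resulting namespace change while reading a state id that did not yet cover it. Here is a minimal sketch of the two orderings; EditLogModel and its fields are illustrative stand-ins, not the Hadoop classes:

    // Simplified model of the race fixed by this commit (not Hadoop code).
    class EditLogModel {
      private long txid;                 // last assigned transaction id
      private volatile boolean applied;  // stands in for the namespace change

      // Pre-fix shape: the change is visible before its txId is assigned,
      // so a reader can see 'applied' yet read a stale txid.
      void logEditRacy() {
        applied = true;
        // txid++ happened later, on the async sync thread
      }

      // Post-fix shape: visibility and txId assignment share one monitor,
      // so whoever sees the change also sees the new txid.
      synchronized void logEditFixed() {
        applied = true;
        txid++;
      }

      synchronized long getTxId() {
        return txid;
      }
    }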
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
index 27733cf..6f43d73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
/**
* A generic abstract class to support journaling of edits logs into
@@ -43,6 +44,16 @@ public abstract class EditLogOutputStream implements Closeable {
}
/**
+ * Get the last txId journalled in the stream.
+ * The txId is recorded when FSEditLogOp is written to the stream.
+ * The default implementation is a dummy.
+ * JournalSet tracks the txId uniformly for all underlying streams.
+ */
+ public long getLastJournalledTxId() {
+ return HdfsServerConstants.INVALID_TXID;
+ };
+
+ /**
* Write edits log operation to the stream.
*
* @param op operation
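The contract of the new accessor, restated as a self-contained toy (ToyOp and ToyEditStream are hypothetical, and -1 merely stands in for HdfsServerConstants.INVALID_TXID): the stream remembers the txId of the last op it journalled and returns the invalid sentinel until something has been written.

    // Illustrative only; not the Hadoop classes.
    interface ToyOp {
      long txId();
    }

    class ToyEditStream {
      private static final long INVALID_TXID = -1; // stand-in sentinel
      private long lastJournalledTxId = INVALID_TXID;

      void write(ToyOp op) {
        // ... serialize the op ...
        lastJournalledTxId = op.txId(); // recorded at write time
      }

      long getLastJournalledTxId() {
        return lastJournalledTxId;
      }
    }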
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 79f039b..6048457 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -218,7 +218,10 @@ public class FSEditLog implements LogsPurgeable {
private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
@Override
protected synchronized TransactionId initialValue() {
- return new TransactionId(Long.MAX_VALUE);
+ // If an RPC call did not generate any transactions,
+ // logSync() should exit without syncing
+ // Therefore the initial value of myTransactionId should be 0
+ return new TransactionId(0L);
}
};
@@ -463,6 +466,7 @@ public class FSEditLog implements LogsPurgeable {
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();
+ beginTransaction(op);
// check if it is time to schedule an automatic sync
needsSync = doEditTransaction(op);
if (needsSync) {
@@ -477,9 +481,11 @@ public class FSEditLog implements LogsPurgeable {
}
synchronized boolean doEditTransaction(final FSEditLogOp op) {
- long start = beginTransaction();
- op.setTransactionId(txid);
+ LOG.debug("doEditTx() op={} txid={}", op, txid);
+ assert op.hasTransactionId() :
+ "Transaction id is not set for " + op + " EditLog.txId=" + txid;
+ long start = monotonicNow();
try {
editLogStream.write(op);
} catch (IOException ex) {
@@ -523,7 +529,7 @@ public class FSEditLog implements LogsPurgeable {
return editLogStream.shouldForceSync();
}
- private long beginTransaction() {
+ protected void beginTransaction(final FSEditLogOp op) {
assert Thread.holdsLock(this);
// get a new transactionId
txid++;
@@ -533,7 +539,9 @@ public class FSEditLog implements LogsPurgeable {
//
TransactionId id = myTransactionId.get();
id.txid = txid;
- return monotonicNow();
+ if(op != null) {
+ op.setTransactionId(txid);
+ }
}
private void endTransaction(long start) {
@@ -650,7 +658,7 @@ public class FSEditLog implements LogsPurgeable {
}
protected void logSync(long mytxid) {
- long syncStart = 0;
+ long lastJournalledTxId = HdfsServerConstants.INVALID_TXID;
boolean sync = false;
long editsBatchedInSync = 0;
try {
@@ -677,8 +685,16 @@ public class FSEditLog implements LogsPurgeable {
// now, this thread will do the sync. track if other edits were
// included in the sync - ie. batched. if this is the only edit
// synced then the batched count is 0
- editsBatchedInSync = txid - synctxid - 1;
- syncStart = txid;
+ lastJournalledTxId = editLogStream.getLastJournalledTxId();
+ LOG.debug("logSync(tx) synctxid={} lastJournalledTxId={} mytxid={}",
+ synctxid, lastJournalledTxId, mytxid);
+ assert lastJournalledTxId <= txid : "lastJournalledTxId exceeds txid";
+ // The stream has already been flushed, or there are no active streams
+ // We still try to flush up to mytxid
+ if(lastJournalledTxId <= synctxid) {
+ lastJournalledTxId = mytxid;
+ }
+ editsBatchedInSync = lastJournalledTxId - synctxid - 1;
isSyncRunning = true;
sync = true;
@@ -738,7 +754,7 @@ public class FSEditLog implements LogsPurgeable {
// Prevent RuntimeException from blocking other log edit sync
synchronized (this) {
if (sync) {
- synctxid = syncStart;
+ synctxid = lastJournalledTxId;
for (JournalManager jm : journalSet.getJournalManagers()) {
/**
* {@link FileJournalManager#lastReadableTxId} is only meaningful
@@ -746,7 +762,7 @@ public class FSEditLog implements LogsPurgeable {
* other types of {@link JournalManager}.
*/
if (jm instanceof FileJournalManager) {
- ((FileJournalManager)jm).setLastReadableTxId(syncStart);
+ ((FileJournalManager)jm).setLastReadableTxId(synctxid);
}
}
isSyncRunning = false;
@@ -1618,7 +1634,8 @@ public class FSEditLog implements LogsPurgeable {
* store yet.
*/
synchronized void logEdit(final int length, final byte[] data) {
- long start = beginTransaction();
+ beginTransaction(null);
+ long start = monotonicNow();
try {
editLogStream.writeRaw(data, 0, length);
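In the reworked logSync() above, the sync target is no longer the global txid captured at sync start but the highest txId actually journalled, with a fallback to the caller's own transaction when the stream has already been flushed. A hedged restatement of that arithmetic, with a plain helper whose parameter names mirror the FSEditLog fields:

    // Simplified restatement of the new sync bookkeeping (sketch only).
    final class SyncMath {
      static long syncTarget(long lastJournalledTxId, long synctxid,
          long mytxid) {
        // The stream was already flushed past synctxid, or there are no
        // active streams: still sync up to the caller's own transaction.
        if (lastJournalledTxId <= synctxid) {
          return mytxid;
        }
        return lastJournalledTxId;
      }
      // Edits batched into this sync: syncTarget(...) - synctxid - 1.
    }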
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
index f60b458..34bf257 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java
@@ -125,9 +125,14 @@ class FSEditLogAsync extends FSEditLog implements Runnable {
@Override
void logEdit(final FSEditLogOp op) {
+ assert isOpenForWrite();
+
Edit edit = getEditInstance(op);
THREAD_EDIT.set(edit);
- enqueueEdit(edit);
+ synchronized(this) {
+ enqueueEdit(edit);
+ beginTransaction(op);
+ }
}
@Override
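This hunk is the heart of the fix: enqueueing the edit and assigning its txId now happen under the same monitor, so an op's position in the async queue always matches the txId it carries. A minimal model of that invariant (AsyncLogModel and Op are hypothetical):

    import java.util.ArrayDeque;

    class AsyncLogModel {
      static final class Op {
        long txId;
      }

      private long nextTxId = 1;
      private final ArrayDeque<Op> queue = new ArrayDeque<>();

      // Enqueue and stamp atomically: queue order == txId order.
      synchronized void logEdit(Op op) {
        queue.add(op);
        op.txId = nextTxId++;
      }
    }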
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
index abcab80..e17d7b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.INVALID_TXID;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
@@ -187,9 +188,11 @@ public class JournalSet implements JournalManager {
final int minimumRedundantJournals;
private boolean closed;
-
+ private long lastJournalledTxId;
+
JournalSet(int minimumRedundantResources) {
this.minimumRedundantJournals = minimumRedundantResources;
+ lastJournalledTxId = INVALID_TXID;
}
@Override
@@ -439,6 +442,16 @@ public class JournalSet implements JournalManager {
super();
}
+ /**
+ * Get the last txId journalled in the stream.
+ * The txId is recorded when FSEditLogOp is written to the journal.
+ * JournalSet tracks the txId uniformly for all underlying streams.
+ */
+ @Override
+ public long getLastJournalledTxId() {
+ return lastJournalledTxId;
+ }
+
@Override
public void write(final FSEditLogOp op)
throws IOException {
@@ -450,6 +463,10 @@ public class JournalSet implements JournalManager {
}
}
}, "write op");
+
+ assert lastJournalledTxId < op.txid : "TxId order violation for op=" +
+ op + ", lastJournalledTxId=" + lastJournalledTxId;
+ lastJournalledTxId = op.txid;
}
@Override
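JournalSet now records the txId of each op it journals and asserts that the sequence is strictly increasing, which is exactly the invariant the synchronized logEdit() above guarantees. A toy version of the check (TxIdTracker is hypothetical, and -1 again stands in for INVALID_TXID):

    class TxIdTracker {
      private long lastJournalledTxId = -1; // stand-in for INVALID_TXID

      void record(long txid) {
        assert lastJournalledTxId < txid : "TxId order violation: " + txid
            + " after " + lastJournalledTxId;
        lastJournalledTxId = txid;
      }
    }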
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index a584da1..3731c2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -21,6 +21,8 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.File;
@@ -55,7 +57,12 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.Whitebox;
+import org.mockito.ArgumentMatcher;
+import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import static org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer.FSIMAGE_ATTRIBUTE_KEY;
/**
@@ -318,7 +325,34 @@ public class NameNodeAdapter {
}
return spyEditLog;
}
-
+
+ /**
+ * Spy on EditLog to delay execution of doEditTransaction() for MkdirOp.
+ */
+ public static FSEditLog spyDelayMkDirTransaction(
+ final NameNode nn, final long delay) {
+ FSEditLog realEditLog = nn.getFSImage().getEditLog();
+ FSEditLogAsync spyEditLog = (FSEditLogAsync) spy(realEditLog);
+ DFSTestUtil.setEditLogForTesting(nn.getNamesystem(), spyEditLog);
+ Answer<Boolean> ans = new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(delay);
+ return (Boolean) invocation.callRealMethod();
+ }
+ };
+ ArgumentMatcher<FSEditLogOp> am = new ArgumentMatcher<FSEditLogOp>() {
+ @Override
+ public boolean matches(FSEditLogOp argument) {
+ FSEditLogOp op = (FSEditLogOp) argument;
+ return op.opCode == FSEditLogOpCodes.OP_MKDIR;
+ }
+ };
+ doAnswer(ans).when(spyEditLog).doEditTransaction(
+ ArgumentMatchers.argThat(am));
+ return spyEditLog;
+ }
+
public static JournalSet spyOnJournalSet(NameNode nn) {
FSEditLog editLog = nn.getFSImage().getEditLog();
JournalSet js = Mockito.spy(editLog.getJournalSet());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
index f844eb3..083d9e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java
@@ -17,9 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_SET_OWNER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
@@ -53,13 +55,15 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
-import org.mockito.Mockito;
+import org.mockito.ArgumentMatcher;
import org.slf4j.event.Level;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -286,12 +290,12 @@ public class TestEditLogRace {
File editFile = new File(sd.getCurrentDir(), logFileName);
- System.out.println("Verifying file: " + editFile);
+ LOG.info("Verifying file: " + editFile);
FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
long numEditsThisLog = loader.loadFSEdits(
new EditLogFileInputStream(editFile), startTxId);
- System.out.println("Number of edits: " + numEditsThisLog);
+ LOG.info("Number of edits: " + numEditsThisLog);
assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
numEdits = numEditsThisLog;
}
@@ -576,9 +580,29 @@ public class TestEditLogRace {
}
}
+ static SetOwnerOp getSetOwnerOp(OpInstanceCache cache, String group) {
+ return ((SetOwnerOp)cache.get(OP_SET_OWNER))
+ .setSource("/").setUser("u").setGroup(group);
+ }
+
+ static class BlockingOpMatcher implements ArgumentMatcher<FSEditLogOp> {
+ @Override
+ public boolean matches(FSEditLogOp o) {
+ if(o instanceof FSEditLogOp.SetOwnerOp) {
+ FSEditLogOp.SetOwnerOp op = (FSEditLogOp.SetOwnerOp)o;
+ if("b".equals(op.groupname)) {
+ LOG.info("Blocking op: " + op);
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
@Test(timeout=180000)
public void testDeadlock() throws Throwable {
- GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.INFO);
+ GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(FSEditLogAsync.LOG, Level.DEBUG);
Configuration conf = getConf();
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
@@ -591,21 +615,17 @@ public class TestEditLogRace {
ExecutorService executor = Executors.newCachedThreadPool();
try {
- final FSEditLog editLog = namesystem.getEditLog();
+ final FSEditLog editLog = spy(namesystem.getEditLog());
+ DFSTestUtil.setEditLogForTesting(namesystem, editLog);
- FSEditLogOp.OpInstanceCache cache = editLog.cache.get();
- final FSEditLogOp op = FSEditLogOp.SetOwnerOp.getInstance(cache)
- .setSource("/").setUser("u").setGroup("g");
- // don't reset fields so instance can be reused.
- final FSEditLogOp reuseOp = Mockito.spy(op);
- Mockito.doNothing().when(reuseOp).reset();
+ final OpInstanceCache cache = editLog.cache.get();
// only job is spam edits. it will fill the queue when the test
// loop injects the blockingOp.
- Future[] logSpammers = new Future[16];
+ Future<?>[] logSpammers = new Future<?>[16];
for (int i=0; i < logSpammers.length; i++) {
final int ii = i;
- logSpammers[i] = executor.submit(new Callable() {
+ logSpammers[i] = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log spammer " + ii);
@@ -613,7 +633,7 @@ public class TestEditLogRace {
startSpamLatch.await();
for (int i = 0; !done.get() && i < 1000000; i++) {
// do not logSync here because we need to congest the queue.
- editLog.logEdit(reuseOp);
+ editLog.logEdit(getSetOwnerOp(cache, "g"));
if (i % 2048 == 0) {
LOG.info("thread[" + ii +"] edits=" + i);
}
@@ -624,10 +644,9 @@ public class TestEditLogRace {
});
}
- // the tx id is set while the edit log monitor is held, so this will
- // effectively stall the async processing thread which will cause the
- // edit queue to fill up.
- final FSEditLogOp blockingOp = Mockito.spy(op);
+ // doEditTransaction is stubbed to block while the edit log monitor is
+ // held, so this will effectively stall the async processing thread and
+ // cause the edit queue to fill up.
doAnswer(
new Answer<Void>() {
@Override
@@ -641,9 +660,7 @@ public class TestEditLogRace {
return null;
}
}
- ).when(blockingOp).setTransactionId(Mockito.anyLong());
- // don't reset fields so instance can be reused.
- Mockito.doNothing().when(blockingOp).reset();
+ ).when(editLog).doEditTransaction(argThat(new BlockingOpMatcher()));
// repeatedly overflow the queue and verify it doesn't deadlock.
for (int i = 0; i < 8; i++) {
@@ -651,10 +668,11 @@ public class TestEditLogRace {
// spammers to overflow the edit queue, then waits for a permit
// from blockerSemaphore that will be released at the bottom of
// this loop.
- Future blockingEdit = executor.submit(new Callable() {
+ Future<Void> blockingEdit = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log blocker");
+ final FSEditLogOp blockingOp = getSetOwnerOp(cache, "b");
editLog.logEdit(blockingOp);
editLog.logSync();
return null;
@@ -685,7 +703,7 @@ public class TestEditLogRace {
// what log rolling does), unblock the op currently holding the
// monitor, and ensure deadlock does not occur.
CountDownLatch readyLatch = new CountDownLatch(1);
- Future synchedEdits = executor.submit(new Callable() {
+ Future<Void> synchedEdits = executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Thread.currentThread().setName("Log synchronizer");
@@ -693,7 +711,7 @@ public class TestEditLogRace {
// log rolling to deadlock when queue is full.
readyLatch.countDown();
synchronized (editLog) {
- editLog.logEdit(reuseOp);
+ editLog.logEdit(getSetOwnerOp(cache, "g"));
editLog.logSync();
}
return null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index 8d712c1..307fe04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -372,4 +372,16 @@ public abstract class HATestUtil {
lastSeenStateId.accumulate(stateId);
return currentStateId;
}
+
+ /**
+ * Get last seen stateId from the client AlignmentContext.
+ */
+ public static long getLastSeenStateId(DistributedFileSystem dfs)
+ throws Exception {
+ ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
+ ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
+ dfs.getClient().getNamenode())).getProxyProvider();
+ ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
+ return ac.getLastSeenStateId();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index ed05aba..29cae6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -29,13 +29,18 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -53,12 +58,14 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -489,6 +496,117 @@ public class TestObserverNode {
assertTrue(result.contains("The filesystem under path '/' is CORRUPT"));
}
+ /**
+ * The test models the race of two mkdirs RPC calls on the same path to
+ * Active NameNode. The first call to arrive will journal a mkdirs transaction.
+ * The subsequent call hitting the NameNode before the mkdirs transaction is
+ * synced will see that the directory already exists, but will obtain
+ * lastSeenStateId smaller than the txId of the mkdirs transaction
+ * since the latter hasn't been synced yet.
+ * This causes a stale read from the Observer for the second client.
+ * See HDFS-15915.
+ */
+ @Test
+ public void testMkdirsRaceWithObserverRead() throws Exception {
+ dfs.mkdir(testPath, FsPermission.getDefault());
+ assertSentTo(0);
+ dfsCluster.rollEditLogAndTail(0);
+ dfs.getFileStatus(testPath);
+ assertSentTo(2);
+
+ // Create a spy on FSEditLog, which delays the MkdirOp transaction by 100 msec
+ FSEditLog spyEditLog = NameNodeAdapter.spyDelayMkDirTransaction(
+ dfsCluster.getNameNode(0), 100);
+
+ final int numThreads = 4;
+ ClientState[] clientStates = new ClientState[numThreads];
+ final ExecutorService threadPool =
+ HadoopExecutors.newFixedThreadPool(numThreads);
+ final Future<?>[] futures = new Future<?>[numThreads];
+
+ Configuration conf2 = new Configuration(conf);
+ // Disable FS cache so that different DFS clients are used
+ conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+ for (int i = 0; i < numThreads; i++) {
+ clientStates[i] = new ClientState();
+ futures[i] = threadPool.submit(new MkDirRunner(conf2, clientStates[i]));
+ }
+
+ Thread.sleep(150); // wait until mkdir is logged
+ long activStateId =
+ dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId();
+ dfsCluster.rollEditLogAndTail(0);
+ boolean finished = true;
+ // wait for all dispatcher threads to finish
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ finished = false;
+ LOG.warn("MkDirRunner thread failed", e.getCause());
+ }
+ }
+ assertTrue("Not all threads finished", finished);
+ threadPool.shutdown();
+
+ assertEquals("Active and Observer stateIds don't match",
+ dfsCluster.getNameNode(0).getFSImage().getLastAppliedOrWrittenTxId(),
+ dfsCluster.getNameNode(2).getFSImage().getLastAppliedOrWrittenTxId());
+ for (int i = 0; i < numThreads; i++) {
+ assertTrue("Client #" + i
+ + " lastSeenStateId=" + clientStates[i].lastSeenStateId
+ + " activStateId=" + activStateId
+ + "\n" + clientStates[i].fnfe,
+ clientStates[i].lastSeenStateId >= activStateId &&
+ clientStates[i].fnfe == null);
+ }
+
+ // Restore edit log
+ Mockito.reset(spyEditLog);
+ }
+
+ static class ClientState {
+ private long lastSeenStateId = -7;
+ private FileNotFoundException fnfe;
+ }
+
+ static class MkDirRunner implements Runnable {
+ private static final Path DIR_PATH =
+ new Path("/TestObserverNode/testMkdirsRaceWithObserverRead");
+
+ private DistributedFileSystem fs;
+ private ClientState clientState;
+
+ MkDirRunner(Configuration conf, ClientState cs) throws IOException {
+ super();
+ fs = (DistributedFileSystem) FileSystem.get(conf);
+ clientState = cs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ fs.mkdirs(DIR_PATH);
+ clientState.lastSeenStateId = HATestUtil.getLastSeenStateId(fs);
+ assertSentTo(fs, 0);
+
+ FileStatus stat = fs.getFileStatus(DIR_PATH);
+ assertSentTo(fs, 2);
+ assertTrue("Should be a directory", stat.isDirectory());
+ } catch (FileNotFoundException ioe) {
+ clientState.fnfe = ioe;
+ } catch (Exception e) {
+ fail("Unexpected exception: " + e);
+ }
+ }
+ }
+
+ private static void assertSentTo(DistributedFileSystem fs, int nnIdx)
+ throws IOException {
+ assertTrue("Request was not sent to the expected namenode " + nnIdx,
+ HATestUtil.isSentToAnyOfNameNodes(fs, dfsCluster, nnIdx));
+ }
private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,