You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/15 21:18:57 UTC
svn commit: r1373587 - in
/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs:
CHANGES.HDFS-3077.txt
src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
Author: todd
Date: Wed Aug 15 19:18:57 2012
New Revision: 1373587
URL: http://svn.apache.org/viewvc?rev=1373587&view=rev
Log:
HDFS-3800. improvements to QJM fault testing. Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1373587&r1=1373586&r2=1373587&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Wed Aug 15 19:18:57 2012
@@ -24,3 +24,5 @@ HDFS-3798. Avoid throwing NPE when final
HDFS-3799. QJM: handle empty log segments during recovery (todd)
HDFS-3797. QJM: add segment txid as a parameter to journal() RPC (todd)
+
+HDFS-3800. improvements to QJM fault testing (todd)
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java?rev=1373587&r1=1373586&r2=1373587&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java Wed Aug 15 19:18:57 2012
@@ -61,7 +61,7 @@ public abstract class QJMTestUtil {
}
public static void writeSegment(MiniJournalCluster cluster,
- QuorumJournalManager qjm, int startTxId, int numTxns,
+ QuorumJournalManager qjm, long startTxId, int numTxns,
boolean finalize) throws IOException {
EditLogOutputStream stm = qjm.startLogSegment(startTxId);
// Should create in-progress
@@ -81,7 +81,7 @@ public abstract class QJMTestUtil {
stm.write(op);
}
- public static void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
+ public static void writeTxns(EditLogOutputStream stm, long startTxId, int numTxns)
throws IOException {
for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
writeOp(stm, txid);
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java?rev=1373587&r1=1373586&r2=1373587&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java Wed Aug 15 19:18:57 2012
@@ -17,26 +17,39 @@
*/
package org.apache.hadoop.hdfs.qjournal.client;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.Closeable;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.SortedSet;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.junit.BeforeClass;
+import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -46,23 +59,27 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-
-import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
-import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
-import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import com.google.common.util.concurrent.MoreExecutors;
public class TestQJMWithFaults {
private static final Log LOG = LogFactory.getLog(
TestQJMWithFaults.class);
+ private static final String RAND_SEED_PROPERTY =
+ "TestQJMWithFaults.random-seed";
+
+ private static final int NUM_WRITER_ITERS = 500;
+ private static final int SEGMENTS_PER_WRITER = 2;
+
private static Configuration conf = new Configuration();
static {
// Don't retry connections - it just slows down the tests.
- conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+ conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+
+ // Make tests run faster by avoiding fsync()
+ EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
}
- private static long MAX_IPC_NUMBER;
-
/**
* Run through the creation of a log without any faults injected,
@@ -70,10 +87,10 @@ public class TestQJMWithFaults {
* bounds for the other test cases, so they can exhaustively explore
* the space of potential failures.
*/
- @BeforeClass
- public static void determineMaxIpcNumber() throws Exception {
+ private static long determineMaxIpcNumber() throws Exception {
Configuration conf = new Configuration();
MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+ long ret;
try {
QuorumJournalManager qjm = createInjectableQJM(cluster);
qjm.format(FAKE_NSINFO);
@@ -90,11 +107,12 @@ public class TestQJMWithFaults {
// were no failures.
assertEquals(1, ipcCounts.size());
- MAX_IPC_NUMBER = ipcCounts.first();
- LOG.info("Max IPC count = " + MAX_IPC_NUMBER);
+ ret = ipcCounts.first();
+ LOG.info("Max IPC count = " + ret);
} finally {
cluster.shutdown();
}
+ return ret;
}
/**
@@ -106,6 +124,8 @@ public class TestQJMWithFaults {
*/
@Test
public void testRecoverAfterDoubleFailures() throws Exception {
+ final long MAX_IPC_NUMBER = determineMaxIpcNumber();
+
for (int failA = 1; failA <= MAX_IPC_NUMBER; failA++) {
for (int failB = 1; failB <= MAX_IPC_NUMBER; failB++) {
String injectionStr = "(" + failA + ", " + failB + ")";
@@ -132,25 +152,133 @@ public class TestQJMWithFaults {
}
// Now should be able to recover
+ qjm = createInjectableQJM(cluster);
+ long lastRecoveredTxn = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+ assertTrue(lastRecoveredTxn >= lastAckedTxn);
+
+ writeSegment(cluster, qjm, lastRecoveredTxn + 1, 3, true);
+ } catch (Throwable t) {
+ // Test failure! Rethrow with the test setup info so it can be
+ // easily triaged.
+ throw new RuntimeException("Test failed with injection: " + injectionStr,
+ t);
+ } finally {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * Test case in which three JournalNodes randomly flip flop between
+ * up and down states every time they get an RPC.
+ *
+ * The writer keeps track of the latest ACKed edit, and on every
+ * recovery operation, ensures that it recovers at least to that
+ * point or higher. Since at any given point, a majority of JNs
+ * may be injecting faults, any writer operation is allowed to fail,
+ * so long as the exception message indicates it failed due to injected
+ * faults.
+ *
+ * Given a random seed, the test should be entirely deterministic.
+ */
+ @Test
+ public void testRandomized() throws Exception {
+ long seed;
+ Long userSpecifiedSeed = Long.getLong(RAND_SEED_PROPERTY);
+ if (userSpecifiedSeed != null) {
+ LOG.info("Using seed specified in system property");
+ seed = userSpecifiedSeed;
+
+ // If the user specifies a seed, then we should gather all the
+ // IPC trace information so that debugging is easier. This makes
+ // the test run about 25% slower otherwise.
+ ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
+ } else {
+ seed = new Random().nextLong();
+ }
+ LOG.info("Random seed: " + seed);
+
+ Random r = new Random(seed);
+
+ MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
+ .build();
+
+ // Format the cluster using a non-faulty QJM.
+ QuorumJournalManager qjmForInitialFormat =
+ createInjectableQJM(cluster);
+ qjmForInitialFormat.format(FAKE_NSINFO);
+ qjmForInitialFormat.close();
+
+ try {
+ long txid = 0;
+ long lastAcked = 0;
+
+ for (int i = 0; i < NUM_WRITER_ITERS; i++) {
+ LOG.info("Starting writer " + i + "\n-------------------");
+
+ QuorumJournalManager qjm = createRandomFaultyQJM(cluster, r);
+ try {
+ if (txid > 100) {
+ qjm.purgeLogsOlderThan(txid - 100);
+ }
+
+ long recovered;
try {
- qjm = createInjectableQJM(cluster);
- qjm.recoverUnfinalizedSegments();
- writeSegment(cluster, qjm, lastAckedTxn + 1, 3, true);
- // TODO: verify log segments
+ recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
} catch (Throwable t) {
- // Test failure! Rethrow with the test setup info so it can be
- // easily triaged.
- throw new RuntimeException("Test failed with injection: " + injectionStr,
- t);
+ LOG.info("Failed recovery", t);
+ GenericTestUtils.assertExceptionContains("faking being down", t);
+ continue;
+ }
+ assertTrue("Recovered only up to txnid " + recovered +
+ " but had gotten an ack for " + lastAcked,
+ recovered >= lastAcked);
+
+ txid = recovered + 1;
+
+ Holder<Throwable> thrown = new Holder<Throwable>(null);
+ for (int j = 0; j < SEGMENTS_PER_WRITER; j++) {
+ lastAcked = writeSegmentUntilCrash(cluster, qjm, txid, 4, thrown);
+ if (thrown.held != null) {
+ LOG.info("Failed write", thrown.held);
+ GenericTestUtils.assertExceptionContains("faking being down",
+ thrown.held);
+ break;
+ }
+ txid += 4;
}
} finally {
- cluster.shutdown();
- cluster = null;
+ qjm.close();
}
}
+ } finally {
+ cluster.shutdown();
}
}
+ private long writeSegmentUntilCrash(MiniJournalCluster cluster,
+ QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) {
+
+ long firstTxId = txid;
+ long lastAcked = txid - 1;
+ try {
+ EditLogOutputStream stm = qjm.startLogSegment(txid);
+
+ for (int i = 0; i < numTxns; i++) {
+ QJMTestUtil.writeTxns(stm, txid++, 1);
+ lastAcked++;
+ }
+
+ stm.close();
+ qjm.finalizeLogSegment(firstTxId, lastAcked);
+ } catch (Throwable t) {
+ thrown.held = t;
+ }
+ return lastAcked;
+ }
+
/**
* Run a simple workload of becoming the active writer and writing
* two log segments: 1-3 and 4-6.
@@ -179,6 +307,43 @@ public class TestQJMWithFaults {
private void failIpcNumber(AsyncLogger logger, int idx) {
((InvocationCountingChannel)logger).failIpcNumber(idx);
}
+
+ private static class RandomFaultyChannel extends IPCLoggerChannel {
+ private final Random random;
+ private float injectionProbability = 0.1f;
+ private boolean isUp = true;
+
+ public RandomFaultyChannel(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, InetSocketAddress addr, long seed) {
+ super(conf, nsInfo, journalId, addr);
+ this.random = new Random(seed);
+ }
+
+ @Override
+ protected QJournalProtocol createProxy() throws IOException {
+ QJournalProtocol realProxy = super.createProxy();
+ return mockProxy(
+ new WrapEveryCall<Object>(realProxy) {
+ @Override
+ void beforeCall(InvocationOnMock invocation) throws Exception {
+ if (random.nextFloat() < injectionProbability) {
+ isUp = !isUp;
+ LOG.info("transitioned " + addr + " to " +
+ (isUp ? "up" : "down"));
+ }
+
+ if (!isUp) {
+ throw new IOException("Injected - faking being down");
+ }
+ }
+ });
+ }
+
+ @Override
+ protected ExecutorService createExecutor() {
+ return MoreExecutors.sameThreadExecutor();
+ }
+ }
private static class InvocationCountingChannel extends IPCLoggerChannel {
private int rpcCount = 0;
@@ -211,10 +376,9 @@ public class TestQJMWithFaults {
@Override
protected QJournalProtocol createProxy() throws IOException {
final QJournalProtocol realProxy = super.createProxy();
- QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
- new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
+ QJournalProtocol mock = mockProxy(
+ new WrapEveryCall<Object>(realProxy) {
+ void beforeCall(InvocationOnMock invocation) throws Exception {
rpcCount++;
String callStr = "[" + addr + "] " +
invocation.getMethod().getName() + "(" +
@@ -228,14 +392,44 @@ public class TestQJMWithFaults {
} else {
LOG.info("IPC call #" + rpcCount + ": " + callStr);
}
-
- return invocation.getMethod().invoke(realProxy,
- invocation.getArguments());
}
});
return mock;
}
}
+
+
+ private static QJournalProtocol mockProxy(WrapEveryCall<Object> wrapper)
+ throws IOException {
+ QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
+ Mockito.withSettings()
+ .defaultAnswer(wrapper)
+ .extraInterfaces(Closeable.class));
+ Mockito.doNothing().when((Closeable)mock).close();
+ return mock;
+ }
+
+ private static abstract class WrapEveryCall<T> implements Answer<T> {
+ private final Object realObj;
+ WrapEveryCall(Object realObj) {
+ this.realObj = realObj;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T answer(InvocationOnMock invocation) throws Throwable {
+ beforeCall(invocation);
+
+ try {
+ return (T) invocation.getMethod().invoke(realObj,
+ invocation.getArguments());
+ } catch (InvocationTargetException ite) {
+ throw ite.getCause();
+ }
+ }
+
+ abstract void beforeCall(InvocationOnMock invocation) throws Exception;
+ }
private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)
throws IOException, URISyntaxException {
@@ -249,4 +443,21 @@ public class TestQJMWithFaults {
return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
FAKE_NSINFO, spyFactory);
}
+
+ private static QuorumJournalManager createRandomFaultyQJM(
+ MiniJournalCluster cluster, final Random seedGenerator)
+ throws IOException, URISyntaxException {
+
+ AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+ @Override
+ public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, InetSocketAddress addr) {
+ return new RandomFaultyChannel(conf, nsInfo, journalId, addr,
+ seedGenerator.nextLong());
+ }
+ };
+ return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
+ FAKE_NSINFO, spyFactory);
+ }
+
}