You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/07 22:24:01 UTC
svn commit: r1370497 - in
/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/qjournal/client/
src/test/java/org/apache/hadoop/hdfs/qjournal/
src/test/java/org/apache/hadoop/hdfs/qjournal/client/
Author: todd
Date: Tue Aug 7 20:24:01 2012
New Revision: 1370497
URL: http://svn.apache.org/viewvc?rev=1370497&view=rev
Log:
HDFS-3741. Exhaustive failure injection test for skipped RPCs. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Tue Aug 7 20:24:01 2012
@@ -10,3 +10,5 @@ HDFS-3692. Support purgeEditLogs() call
HDFS-3693. JNStorage should read its storage info even before a writer becomes active (todd)
HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)
+
+HDFS-3741. Exhaustive failure injection test for skipped RPCs (todd)
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java Tue Aug 7 20:24:01 2012
@@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hdfs.qjournal.client;
+import java.net.InetSocketAddress;
import java.net.URL;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import com.google.common.util.concurrent.ListenableFuture;
@@ -43,6 +46,11 @@ import com.google.common.util.concurrent
*/
interface AsyncLogger {
+ interface Factory {
+ AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, InetSocketAddress addr);
+ }
+
/**
* Send a batch of edits to the logger.
* @param firstTxnId the first txid of the edits.
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Tue Aug 7 20:24:01 2012
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutExcep
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Tue Aug 7 20:24:01 2012
@@ -65,7 +65,7 @@ import com.google.common.util.concurrent
public class IPCLoggerChannel implements AsyncLogger {
private final Configuration conf;
- private final InetSocketAddress addr;
+ protected final InetSocketAddress addr;
private QJournalProtocol proxy;
private final ListeningExecutorService executor;
@@ -87,7 +87,16 @@ public class IPCLoggerChannel implements
* overflows and it starts to treat the logger as having errored.
*/
private final int queueSizeLimitBytes;
+
+ static final Factory FACTORY = new AsyncLogger.Factory() {
+ @Override
+ public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, InetSocketAddress addr) {
+ return new IPCLoggerChannel(conf, nsInfo, journalId, addr);
+ }
+ };
+
public IPCLoggerChannel(Configuration conf,
NamespaceInfo nsInfo,
String journalId,
@@ -131,15 +140,18 @@ public class IPCLoggerChannel implements
protected QJournalProtocol getProxy() throws IOException {
if (proxy != null) return proxy;
-
+ proxy = createProxy();
+ return proxy;
+ }
+
+ protected QJournalProtocol createProxy() throws IOException {
RPC.setProtocolEngine(conf,
QJournalProtocolPB.class, ProtobufRpcEngine.class);
QJournalProtocolPB pbproxy = RPC.getProxy(
QJournalProtocolPB.class,
RPC.getProtocolVersion(QJournalProtocolPB.class),
addr, conf);
- proxy = new QJournalProtocolTranslatorPB(pbproxy);
- return proxy;
+ return new QJournalProtocolTranslatorPB(pbproxy);
}
@Override
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Tue Aug 7 20:24:01 2012
@@ -34,8 +34,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -78,14 +76,20 @@ public class QuorumJournalManager implem
private final AsyncLoggerSet loggers;
- public QuorumJournalManager(Configuration conf,
+ QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo) throws IOException {
+ this(conf, uri, nsInfo, IPCLoggerChannel.FACTORY);
+ }
+
+ QuorumJournalManager(Configuration conf,
+ URI uri, NamespaceInfo nsInfo,
+ AsyncLogger.Factory loggerFactory) throws IOException {
Preconditions.checkArgument(conf != null, "must be configured");
this.conf = conf;
this.uri = uri;
this.nsInfo = nsInfo;
- this.loggers = new AsyncLoggerSet(createLoggers());
+ this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
// Configure timeouts.
this.startSegmentTimeoutMs = conf.getInt(
@@ -106,6 +110,11 @@ public class QuorumJournalManager implem
}
+ protected List<AsyncLogger> createLoggers(
+ AsyncLogger.Factory factory) throws IOException {
+ return createLoggers(conf, uri, nsInfo, factory);
+ }
+
static String parseJournalId(URI uri) {
String path = uri.getPath();
Preconditions.checkArgument(path != null && !path.isEmpty(),
@@ -234,17 +243,14 @@ public class QuorumJournalManager implem
}
};
- protected List<AsyncLogger> createLoggers() throws IOException {
- return createLoggers(conf, uri, nsInfo);
- }
-
static List<AsyncLogger> createLoggers(Configuration conf,
- URI uri, NamespaceInfo nsInfo) throws IOException {
+ URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
+ throws IOException {
List<AsyncLogger> ret = Lists.newArrayList();
List<InetSocketAddress> addrs = getLoggerAddresses(uri);
String jid = parseJournalId(uri);
for (InetSocketAddress addr : addrs) {
- ret.add(new IPCLoggerChannel(conf, nsInfo, jid, addr));
+ ret.add(factory.createLogger(conf, nsInfo, jid, addr));
}
return ret;
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java Tue Aug 7 20:24:01 2012
@@ -17,13 +17,31 @@
*/
package org.apache.hadoop.hdfs.qjournal;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.DataOutputBuffer;
public abstract class QJMTestUtil {
+ public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+ 12345, "mycluster", "my-bp", 0L, 0);
+ public static final String JID = "test-journal";
public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
DataOutputBuffer buf = new DataOutputBuffer();
@@ -38,4 +56,82 @@ public abstract class QJMTestUtil {
return Arrays.copyOf(buf.getData(), buf.getLength());
}
+ public static void writeSegment(MiniJournalCluster cluster,
+ QuorumJournalManager qjm, int startTxId, int numTxns,
+ boolean finalize) throws IOException {
+ EditLogOutputStream stm = qjm.startLogSegment(startTxId);
+ // Should create in-progress
+ assertExistsInQuorum(cluster,
+ NNStorage.getInProgressEditsFileName(startTxId));
+
+ writeTxns(stm, startTxId, numTxns);
+ if (finalize) {
+ stm.close();
+ qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
+ }
+ }
+
+ public static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
+ FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+ op.setTransactionId(txid);
+ stm.write(op);
+ }
+
+ public static void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
+ throws IOException {
+ for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
+ writeOp(stm, txid);
+ }
+ stm.setReadyToFlush();
+ stm.flush();
+ }
+
+ /**
+ * Verify that the given list of streams contains exactly the range of
+ * transactions specified, inclusive.
+ */
+ public static void verifyEdits(List<EditLogInputStream> streams,
+ int firstTxnId, int lastTxnId) throws IOException {
+
+ Iterator<EditLogInputStream> iter = streams.iterator();
+ assertTrue(iter.hasNext());
+ EditLogInputStream stream = iter.next();
+
+ for (int expected = firstTxnId;
+ expected <= lastTxnId;
+ expected++) {
+
+ FSEditLogOp op = stream.readOp();
+ while (op == null) {
+ assertTrue("Expected to find txid " + expected + ", " +
+ "but no more streams available to read from",
+ iter.hasNext());
+ stream = iter.next();
+ op = stream.readOp();
+ }
+
+ assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+ assertEquals(expected, op.getTransactionId());
+ }
+
+ assertNull(stream.readOp());
+ assertFalse("Expected no more txns after " + lastTxnId +
+ " but more streams are available", iter.hasNext());
+ }
+
+
+ public static void assertExistsInQuorum(MiniJournalCluster cluster,
+ String fname) {
+ int count = 0;
+ for (int i = 0; i < 3; i++) {
+ File dir = cluster.getCurrentDir(i, JID);
+ if (new File(dir, fname).exists()) {
+ count++;
+ }
+ }
+ assertTrue("File " + fname + " should exist in a quorum of dirs",
+ count >= cluster.getQuorumSize());
+ }
+
+
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Tue Aug 7 20:24:01 2012
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster.Builder;
import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
import org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
@@ -59,7 +58,8 @@ public class TestEpochsAreUnique {
// With no failures or contention, epochs should increase one-by-one
for (int i = 0; i < 5; i++) {
AsyncLoggerSet als = new AsyncLoggerSet(
- QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO));
+ QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
+ IPCLoggerChannel.FACTORY));
als.createNewUniqueEpoch(FAKE_NSINFO);
assertEquals(i + 1, als.getEpoch());
}
@@ -69,7 +69,8 @@ public class TestEpochsAreUnique {
// skipping some
for (int i = 0; i < 20; i++) {
AsyncLoggerSet als = new AsyncLoggerSet(
- makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO)));
+ makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
+ IPCLoggerChannel.FACTORY)));
long newEpoch = -1;
while (true) {
try {
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java?rev=1370497&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java Tue Aug 7 20:24:01 2012
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+
+
+public class TestQJMWithFaults {
+ private static final Log LOG = LogFactory.getLog(
+ TestQJMWithFaults.class);
+
+ private static Configuration conf = new Configuration();
+ static {
+ // Don't retry connections - it just slows down the tests.
+ conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+ }
+ private static long MAX_IPC_NUMBER;
+
+
+ /**
+ * Run through the creation of a log without any faults injected,
+ * and count how many RPCs are made to each node. This sets the
+ * bounds for the other test cases, so they can exhaustively explore
+ * the space of potential failures.
+ */
+ @BeforeClass
+ public static void determineMaxIpcNumber() throws Exception {
+ Configuration conf = new Configuration();
+ MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+ try {
+ QuorumJournalManager qjm = createInjectableQJM(cluster);
+ doWorkload(cluster, qjm);
+
+ SortedSet<Integer> ipcCounts = Sets.newTreeSet();
+ for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
+ InvocationCountingChannel ch = (InvocationCountingChannel)l;
+ ch.waitForAllPendingCalls();
+ ipcCounts.add(ch.getRpcCount());
+ }
+
+ // All of the loggers should have sent the same number of RPCs, since there
+ // were no failures.
+ assertEquals(1, ipcCounts.size());
+
+ MAX_IPC_NUMBER = ipcCounts.first();
+ LOG.info("Max IPC count = " + MAX_IPC_NUMBER);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Sets up two of the nodes to each drop a single RPC, at all
+ * possible combinations of RPCs. This may result in the
+ * active writer failing to write. After this point, a new writer
+ * should be able to recover and continue writing without
+ * data loss.
+ */
+ @Test
+ public void testRecoverAfterDoubleFailures() throws Exception {
+ for (int failA = 1; failA <= MAX_IPC_NUMBER; failA++) {
+ for (int failB = 1; failB <= MAX_IPC_NUMBER; failB++) {
+ String injectionStr = "(" + failA + ", " + failB + ")";
+
+ LOG.info("\n\n-------------------------------------------\n" +
+ "Beginning test, failing at " + injectionStr + "\n" +
+ "-------------------------------------------\n\n");
+
+ MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
+ .build();
+ try {
+ QuorumJournalManager qjm;
+ qjm = createInjectableQJM(cluster);
+ List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
+ failIpcNumber(loggers.get(0), failA);
+ failIpcNumber(loggers.get(1), failB);
+ int lastAckedTxn = doWorkload(cluster, qjm);
+
+ if (lastAckedTxn < 6) {
+ LOG.info("Failed after injecting failures at " + injectionStr +
+ ". This is expected since we injected a failure in the " +
+ "majority.");
+ }
+
+ // Now should be able to recover
+ try {
+ qjm = createInjectableQJM(cluster);
+ qjm.recoverUnfinalizedSegments();
+ writeSegment(cluster, qjm, lastAckedTxn + 1, 3, true);
+ // TODO: verify log segments
+ } catch (Throwable t) {
+ // Test failure! Rethrow with the test setup info so it can be
+ // easily triaged.
+ throw new RuntimeException("Test failed with injection: " + injectionStr,
+ t);
+ }
+ } finally {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * Run a simple workload of becoming the active writer and writing
+ * two log segments: 1-3 and 4-6.
+ */
+ private static int doWorkload(MiniJournalCluster cluster,
+ QuorumJournalManager qjm) throws IOException {
+ int lastAcked = 0;
+ try {
+ qjm.recoverUnfinalizedSegments();
+ writeSegment(cluster, qjm, 1, 3, true);
+ lastAcked = 3;
+ writeSegment(cluster, qjm, 4, 3, true);
+ lastAcked = 6;
+ } catch (QuorumException qe) {
+ LOG.info("Failed to write at txid " + lastAcked,
+ qe);
+ }
+ return lastAcked;
+ }
+
+ /**
+ * Inject a failure at the given IPC number, such that the JN never
+ * receives the RPC. The client side sees an IOException. Future
+ * IPCs after this number will be received as usual.
+ */
+ private void failIpcNumber(AsyncLogger logger, int idx) {
+ ((InvocationCountingChannel)logger).failIpcNumber(idx);
+ }
+
+ private static class InvocationCountingChannel extends IPCLoggerChannel {
+ private int rpcCount = 0;
+ private Map<Integer, Callable<Void>> injections = Maps.newHashMap();
+
+ public InvocationCountingChannel(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, InetSocketAddress addr) {
+ super(conf, nsInfo, journalId, addr);
+ }
+
+ int getRpcCount() {
+ return rpcCount;
+ }
+
+ void failIpcNumber(final int idx) {
+ Preconditions.checkArgument(idx > 0,
+ "id must be positive");
+ inject(idx, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ throw new IOException("injected failed IPC at " + idx);
+ }
+ });
+ }
+
+ private void inject(int beforeRpcNumber, Callable<Void> injectedCode) {
+ injections.put(beforeRpcNumber, injectedCode);
+ }
+
+ @Override
+ protected QJournalProtocol createProxy() throws IOException {
+ final QJournalProtocol realProxy = super.createProxy();
+ QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
+ new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ rpcCount++;
+ String callStr = "[" + addr + "] " +
+ invocation.getMethod().getName() + "(" +
+ Joiner.on(", ").join(invocation.getArguments()) + ")";
+
+ Callable<Void> inject = injections.get(rpcCount);
+ if (inject != null) {
+ LOG.info("Injecting code before IPC #" + rpcCount + ": " +
+ callStr);
+ inject.call();
+ } else {
+ LOG.info("IPC call #" + rpcCount + ": " + callStr);
+ }
+
+ return invocation.getMethod().invoke(realProxy,
+ invocation.getArguments());
+ }
+ });
+ return mock;
+ }
+ }
+
+ private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)
+ throws IOException, URISyntaxException {
+ AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+ @Override
+ public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, InetSocketAddress addr) {
+ return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
+ }
+ };
+ return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
+ FAKE_NSINFO, spyFactory);
+ }
+}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Tue Aug 7 20:24:01 2012
@@ -18,14 +18,18 @@
package org.apache.hadoop.hdfs.qjournal.client;
import static org.junit.Assert.*;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.URISyntaxException;
-import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,14 +37,9 @@ import org.apache.commons.logging.impl.L
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
@@ -55,7 +54,6 @@ import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
/**
* Functional tests for QuorumJournalManager.
@@ -65,9 +63,6 @@ public class TestQuorumJournalManager {
private static final Log LOG = LogFactory.getLog(
TestQuorumJournalManager.class);
- private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
- 12345, "mycluster", "my-bp", 0L, 0);
- private static final String JID = "testQuorumJournalManager";
private MiniJournalCluster cluster;
private Configuration conf;
private QuorumJournalManager qjm;
@@ -95,18 +90,20 @@ public class TestQuorumJournalManager {
@After
public void shutdown() throws IOException {
- cluster.shutdown();
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
@Test
public void testSingleWriter() throws Exception {
- writeSegment(qjm, 1, 3, true);
+ writeSegment(cluster, qjm, 1, 3, true);
// Should be finalized
checkRecovery(cluster, 1, 3);
// Start a new segment
- writeSegment(qjm, 4, 1, true);
+ writeSegment(cluster, qjm, 4, 1, true);
// Should be finalized
checkRecovery(cluster, 4, 4);
@@ -119,7 +116,7 @@ public class TestQuorumJournalManager {
List<EditLogInputStream> streams = Lists.newArrayList();
readerQjm.selectInputStreams(streams, 0, false);
assertEquals(0, streams.size());
- writeSegment(qjm, 1, 3, true);
+ writeSegment(cluster, qjm, 1, 3, true);
readerQjm.selectInputStreams(streams, 0, false);
try {
@@ -138,7 +135,7 @@ public class TestQuorumJournalManager {
// Ensure correct results when there is a stream in-progress, but we don't
// ask for in-progress.
- writeSegment(qjm, 4, 3, false);
+ writeSegment(cluster, qjm, 4, 3, false);
readerQjm.selectInputStreams(streams, 0, false);
try {
assertEquals(1, streams.size());
@@ -178,13 +175,13 @@ public class TestQuorumJournalManager {
*/
@Test
public void testOneJNMissingSegments() throws Exception {
- writeSegment(qjm, 1, 3, true);
+ writeSegment(cluster, qjm, 1, 3, true);
waitForAllPendingCalls(qjm.getLoggerSetForTests());
cluster.getJournalNode(0).stopAndJoin(0);
- writeSegment(qjm, 4, 3, true);
+ writeSegment(cluster, qjm, 4, 3, true);
waitForAllPendingCalls(qjm.getLoggerSetForTests());
cluster.restartJournalNode(0);
- writeSegment(qjm, 7, 3, true);
+ writeSegment(cluster, qjm, 7, 3, true);
waitForAllPendingCalls(qjm.getLoggerSetForTests());
cluster.getJournalNode(1).stopAndJoin(0);
@@ -199,37 +196,6 @@ public class TestQuorumJournalManager {
}
}
- /**
- * TODO: this test needs to be fleshed out to be an exhaustive failure test
- * @throws Exception
- */
- @Test
- public void testOrchestratedFailures() throws Exception {
- writeSegment(qjm, 1, 3, true);
- writeSegment(qjm, 4, 3, true);
-
- SortedSet<Long> serials = Sets.newTreeSet();
- for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
- IPCLoggerChannel ch = (IPCLoggerChannel)l;
- ch.waitForAllPendingCalls();
- serials.add(ch.getNextIpcSerial());
- }
-
- // All of the loggers should have sent the same number of RPCs, since there
- // were no failures.
- assertEquals(1, serials.size());
-
- long maxSerial = serials.first();
- LOG.info("Max IPC serial = " + maxSerial);
-
- cluster.shutdown();
-
- cluster = new MiniJournalCluster.Builder(conf)
- .build();
- qjm = createSpyingQJM();
- spies = qjm.getLoggerSetForTests().getLoggersForTests();
-
- }
/**
* Test case where a new writer picks up from an old one with no failures
@@ -238,8 +204,8 @@ public class TestQuorumJournalManager {
*/
@Test
public void testChangeWritersLogsInSync() throws Exception {
- writeSegment(qjm, 1, 3, false);
- assertExistsInQuorum(cluster,
+ writeSegment(cluster, qjm, 1, 3, false);
+ QJMTestUtil.assertExistsInQuorum(cluster,
NNStorage.getInProgressEditsFileName(1));
// Make a new QJM
@@ -301,7 +267,7 @@ public class TestQuorumJournalManager {
qe);
}
- assertExistsInQuorum(cluster,
+ QJMTestUtil.assertExistsInQuorum(cluster,
NNStorage.getInProgressEditsFileName(1));
// Shut down the specified JN, so it's not present during recovery.
@@ -320,7 +286,7 @@ public class TestQuorumJournalManager {
.when(spy).sendEdits(
Mockito.eq(txid), Mockito.eq(1), Mockito.<byte[]>any());
}
-
+
/**
* edit lengths [3,4,5]
* first recovery:
@@ -389,7 +355,7 @@ public class TestQuorumJournalManager {
@Test
public void testPurgeLogs() throws Exception {
for (int txid = 1; txid <= 5; txid++) {
- writeSegment(qjm, txid, 1, true);
+ writeSegment(cluster, qjm, txid, 1, true);
}
File curDir = cluster.getCurrentDir(0, JID);
GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
@@ -428,78 +394,18 @@ public class TestQuorumJournalManager {
private QuorumJournalManager createSpyingQJM()
throws IOException, URISyntaxException {
- return new QuorumJournalManager(
- conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO) {
- @Override
- protected List<AsyncLogger> createLoggers() throws IOException {
- List<AsyncLogger> realLoggers = super.createLoggers();
- List<AsyncLogger> spies = Lists.newArrayList();
- for (AsyncLogger logger : realLoggers) {
- spies.add(Mockito.spy(logger));
- }
- return spies;
- }
- };
- }
-
- private void writeSegment(QuorumJournalManager qjm,
- int startTxId, int numTxns, boolean finalize) throws IOException {
- EditLogOutputStream stm = qjm.startLogSegment(startTxId);
- // Should create in-progress
- assertExistsInQuorum(cluster,
- NNStorage.getInProgressEditsFileName(startTxId));
-
- writeTxns(stm, startTxId, numTxns);
- if (finalize) {
- stm.close();
- qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
- }
- }
-
- private void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
- throws IOException {
- for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
- TestQuorumJournalManagerUnit.writeOp(stm, txid);
- }
- stm.setReadyToFlush();
- stm.flush();
- }
-
- /**
- * Verify that the given list of streams contains exactly the range of
- * transactions specified, inclusive.
- */
- private void verifyEdits(List<EditLogInputStream> streams,
- int firstTxnId, int lastTxnId) throws IOException {
-
- Iterator<EditLogInputStream> iter = streams.iterator();
- assertTrue(iter.hasNext());
- EditLogInputStream stream = iter.next();
-
- for (int expected = firstTxnId;
- expected <= lastTxnId;
- expected++) {
-
- FSEditLogOp op = stream.readOp();
- while (op == null) {
- assertTrue("Expected to find txid " + expected + ", " +
- "but no more streams available to read from",
- iter.hasNext());
- stream = iter.next();
- op = stream.readOp();
+ AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+ @Override
+ public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, InetSocketAddress addr) {
+ return Mockito.spy(IPCLoggerChannel.FACTORY.createLogger(
+ conf, nsInfo, journalId, addr));
}
-
- assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
- assertEquals(expected, op.getTransactionId());
- }
-
- assertNull(stream.readOp());
- assertFalse("Expected no more txns after " + lastTxnId +
- " but more streams are available", iter.hasNext());
+ };
+ return new QuorumJournalManager(
+ conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory);
}
-
-
private static void waitForAllPendingCalls(AsyncLoggerSet als)
throws InterruptedException {
for (AsyncLogger l : als.getLoggersForTests()) {
@@ -508,19 +414,6 @@ public class TestQuorumJournalManager {
}
}
- private void assertExistsInQuorum(MiniJournalCluster cluster,
- String fname) {
- int count = 0;
- for (int i = 0; i < 3; i++) {
- File dir = cluster.getCurrentDir(i, JID);
- if (new File(dir, fname).exists()) {
- count++;
- }
- }
- assertTrue("File " + fname + " should exist in a quorum of dirs",
- count >= cluster.getQuorumSize());
- }
-
private void checkRecovery(MiniJournalCluster cluster,
long segmentTxId, long expectedEndTxId)
throws IOException {
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Tue Aug 7 20:24:01 2012
@@ -32,8 +32,6 @@ import org.apache.hadoop.hdfs.qjournal.c
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
@@ -47,6 +45,8 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
+
/**
* True unit tests for QuorumJournalManager
*/
@@ -70,7 +70,7 @@ public class TestQuorumJournalManagerUni
qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
@Override
- protected List<AsyncLogger> createLoggers() {
+ protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
return spyLoggers;
}
};
@@ -192,10 +192,4 @@ public class TestQuorumJournalManagerUni
EditLogOutputStream stm = qjm.startLogSegment(1);
return stm;
}
-
- static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
- FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
- op.setTransactionId(txid);
- stm.write(op);
- }
}