You are viewing a plain text version of this content. The canonical link to it (a hyperlink on the original page) is not preserved in this plain-text copy.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/07 22:24:01 UTC

svn commit: r1370497 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/test/java/org/apache/hadoop/hdfs/qjournal/ src/test/java/org/apache/hadoop/hdfs/qjournal/client/

Author: todd
Date: Tue Aug  7 20:24:01 2012
New Revision: 1370497

URL: http://svn.apache.org/viewvc?rev=1370497&view=rev
Log:
HDFS-3741. Exhaustive failure injection test for skipped RPCs. Contributed by Todd Lipcon.

Added:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Tue Aug  7 20:24:01 2012
@@ -10,3 +10,5 @@ HDFS-3692. Support purgeEditLogs() call 
 HDFS-3693. JNStorage should read its storage info even before a writer becomes active (todd)
 
 HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)
+
+HDFS-3741. Exhaustive failure injection test for skipped RPCs (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java Tue Aug  7 20:24:01 2012
@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import java.net.InetSocketAddress;
 import java.net.URL;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 
 import com.google.common.util.concurrent.ListenableFuture;
@@ -43,6 +46,11 @@ import com.google.common.util.concurrent
  */
 interface AsyncLogger {
   
+  interface Factory {
+    AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr);
+  }
+
   /**
    * Send a batch of edits to the logger.
    * @param firstTxnId the first txid of the edits.

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Tue Aug  7 20:24:01 2012
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutExcep
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Tue Aug  7 20:24:01 2012
@@ -65,7 +65,7 @@ import com.google.common.util.concurrent
 public class IPCLoggerChannel implements AsyncLogger {
 
   private final Configuration conf;
-  private final InetSocketAddress addr;
+  protected final InetSocketAddress addr;
   private QJournalProtocol proxy;
 
   private final ListeningExecutorService executor;
@@ -87,7 +87,16 @@ public class IPCLoggerChannel implements
    * overflows and it starts to treat the logger as having errored.
    */
   private final int queueSizeLimitBytes;
+
   
+  static final Factory FACTORY = new AsyncLogger.Factory() {
+    @Override
+    public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      return new IPCLoggerChannel(conf, nsInfo, journalId, addr);
+    }
+  };
+
   public IPCLoggerChannel(Configuration conf,
       NamespaceInfo nsInfo,
       String journalId,
@@ -131,15 +140,18 @@ public class IPCLoggerChannel implements
   
   protected QJournalProtocol getProxy() throws IOException {
     if (proxy != null) return proxy;
-
+    proxy = createProxy();
+    return proxy;
+  }
+  
+  protected QJournalProtocol createProxy() throws IOException {
     RPC.setProtocolEngine(conf,
         QJournalProtocolPB.class, ProtobufRpcEngine.class);
     QJournalProtocolPB pbproxy = RPC.getProxy(
         QJournalProtocolPB.class,
         RPC.getProtocolVersion(QJournalProtocolPB.class),
         addr, conf);
-    proxy = new QJournalProtocolTranslatorPB(pbproxy);
-    return proxy;
+    return new QJournalProtocolTranslatorPB(pbproxy);
   }
   
   @Override

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Tue Aug  7 20:24:01 2012
@@ -34,8 +34,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -78,14 +76,28 @@ public class QuorumJournalManager implem
   
   private final AsyncLoggerSet loggers;
   
+  // Keep this constructor public: journal managers may be instantiated
+  // reflectively by class name, and lookups such as Class#getConstructor
+  // only see public constructors.
   public QuorumJournalManager(Configuration conf,
       URI uri, NamespaceInfo nsInfo) throws IOException {
+    this(conf, uri, nsInfo, IPCLoggerChannel.FACTORY);
+  }
+  
+  /**
+   * Create a QJM whose loggers (one per journal address in the URI) are
+   * built by the given factory. Package-private so tests can substitute
+   * their own loggers; the public constructor uses IPCLoggerChannel.FACTORY.
+   */
+  QuorumJournalManager(Configuration conf,
+      URI uri, NamespaceInfo nsInfo,
+      AsyncLogger.Factory loggerFactory) throws IOException {
     Preconditions.checkArgument(conf != null, "must be configured");
 
     this.conf = conf;
     this.uri = uri;
     this.nsInfo = nsInfo;
-    this.loggers = new AsyncLoggerSet(createLoggers());
+    this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
 
     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
@@ -106,6 +118,11 @@ public class QuorumJournalManager implem
         
   }
   
+  protected List<AsyncLogger> createLoggers(
+      AsyncLogger.Factory factory) throws IOException {
+    return createLoggers(conf, uri, nsInfo, factory);
+  }
+
   static String parseJournalId(URI uri) {
     String path = uri.getPath();
     Preconditions.checkArgument(path != null && !path.isEmpty(),
@@ -234,17 +251,14 @@ public class QuorumJournalManager implem
       }
     };
 
-  protected List<AsyncLogger> createLoggers() throws IOException {
-    return createLoggers(conf, uri, nsInfo);
-  }
-  
   static List<AsyncLogger> createLoggers(Configuration conf,
-      URI uri, NamespaceInfo nsInfo) throws IOException {
+      URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
+          throws IOException {
     List<AsyncLogger> ret = Lists.newArrayList();
     List<InetSocketAddress> addrs = getLoggerAddresses(uri);
     String jid = parseJournalId(uri);
     for (InetSocketAddress addr : addrs) {
-      ret.add(new IPCLoggerChannel(conf, nsInfo, jid, addr));
+      ret.add(factory.createLogger(conf, nsInfo, jid, addr));
     }
     return ret;
   }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java Tue Aug  7 20:24:01 2012
@@ -17,13 +17,31 @@
  */
 package org.apache.hadoop.hdfs.qjournal;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.DataOutputBuffer;
 
 public abstract class QJMTestUtil {
+  public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L, 0);
+  public static final String JID = "test-journal";
 
   public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
     DataOutputBuffer buf = new DataOutputBuffer();
@@ -38,4 +56,82 @@ public abstract class QJMTestUtil {
     return Arrays.copyOf(buf.getData(), buf.getLength());
   }
   
+  public static void writeSegment(MiniJournalCluster cluster,
+      QuorumJournalManager qjm, int startTxId, int numTxns,
+      boolean finalize) throws IOException {
+    EditLogOutputStream stm = qjm.startLogSegment(startTxId);
+    // Should create in-progress
+    assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(startTxId));
+    
+    writeTxns(stm, startTxId, numTxns);
+    if (finalize) {
+      stm.close();
+      qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
+    }
+  }
+
+  public static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
+    FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+    op.setTransactionId(txid);
+    stm.write(op);
+  }
+
+  public static void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
+      throws IOException {
+    for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
+      writeOp(stm, txid);
+    }
+    stm.setReadyToFlush();
+    stm.flush();
+  }
+  
+  /**
+   * Verify that the given list of streams contains exactly the range of
+   * transactions specified, inclusive.
+   */
+  public static void verifyEdits(List<EditLogInputStream> streams,
+      int firstTxnId, int lastTxnId) throws IOException {
+    
+    Iterator<EditLogInputStream> iter = streams.iterator();
+    assertTrue(iter.hasNext());
+    EditLogInputStream stream = iter.next();
+    
+    for (int expected = firstTxnId;
+        expected <= lastTxnId;
+        expected++) {
+      
+      FSEditLogOp op = stream.readOp();
+      while (op == null) {
+        assertTrue("Expected to find txid " + expected + ", " +
+            "but no more streams available to read from",
+            iter.hasNext());
+        stream = iter.next();
+        op = stream.readOp();
+      }
+      
+      assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
+      assertEquals(expected, op.getTransactionId());
+    }
+    
+    assertNull(stream.readOp());
+    assertFalse("Expected no more txns after " + lastTxnId +
+        " but more streams are available", iter.hasNext());
+  }
+  
+
+  public static void assertExistsInQuorum(MiniJournalCluster cluster,
+      String fname) {
+    int count = 0;
+    for (int i = 0; i < 3; i++) {
+      File dir = cluster.getCurrentDir(i, JID);
+      if (new File(dir, fname).exists()) {
+        count++;
+      }
+    }
+    assertTrue("File " + fname + " should exist in a quorum of dirs",
+        count >= cluster.getQuorumSize());
+  }
+  
+
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Tue Aug  7 20:24:01 2012
@@ -28,7 +28,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster.Builder;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
@@ -59,7 +58,8 @@ public class TestEpochsAreUnique {
       // With no failures or contention, epochs should increase one-by-one
       for (int i = 0; i < 5; i++) {
         AsyncLoggerSet als = new AsyncLoggerSet(
-            QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO));
+            QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
+                IPCLoggerChannel.FACTORY));
         als.createNewUniqueEpoch(FAKE_NSINFO);
         assertEquals(i + 1, als.getEpoch());
       }
@@ -69,7 +69,8 @@ public class TestEpochsAreUnique {
       // skipping some
       for (int i = 0; i < 20; i++) {
         AsyncLoggerSet als = new AsyncLoggerSet(
-            makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO)));
+            makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
+                IPCLoggerChannel.FACTORY)));
         long newEpoch = -1;
         while (true) {
           try {

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java?rev=1370497&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java Tue Aug  7 20:24:01 2012
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+
+/**
+ * Failure-injection tests for {@link QuorumJournalManager}. Individual RPCs
+ * to the journal nodes are dropped at every possible position in a simple
+ * workload, after which a new writer must be able to recover and keep writing.
+ */
+public class TestQJMWithFaults {
+  private static final Log LOG = LogFactory.getLog(
+      TestQJMWithFaults.class);
+
+  private static final Configuration conf = new Configuration();
+  static {
+    // Don't retry connections - it just slows down the tests.
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+  }
+
+  /**
+   * Number of RPCs each logger sends during a fault-free run of
+   * {@link #doWorkload}; set once by {@link #determineMaxIpcNumber()}.
+   */
+  private static long MAX_IPC_NUMBER;
+
+  /**
+   * Run through the creation of a log without any faults injected,
+   * and count how many RPCs are made to each node. This sets the
+   * bounds for the other test cases, so they can exhaustively explore
+   * the space of potential failures.
+   */
+  @BeforeClass
+  public static void determineMaxIpcNumber() throws Exception {
+    // Use the shared conf (no connect retries) so this baseline run is
+    // configured exactly like the fault-injection runs it bounds.
+    MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+    try {
+      QuorumJournalManager qjm = createInjectableQJM(cluster);
+      doWorkload(cluster, qjm);
+
+      SortedSet<Integer> ipcCounts = Sets.newTreeSet();
+      for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
+        InvocationCountingChannel ch = (InvocationCountingChannel)l;
+        ch.waitForAllPendingCalls();
+        ipcCounts.add(ch.getRpcCount());
+      }
+
+      // All of the loggers should have sent the same number of RPCs, since there
+      // were no failures.
+      assertEquals(1, ipcCounts.size());
+
+      MAX_IPC_NUMBER = ipcCounts.first();
+      LOG.info("Max IPC count = " + MAX_IPC_NUMBER);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Sets up two of the nodes to each drop a single RPC, at all
+   * possible combinations of RPCs. This may result in the
+   * active writer failing to write. After this point, a new writer
+   * should be able to recover and continue writing without
+   * data loss.
+   */
+  @Test
+  public void testRecoverAfterDoubleFailures() throws Exception {
+    for (int failA = 1; failA <= MAX_IPC_NUMBER; failA++) {
+      for (int failB = 1; failB <= MAX_IPC_NUMBER; failB++) {
+        String injectionStr = "(" + failA + ", " + failB + ")";
+
+        LOG.info("\n\n-------------------------------------------\n" +
+            "Beginning test, failing at " + injectionStr + "\n" +
+            "-------------------------------------------\n\n");
+
+        MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
+          .build();
+        try {
+          QuorumJournalManager qjm = createInjectableQJM(cluster);
+          List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
+          failIpcNumber(loggers.get(0), failA);
+          failIpcNumber(loggers.get(1), failB);
+          int lastAckedTxn = doWorkload(cluster, qjm);
+
+          if (lastAckedTxn < 6) {
+            LOG.info("Failed after injecting failures at " + injectionStr + 
+                ". This is expected since we injected a failure in the " +
+                "majority.");
+          }
+
+          // Now should be able to recover
+          try {
+            qjm = createInjectableQJM(cluster);
+            qjm.recoverUnfinalizedSegments();
+            writeSegment(cluster, qjm, lastAckedTxn + 1, 3, true);
+            // TODO: verify log segments
+          } catch (Throwable t) {
+            // Test failure! Rethrow with the test setup info so it can be
+            // easily triaged.
+            throw new RuntimeException("Test failed with injection: " + injectionStr,
+                t);
+          }
+        } finally {
+          cluster.shutdown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Run a simple workload of becoming the active writer and writing
+   * two log segments: 1-3 and 4-6.
+   *
+   * @return the last txid of the last segment that was written and finalized
+   *         without a QuorumException, or 0 if the first segment failed
+   */
+  private static int doWorkload(MiniJournalCluster cluster,
+      QuorumJournalManager qjm) throws IOException {
+    int lastAcked = 0;
+    try {
+      qjm.recoverUnfinalizedSegments();
+      writeSegment(cluster, qjm, 1, 3, true);
+      lastAcked = 3;
+      writeSegment(cluster, qjm, 4, 3, true);
+      lastAcked = 6;
+    } catch (QuorumException qe) {
+      LOG.info("Failed to write at txid " + lastAcked,
+          qe);
+    }
+    return lastAcked;
+  }
+
+  /**
+   * Inject a failure at the given IPC number, such that the JN never
+   * receives the RPC. The client side sees an IOException. Future
+   * IPCs after this number will be received as usual.
+   */
+  private static void failIpcNumber(AsyncLogger logger, int idx) {
+    ((InvocationCountingChannel)logger).failIpcNumber(idx);
+  }
+
+  /**
+   * IPCLoggerChannel whose proxy counts every RPC and can run injected code
+   * (such as throwing an IOException) before a chosen RPC number.
+   */
+  private static class InvocationCountingChannel extends IPCLoggerChannel {
+    private int rpcCount = 0;
+    private final Map<Integer, Callable<Void>> injections = Maps.newHashMap();
+
+    public InvocationCountingChannel(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      super(conf, nsInfo, journalId, addr);
+    }
+
+    int getRpcCount() {
+      return rpcCount;
+    }
+
+    void failIpcNumber(final int idx) {
+      Preconditions.checkArgument(idx > 0,
+          "id must be positive");
+      inject(idx, new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          throw new IOException("injected failed IPC at " + idx);
+        }
+      });
+    }
+
+    private void inject(int beforeRpcNumber, Callable<Void> injectedCode) {
+      injections.put(beforeRpcNumber, injectedCode);
+    }
+
+    @Override
+    protected QJournalProtocol createProxy() throws IOException {
+      final QJournalProtocol realProxy = super.createProxy();
+      QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
+          new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+              rpcCount++;
+              // useForNull: a null RPC argument must not break the logging.
+              String callStr = "[" + addr + "] " + 
+                  invocation.getMethod().getName() + "(" +
+                  Joiner.on(", ").useForNull("null")
+                      .join(invocation.getArguments()) + ")";
+
+              Callable<Void> inject = injections.get(rpcCount);
+              if (inject != null) {
+                LOG.info("Injecting code before IPC #" + rpcCount + ": " +
+                    callStr);
+                inject.call();
+              } else {
+                LOG.info("IPC call #" + rpcCount + ": " + callStr);
+              }
+
+              try {
+                return invocation.getMethod().invoke(realProxy,
+                    invocation.getArguments());
+              } catch (InvocationTargetException ite) {
+                // Rethrow the real RPC's own exception (e.g. an IOException
+                // from the JN) instead of the reflection wrapper, which no
+                // QJournalProtocol method declares.
+                throw ite.getCause();
+              }
+            }
+          });
+      return mock;
+    }
+  }
+
+  /**
+   * Create a QJM whose loggers are {@link InvocationCountingChannel}s, so
+   * that RPCs can be counted and failures injected.
+   */
+  private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)
+      throws IOException, URISyntaxException {
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
+      }
+    };
+    return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
+        FAKE_NSINFO, spyFactory);
+  }
+}

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Tue Aug  7 20:24:01 2012
@@ -18,14 +18,18 @@
 package org.apache.hadoop.hdfs.qjournal.client;
 
 import static org.junit.Assert.*;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
 
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
-import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,14 +37,9 @@ import org.apache.commons.logging.impl.L
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
@@ -55,7 +54,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 /**
  * Functional tests for QuorumJournalManager.
@@ -65,9 +63,6 @@ public class TestQuorumJournalManager {
   private static final Log LOG = LogFactory.getLog(
       TestQuorumJournalManager.class);
   
-  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
-      12345, "mycluster", "my-bp", 0L, 0);
-  private static final String JID = "testQuorumJournalManager";
   private MiniJournalCluster cluster;
   private Configuration conf;
   private QuorumJournalManager qjm;
@@ -95,18 +90,20 @@ public class TestQuorumJournalManager {
   
   @After
   public void shutdown() throws IOException {
-    cluster.shutdown();
+    if (cluster != null) {
+      cluster.shutdown();
+    }
   }
   
   @Test
   public void testSingleWriter() throws Exception {
-    writeSegment(qjm, 1, 3, true);
+    writeSegment(cluster, qjm, 1, 3, true);
     
     // Should be finalized
     checkRecovery(cluster, 1, 3);
     
     // Start a new segment
-    writeSegment(qjm, 4, 1, true);
+    writeSegment(cluster, qjm, 4, 1, true);
 
     // Should be finalized
     checkRecovery(cluster, 4, 4);
@@ -119,7 +116,7 @@ public class TestQuorumJournalManager {
     List<EditLogInputStream> streams = Lists.newArrayList();
     readerQjm.selectInputStreams(streams, 0, false);
     assertEquals(0, streams.size());
-    writeSegment(qjm, 1, 3, true);
+    writeSegment(cluster, qjm, 1, 3, true);
 
     readerQjm.selectInputStreams(streams, 0, false);
     try {
@@ -138,7 +135,7 @@ public class TestQuorumJournalManager {
     
     // Ensure correct results when there is a stream in-progress, but we don't
     // ask for in-progress.
-    writeSegment(qjm, 4, 3, false);
+    writeSegment(cluster, qjm, 4, 3, false);
     readerQjm.selectInputStreams(streams, 0, false);
     try {
       assertEquals(1, streams.size());
@@ -178,13 +175,13 @@ public class TestQuorumJournalManager {
    */
   @Test
   public void testOneJNMissingSegments() throws Exception {
-    writeSegment(qjm, 1, 3, true);
+    writeSegment(cluster, qjm, 1, 3, true);
     waitForAllPendingCalls(qjm.getLoggerSetForTests());
     cluster.getJournalNode(0).stopAndJoin(0);
-    writeSegment(qjm, 4, 3, true);
+    writeSegment(cluster, qjm, 4, 3, true);
     waitForAllPendingCalls(qjm.getLoggerSetForTests());
     cluster.restartJournalNode(0);
-    writeSegment(qjm, 7, 3, true);
+    writeSegment(cluster, qjm, 7, 3, true);
     waitForAllPendingCalls(qjm.getLoggerSetForTests());
     cluster.getJournalNode(1).stopAndJoin(0);
     
@@ -199,37 +196,6 @@ public class TestQuorumJournalManager {
     }
   }
   
-  /**
-   * TODO: this test needs to be fleshed out to be an exhaustive failure test
-   * @throws Exception
-   */
-  @Test
-  public void testOrchestratedFailures() throws Exception {
-    writeSegment(qjm, 1, 3, true);
-    writeSegment(qjm, 4, 3, true);
-    
-    SortedSet<Long> serials = Sets.newTreeSet();
-    for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
-      IPCLoggerChannel ch = (IPCLoggerChannel)l;
-      ch.waitForAllPendingCalls();
-      serials.add(ch.getNextIpcSerial());
-    }
-
-    // All of the loggers should have sent the same number of RPCs, since there
-    // were no failures.
-    assertEquals(1, serials.size());
-    
-    long maxSerial = serials.first();
-    LOG.info("Max IPC serial = " + maxSerial);
-    
-    cluster.shutdown();
-    
-    cluster = new MiniJournalCluster.Builder(conf)
-      .build();
-    qjm = createSpyingQJM();
-    spies = qjm.getLoggerSetForTests().getLoggersForTests();
-
-  }
   
   /**
    * Test case where a new writer picks up from an old one with no failures
@@ -238,8 +204,8 @@ public class TestQuorumJournalManager {
    */
   @Test
   public void testChangeWritersLogsInSync() throws Exception {
-    writeSegment(qjm, 1, 3, false);
-    assertExistsInQuorum(cluster,
+    writeSegment(cluster, qjm, 1, 3, false);
+    QJMTestUtil.assertExistsInQuorum(cluster,
         NNStorage.getInProgressEditsFileName(1));
 
     // Make a new QJM
@@ -301,7 +267,7 @@ public class TestQuorumJournalManager {
           qe);
     }
     
-    assertExistsInQuorum(cluster,
+    QJMTestUtil.assertExistsInQuorum(cluster,
         NNStorage.getInProgressEditsFileName(1));
 
     // Shut down the specified JN, so it's not present during recovery.
@@ -320,7 +286,7 @@ public class TestQuorumJournalManager {
       .when(spy).sendEdits(
         Mockito.eq(txid), Mockito.eq(1), Mockito.<byte[]>any());
   }
-
+  
   /**
    * edit lengths [3,4,5]
    * first recovery:
@@ -389,7 +355,7 @@ public class TestQuorumJournalManager {
   @Test
   public void testPurgeLogs() throws Exception {
     for (int txid = 1; txid <= 5; txid++) {
-      writeSegment(qjm, txid, 1, true);
+      writeSegment(cluster, qjm, txid, 1, true);
     }
     File curDir = cluster.getCurrentDir(0, JID);
     GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
@@ -428,78 +394,18 @@ public class TestQuorumJournalManager {
   
   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {
-    return new QuorumJournalManager(
-        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO) {
-          @Override
-          protected List<AsyncLogger> createLoggers() throws IOException {
-            List<AsyncLogger> realLoggers = super.createLoggers();
-            List<AsyncLogger> spies = Lists.newArrayList();
-            for (AsyncLogger logger : realLoggers) {
-              spies.add(Mockito.spy(logger));
-            }
-            return spies;
-          }
-    };
-  }
-
-  private void writeSegment(QuorumJournalManager qjm,
-      int startTxId, int numTxns, boolean finalize) throws IOException {
-    EditLogOutputStream stm = qjm.startLogSegment(startTxId);
-    // Should create in-progress
-    assertExistsInQuorum(cluster,
-        NNStorage.getInProgressEditsFileName(startTxId));
-    
-    writeTxns(stm, startTxId, numTxns);
-    if (finalize) {
-      stm.close();
-      qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
-    }
-  }
-
-  private void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
-      throws IOException {
-    for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
-      TestQuorumJournalManagerUnit.writeOp(stm, txid);
-    }
-    stm.setReadyToFlush();
-    stm.flush();
-  }
-  
-  /**
-   * Verify that the given list of streams contains exactly the range of
-   * transactions specified, inclusive.
-   */
-  private void verifyEdits(List<EditLogInputStream> streams,
-      int firstTxnId, int lastTxnId) throws IOException {
-    
-    Iterator<EditLogInputStream> iter = streams.iterator();
-    assertTrue(iter.hasNext());
-    EditLogInputStream stream = iter.next();
-    
-    for (int expected = firstTxnId;
-        expected <= lastTxnId;
-        expected++) {
-      
-      FSEditLogOp op = stream.readOp();
-      while (op == null) {
-        assertTrue("Expected to find txid " + expected + ", " +
-            "but no more streams available to read from",
-            iter.hasNext());
-        stream = iter.next();
-        op = stream.readOp();
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return Mockito.spy(IPCLoggerChannel.FACTORY.createLogger(
+            conf, nsInfo, journalId, addr));
       }
-      
-      assertEquals(FSEditLogOpCodes.OP_MKDIR, op.opCode);
-      assertEquals(expected, op.getTransactionId());
-    }
-    
-    assertNull(stream.readOp());
-    assertFalse("Expected no more txns after " + lastTxnId +
-        " but more streams are available", iter.hasNext());
+    };
+    return new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO, spyFactory);
   }
 
-
-  
   private static void waitForAllPendingCalls(AsyncLoggerSet als)
       throws InterruptedException {
     for (AsyncLogger l : als.getLoggersForTests()) {
@@ -508,19 +414,6 @@ public class TestQuorumJournalManager {
     }
   }
 
-  private void assertExistsInQuorum(MiniJournalCluster cluster,
-      String fname) {
-    int count = 0;
-    for (int i = 0; i < 3; i++) {
-      File dir = cluster.getCurrentDir(i, JID);
-      if (new File(dir, fname).exists()) {
-        count++;
-      }
-    }
-    assertTrue("File " + fname + " should exist in a quorum of dirs",
-        count >= cluster.getQuorumSize());
-  }
-  
   private void checkRecovery(MiniJournalCluster cluster,
       long segmentTxId, long expectedEndTxId)
       throws IOException {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1370497&r1=1370496&r2=1370497&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Tue Aug  7 20:24:01 2012
@@ -32,8 +32,6 @@ import org.apache.hadoop.hdfs.qjournal.c
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
@@ -47,6 +45,8 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
+
 /**
  * True unit tests for QuorumJournalManager
  */
@@ -70,7 +70,7 @@ public class TestQuorumJournalManagerUni
 
     qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
       @Override
-      protected List<AsyncLogger> createLoggers() {
+      protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
         return spyLoggers;
       }
     };
@@ -192,10 +192,4 @@ public class TestQuorumJournalManagerUni
     EditLogOutputStream stm = qjm.startLogSegment(1);
     return stm;
   }
-
-  static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
-    FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
-    op.setTransactionId(txid);
-    stm.write(op);
-  }
 }