You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/15 21:18:57 UTC

svn commit: r1373587 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: CHANGES.HDFS-3077.txt src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java

Author: todd
Date: Wed Aug 15 19:18:57 2012
New Revision: 1373587

URL: http://svn.apache.org/viewvc?rev=1373587&view=rev
Log:
HDFS-3800. improvements to QJM fault testing. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1373587&r1=1373586&r2=1373587&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Wed Aug 15 19:18:57 2012
@@ -24,3 +24,5 @@ HDFS-3798. Avoid throwing NPE when final
 HDFS-3799. QJM: handle empty log segments during recovery (todd)
 
 HDFS-3797. QJM: add segment txid as a parameter to journal() RPC (todd)
+
+HDFS-3800. improvements to QJM fault testing (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java?rev=1373587&r1=1373586&r2=1373587&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java Wed Aug 15 19:18:57 2012
@@ -61,7 +61,7 @@ public abstract class QJMTestUtil {
   }
   
   public static void writeSegment(MiniJournalCluster cluster,
-      QuorumJournalManager qjm, int startTxId, int numTxns,
+      QuorumJournalManager qjm, long startTxId, int numTxns,
       boolean finalize) throws IOException {
     EditLogOutputStream stm = qjm.startLogSegment(startTxId);
     // Should create in-progress
@@ -81,7 +81,7 @@ public abstract class QJMTestUtil {
     stm.write(op);
   }
 
-  public static void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
+  public static void writeTxns(EditLogOutputStream stm, long startTxId, int numTxns)
       throws IOException {
     for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
       writeOp(stm, txid);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java?rev=1373587&r1=1373586&r2=1373587&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java Wed Aug 15 19:18:57 2012
@@ -17,26 +17,39 @@
  */
 package org.apache.hadoop.hdfs.qjournal.client;
 
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.SortedSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.junit.BeforeClass;
+import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -46,23 +59,27 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-
-import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.JID;
-import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.FAKE_NSINFO;
-import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
+import com.google.common.util.concurrent.MoreExecutors;
 
 
 public class TestQJMWithFaults {
   private static final Log LOG = LogFactory.getLog(
       TestQJMWithFaults.class);
 
+  private static final String RAND_SEED_PROPERTY =
+      "TestQJMWithFaults.random-seed";
+
+  private static final int NUM_WRITER_ITERS = 500;
+  private static final int SEGMENTS_PER_WRITER = 2;
+
   private static Configuration conf = new Configuration();
   static {
     // Don't retry connections - it just slows down the tests.
-    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);    
+    conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    
+    // Make tests run faster by avoiding fsync()
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
   }
-  private static long MAX_IPC_NUMBER;
-
 
   /**
    * Run through the creation of a log without any faults injected,
@@ -70,10 +87,10 @@ public class TestQJMWithFaults {
    * bounds for the other test cases, so they can exhaustively explore
    * the space of potential failures.
    */
-  @BeforeClass
-  public static void determineMaxIpcNumber() throws Exception {
+  private static long determineMaxIpcNumber() throws Exception {
     Configuration conf = new Configuration();
     MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+    long ret;
     try {
       QuorumJournalManager qjm = createInjectableQJM(cluster);
       qjm.format(FAKE_NSINFO);
@@ -90,11 +107,12 @@ public class TestQJMWithFaults {
       // were no failures.
       assertEquals(1, ipcCounts.size());
       
-      MAX_IPC_NUMBER = ipcCounts.first();
-      LOG.info("Max IPC count = " + MAX_IPC_NUMBER);
+      ret = ipcCounts.first();
+      LOG.info("Max IPC count = " + ret);
     } finally {
       cluster.shutdown();
     }
+    return ret;
   }
   
   /**
@@ -106,6 +124,8 @@ public class TestQJMWithFaults {
    */
   @Test
   public void testRecoverAfterDoubleFailures() throws Exception {
+    final long MAX_IPC_NUMBER = determineMaxIpcNumber();
+    
     for (int failA = 1; failA <= MAX_IPC_NUMBER; failA++) {
       for (int failB = 1; failB <= MAX_IPC_NUMBER; failB++) {
         String injectionStr = "(" + failA + ", " + failB + ")";
@@ -132,25 +152,133 @@ public class TestQJMWithFaults {
           }
 
           // Now should be able to recover
+          qjm = createInjectableQJM(cluster);
+          long lastRecoveredTxn = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+          assertTrue(lastRecoveredTxn >= lastAckedTxn);
+          
+          writeSegment(cluster, qjm, lastRecoveredTxn + 1, 3, true);
+        } catch (Throwable t) {
+          // Test failure! Rethrow with the test setup info so it can be
+          // easily triaged.
+          throw new RuntimeException("Test failed with injection: " + injectionStr,
+                t); 
+        } finally {
+          cluster.shutdown();
+          cluster = null;
+        }
+      }
+    }
+  }
+  
+  /**
+   * Test case in which three JournalNodes randomly flip-flop between
+   * up and down states: each RPC they get has a chance of toggling them.
+   * 
+   * The writer keeps track of the latest ACKed edit, and on every
+   * recovery operation, ensures that it recovers at least to that
+   * point or higher. Since at any given point, a majority of JNs
+   * may be injecting faults, any writer operation is allowed to fail,
+   * so long as the exception message indicates it failed due to injected
+   * faults.
+   * 
+   * Given a random seed, the test should be entirely deterministic.
+   */
+  @Test
+  public void testRandomized() throws Exception {
+    long seed;
+    Long userSpecifiedSeed = Long.getLong(RAND_SEED_PROPERTY);
+    if (userSpecifiedSeed != null) {
+      LOG.info("Using seed specified in system property");
+      seed = userSpecifiedSeed;
+      
+      // If the user specifies a seed, then we should gather all the
+      // IPC trace information so that debugging is easier. Tracing makes
+      // the test run about 25% slower, so it is only enabled in this case.
+      ((Log4JLogger)ProtobufRpcEngine.LOG).getLogger().setLevel(Level.ALL);
+    } else {
+      seed = new Random().nextLong();
+    }
+    LOG.info("Random seed: " + seed);
+    
+    Random r = new Random(seed);
+    
+    MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf)
+      .build();
+    
+    // Format the cluster using a non-faulty QJM.
+    QuorumJournalManager qjmForInitialFormat =
+        createInjectableQJM(cluster);
+    qjmForInitialFormat.format(FAKE_NSINFO);
+    qjmForInitialFormat.close();
+    
+    try {
+      long txid = 0;
+      long lastAcked = 0;
+      
+      for (int i = 0; i < NUM_WRITER_ITERS; i++) {
+        LOG.info("Starting writer " + i + "\n-------------------");
+        
+        QuorumJournalManager qjm = createRandomFaultyQJM(cluster, r);
+        try {
+          if (txid > 100) {
+            qjm.purgeLogsOlderThan(txid - 100);
+          }
+  
+          long recovered;
           try {
-            qjm = createInjectableQJM(cluster);
-            qjm.recoverUnfinalizedSegments();
-            writeSegment(cluster, qjm, lastAckedTxn + 1, 3, true);
-            // TODO: verify log segments
+            recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
           } catch (Throwable t) {
-            // Test failure! Rethrow with the test setup info so it can be
-            // easily triaged.
-            throw new RuntimeException("Test failed with injection: " + injectionStr,
-                t);
+            LOG.info("Failed recovery", t);
+            GenericTestUtils.assertExceptionContains("faking being down", t);
+            continue;
+          }
+          assertTrue("Recovered only up to txnid " + recovered +
+              " but had gotten an ack for " + lastAcked,
+              recovered >= lastAcked);
+          
+          txid = recovered + 1;
+          
+          Holder<Throwable> thrown = new Holder<Throwable>(null);
+          for (int j = 0; j < SEGMENTS_PER_WRITER; j++) {
+            lastAcked = writeSegmentUntilCrash(cluster, qjm, txid, 4, thrown);
+            if (thrown.held != null) {
+              LOG.info("Failed write", thrown.held);
+              GenericTestUtils.assertExceptionContains("faking being down",
+                  thrown.held);
+              break;
+            }
+            txid += 4;
           }
         } finally {
-          cluster.shutdown();
-          cluster = null;
+          qjm.close();
         }
       }
+    } finally {
+      cluster.shutdown();
     }
   }
 
+  private long writeSegmentUntilCrash(MiniJournalCluster cluster,
+      QuorumJournalManager qjm, long txid, int numTxns, Holder<Throwable> thrown) {
+    
+    long firstTxId = txid;
+    long lastAcked = txid - 1;
+    try {
+      EditLogOutputStream stm = qjm.startLogSegment(txid);
+      
+      for (int i = 0; i < numTxns; i++) {
+        QJMTestUtil.writeTxns(stm, txid++, 1);
+        lastAcked++;
+      }
+      
+      stm.close();
+      qjm.finalizeLogSegment(firstTxId, lastAcked);
+    } catch (Throwable t) {
+      thrown.held = t;
+    }
+    return lastAcked;
+  }
+
   /**
    * Run a simple workload of becoming the active writer and writing
    * two log segments: 1-3 and 4-6.
@@ -179,6 +307,43 @@ public class TestQJMWithFaults {
   private void failIpcNumber(AsyncLogger logger, int idx) {
     ((InvocationCountingChannel)logger).failIpcNumber(idx);
   }
+  
+  private static class RandomFaultyChannel extends IPCLoggerChannel {
+    private final Random random;
+    private float injectionProbability = 0.1f;
+    private boolean isUp = true;
+    
+    public RandomFaultyChannel(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr, long seed) {
+      super(conf, nsInfo, journalId, addr);
+      this.random = new Random(seed);
+    }
+
+    @Override
+    protected QJournalProtocol createProxy() throws IOException {
+      QJournalProtocol realProxy = super.createProxy();
+      return mockProxy(
+          new WrapEveryCall<Object>(realProxy) {
+            @Override
+            void beforeCall(InvocationOnMock invocation) throws Exception {
+              if (random.nextFloat() < injectionProbability) {
+                isUp = !isUp;
+                LOG.info("transitioned " + addr + " to " +
+                    (isUp ? "up" : "down"));
+              }
+    
+              if (!isUp) {
+                throw new IOException("Injected - faking being down");
+              }
+            }
+          });
+    }
+
+    @Override
+    protected ExecutorService createExecutor() {
+      return MoreExecutors.sameThreadExecutor();
+    }
+  }
 
   private static class InvocationCountingChannel extends IPCLoggerChannel {
     private int rpcCount = 0;
@@ -211,10 +376,9 @@ public class TestQJMWithFaults {
     @Override
     protected QJournalProtocol createProxy() throws IOException {
       final QJournalProtocol realProxy = super.createProxy();
-      QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
-          new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
+      QJournalProtocol mock = mockProxy(
+          new WrapEveryCall<Object>(realProxy) {
+            void beforeCall(InvocationOnMock invocation) throws Exception {
               rpcCount++;
               String callStr = "[" + addr + "] " + 
                   invocation.getMethod().getName() + "(" +
@@ -228,14 +392,44 @@ public class TestQJMWithFaults {
               } else {
                 LOG.info("IPC call #" + rpcCount + ": " + callStr);
               }
-
-              return invocation.getMethod().invoke(realProxy,
-                  invocation.getArguments());
             }
           });
       return mock;
     }
   }
+
+
+  private static QJournalProtocol mockProxy(WrapEveryCall<Object> wrapper)
+      throws IOException {
+    QJournalProtocol mock = Mockito.mock(QJournalProtocol.class,
+        Mockito.withSettings()
+          .defaultAnswer(wrapper)
+          .extraInterfaces(Closeable.class));
+    Mockito.doNothing().when((Closeable)mock).close();
+    return mock;
+  }
+
+  private static abstract class WrapEveryCall<T> implements Answer<T> {
+    private final Object realObj;
+    WrapEveryCall(Object realObj) {
+      this.realObj = realObj;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public T answer(InvocationOnMock invocation) throws Throwable {
+      beforeCall(invocation);
+
+      try {
+        return (T) invocation.getMethod().invoke(realObj,
+          invocation.getArguments());
+      } catch (InvocationTargetException ite) {
+        throw ite.getCause();
+      }
+    }
+
+    abstract void beforeCall(InvocationOnMock invocation) throws Exception;
+  }
   
   private static QuorumJournalManager createInjectableQJM(MiniJournalCluster cluster)
       throws IOException, URISyntaxException {
@@ -249,4 +443,21 @@ public class TestQJMWithFaults {
     return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
         FAKE_NSINFO, spyFactory);
   }
+  
+  private static QuorumJournalManager createRandomFaultyQJM(
+      MiniJournalCluster cluster, final Random seedGenerator)
+          throws IOException, URISyntaxException {
+    
+    AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
+      @Override
+      public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+          String journalId, InetSocketAddress addr) {
+        return new RandomFaultyChannel(conf, nsInfo, journalId, addr,
+            seedGenerator.nextLong());
+      }
+    };
+    return new QuorumJournalManager(conf, cluster.getQuorumJournalURI(JID),
+        FAKE_NSINFO, spyFactory);
+  }
+
 }