Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/09/11 08:31:43 UTC

svn commit: r1383251 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/test/java/org/apache/hadoop/hdfs/qjournal/client/

Author: todd
Date: Tue Sep 11 06:31:42 2012
New Revision: 1383251

URL: http://svn.apache.org/viewvc?rev=1383251&view=rev
Log:
HDFS-3906. QJM: quorum timeout on failover with large log segment. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Tue Sep 11 06:31:42 2012
@@ -66,3 +66,5 @@ HDFS-3899. QJM: Add client-side metrics 
 HDFS-3914. QJM: acceptRecovery should abort current segment (todd)
 
 HDFS-3915. QJM: Failover fails with auth error in secure cluster (todd)
+
+HDFS-3906. QJM: quorum timeout on failover with large log segment (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Sep 11 06:31:42 2012
@@ -413,10 +413,14 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
   public static final String  DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
   public static final String  DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
+  public static final String  DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
+  public static final String  DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
   public static final int     DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
-  public static final int     DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 20000;
-  public static final int     DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 60000;
-  public static final int     DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000;
   public static final int     DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
 }
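
For context, the new keys above are ordinary Configuration integers, so besides
hdfs-site.xml they can also be overridden programmatically. A minimal sketch,
assuming the HDFS jars on the classpath (the 180000 ms value is illustrative,
not a recommendation):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class QjmTimeoutExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Give slow failovers extra headroom beyond the raised 120s defaults.
        conf.setInt(DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY, 180000);
        conf.setInt(DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY, 180000);
        System.out.println("new-epoch timeout = " + conf.getInt(
            DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
            DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT) + " ms");
      }
    }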
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Tue Sep 11 06:31:42 2012
@@ -52,8 +52,6 @@ import com.google.common.util.concurrent
 class AsyncLoggerSet {
   static final Log LOG = LogFactory.getLog(AsyncLoggerSet.class);
 
-  private static final int NEWEPOCH_TIMEOUT_MS = 10000;
-  
   private final List<AsyncLogger> loggers;
   
   private static final long INVALID_EPOCH = -1;
@@ -63,44 +61,15 @@ class AsyncLoggerSet {
     this.loggers = ImmutableList.copyOf(loggers);
   }
   
-  /**
-   * Fence any previous writers, and obtain a unique epoch number
-   * for write-access to the journal nodes.
-   *
-   * @param nsInfo the expected namespace information. If the remote
-   * node does not match with this namespace, the request will be rejected.
-   * @return the new, unique epoch number
-   * @throws IOException
-   */
-  Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch(
-      NamespaceInfo nsInfo) throws IOException {
-    Preconditions.checkState(myEpoch == -1,
-        "epoch already created: epoch=" + myEpoch);
-    
-    Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
-      waitForWriteQuorum(getJournalState(), NEWEPOCH_TIMEOUT_MS);
-    
-    long maxPromised = Long.MIN_VALUE;
-    for (GetJournalStateResponseProto resp : lastPromises.values()) {
-      maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
-    }
-    assert maxPromised >= 0;
-    
-    long myEpoch = maxPromised + 1;
-    Map<AsyncLogger, NewEpochResponseProto> resps =
-        waitForWriteQuorum(newEpoch(nsInfo, myEpoch), NEWEPOCH_TIMEOUT_MS);
-    this.myEpoch = myEpoch;
-    setEpoch(myEpoch);
-    return resps;
-  }
-  
-  private void setEpoch(long e) {
+  void setEpoch(long e) {
+    Preconditions.checkState(!isEpochEstablished(),
+        "Epoch already established: epoch=%s", myEpoch);
+    myEpoch = e;
     for (AsyncLogger l : loggers) {
       l.setEpoch(e);
     }
   }
 
-
   /**
    * Set the highest successfully committed txid seen by the writer.
    * This should be called after a successful write to a quorum, and is used
@@ -113,6 +82,13 @@ class AsyncLoggerSet {
   }
 
   /**
+   * @return true if an epoch has been established.
+   */
+  boolean isEpochEstablished() {
+    return myEpoch != INVALID_EPOCH;
+  }
+  
+  /**
    * @return the epoch number for this writer. This may only be called after
    * a successful call to {@link #createNewUniqueEpoch(NamespaceInfo)}.
    */
@@ -143,19 +119,20 @@ class AsyncLoggerSet {
    * can't be achieved, throws a QuorumException.
    * @param q the quorum call
    * @param timeoutMs the number of millis to wait
+   * @param operationName textual description of the operation, for logging
    * @return a map of successful results
    * @throws QuorumException if a quorum doesn't respond with success
    * @throws IOException if the thread is interrupted or times out
    */
   <V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
-      int timeoutMs) throws IOException {
+      int timeoutMs, String operationName) throws IOException {
     int majority = getMajoritySize();
     try {
       q.waitFor(
           loggers.size(), // either all respond 
           majority, // or we get a majority successes
           majority, // or we get a majority failures,
-          timeoutMs);
+          timeoutMs, operationName);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
@@ -227,7 +204,7 @@ class AsyncLoggerSet {
   // in a QuorumCall.
   ///////////////////////////////////////////////////////////////////////////
   
-  private QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
+  public QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
     Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls =
         Maps.newHashMap();
     for (AsyncLogger logger : loggers) {
@@ -266,7 +243,7 @@ class AsyncLoggerSet {
     return QuorumCall.create(calls);
   }
 
-  private QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
+  public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
       NamespaceInfo nsInfo,
       long epoch) {
     Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls =
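
For reference, the quorum arithmetic behind waitForWriteQuorum() in the hunk
above: a write quorum is a simple majority of the logger set, so the call can
return as soon as a majority of loggers succeed, and can fail fast as soon as
a majority fail. A sketch, assuming the usual n/2 + 1 definition of
getMajoritySize():

    public class MajorityExample {
      // A quorum of n JournalNodes tolerates (n - 1) / 2 slow or dead nodes.
      static int majoritySize(int n) {
        return n / 2 + 1;
      }
      public static void main(String[] args) {
        System.out.println(majoritySize(3));  // 2: one node may be down
        System.out.println(majoritySize(5));  // 3: two nodes may be down
      }
    }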

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java Tue Sep 11 06:31:42 2012
@@ -39,6 +39,23 @@ import com.google.protobuf.TextFormat;
 class QuorumCall<KEY, RESULT> {
   private final Map<KEY, RESULT> successes = Maps.newHashMap();
   private final Map<KEY, Throwable> exceptions = Maps.newHashMap();
+
+  /**
+   * Interval, in milliseconds, at which a log message will be made
+   * while waiting for a quorum call.
+   */
+  private static final int WAIT_PROGRESS_INTERVAL_MILLIS = 1000;
+  
+  /**
+   * Start logging messages at INFO level periodically after waiting for
+   * this fraction of the configured timeout for any call.
+   */
+  private static final float WAIT_PROGRESS_INFO_THRESHOLD = 0.3f;
+  /**
+   * Start logging messages at WARN level after waiting for this
+   * fraction of the configured timeout for any call.
+   */
+  private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
   
   static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
       Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
@@ -85,17 +102,35 @@ class QuorumCall<KEY, RESULT> {
    */
   public synchronized void waitFor(
       int minResponses, int minSuccesses, int maxExceptions,
-      int millis)
+      int millis, String operationName)
       throws InterruptedException, TimeoutException {
-    long et = Time.monotonicNow() + millis;
+    long st = Time.monotonicNow();
+    long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
+    long et = st + millis;
     while (true) {
       if (minResponses > 0 && countResponses() >= minResponses) return;
       if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
       if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
-      long rem = et - Time.monotonicNow();
+      long now = Time.monotonicNow();
+      
+      if (now > nextLogTime) {
+        long waited = now - st;
+        String msg = String.format(
+            "Waited %s ms (timeout=%s ms) for a response for %s",
+            waited, millis, operationName);
+        if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
+          QuorumJournalManager.LOG.warn(msg);
+        } else {
+          QuorumJournalManager.LOG.info(msg);
+        }
+        nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
+      }
+      long rem = et - now;
       if (rem <= 0) {
         throw new TimeoutException();
       }
+      rem = Math.min(rem, nextLogTime - now);
+      rem = Math.max(rem, 1);
       wait(rem);
     }
   }
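
The loop above is the heart of the fix: rather than blocking silently until
the deadline, the waiter now wakes up at least once per
WAIT_PROGRESS_INTERVAL_MILLIS and logs at INFO once 30% of the timeout has
elapsed, escalating to WARN past 70%. A self-contained sketch of the same
pattern, using a single boolean condition and System.err in place of the
quorum counts and commons-logging:

    import java.util.concurrent.TimeoutException;

    public class ProgressWaitSketch {
      private static final int WAIT_PROGRESS_INTERVAL_MILLIS = 1000;
      private static final float WAIT_PROGRESS_INFO_THRESHOLD = 0.3f;
      private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;

      private final Object lock = new Object();
      private boolean done;  // set by a responder thread, which notifies lock

      void markDone() {
        synchronized (lock) {
          done = true;
          lock.notifyAll();
        }
      }

      void waitFor(int millis, String operationName)
          throws InterruptedException, TimeoutException {
        long st = System.nanoTime() / 1_000_000;  // monotonic clock, in ms
        long nextLogTime = st + (long) (millis * WAIT_PROGRESS_INFO_THRESHOLD);
        long et = st + millis;
        synchronized (lock) {
          while (!done) {
            long now = System.nanoTime() / 1_000_000;
            if (now > nextLogTime) {
              long waited = now - st;
              String msg = String.format(
                  "Waited %s ms (timeout=%s ms) for a response for %s",
                  waited, millis, operationName);
              // INFO until 70% of the timeout has elapsed, WARN afterwards.
              System.err.println(
                  (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD
                      ? "WARN " : "INFO ") + msg);
              nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
            }
            long rem = et - now;
            if (rem <= 0) {
              throw new TimeoutException();
            }
            // Sleep only until the next progress message is due, never 0 ms.
            rem = Math.min(rem, nextLogTime - now);
            rem = Math.max(rem, 1);
            lock.wait(rem);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        ProgressWaitSketch s = new ProgressWaitSketch();
        new Thread(() -> {
          try { Thread.sleep(2500); } catch (InterruptedException ignored) {}
          s.markDone();
        }).start();
        s.waitFor(5000, "demo");  // INFO lines after ~1.5 s; returns at ~2.5 s
      }
    }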

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Tue Sep 11 06:31:42 2012
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -52,6 +53,7 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.protobuf.TextFormat;
 
 /**
  * A JournalManager that writes to a set of remote JournalNodes,
@@ -67,6 +69,8 @@ public class QuorumJournalManager implem
   private final int acceptRecoveryTimeoutMs;
   private final int finalizeSegmentTimeoutMs;
   private final int selectInputStreamsTimeoutMs;
+  private final int getJournalStateTimeoutMs;
+  private final int newEpochTimeoutMs;
 
   // Since these don't occur during normal operation, we can
   // use rather lengthy timeouts, and don't need to make them
@@ -112,6 +116,13 @@ public class QuorumJournalManager implem
     this.selectInputStreamsTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
         DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT);
+    this.getJournalStateTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT);
+    this.newEpochTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
+    
         
   }
   
@@ -138,11 +149,43 @@ public class QuorumJournalManager implem
         "bad journal id: " + jid);
   }
 
+  
+  /**
+   * Fence any previous writers, and obtain a unique epoch number
+   * for write-access to the journal nodes.
+   *
+   * @return the new, unique epoch number
+   */
+  Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
+      throws IOException {
+    Preconditions.checkState(!loggers.isEpochEstablished(),
+        "epoch already created");
+    
+    Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
+      loggers.waitForWriteQuorum(loggers.getJournalState(),
+          getJournalStateTimeoutMs, "getJournalState()");
+    
+    long maxPromised = Long.MIN_VALUE;
+    for (GetJournalStateResponseProto resp : lastPromises.values()) {
+      maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
+    }
+    assert maxPromised >= 0;
+    
+    long myEpoch = maxPromised + 1;
+    Map<AsyncLogger, NewEpochResponseProto> resps =
+        loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, myEpoch),
+            newEpochTimeoutMs, "newEpoch(" + myEpoch + ")");
+        
+    loggers.setEpoch(myEpoch);
+    return resps;
+  }
+  
   @Override
   public void format(NamespaceInfo nsInfo) throws IOException {
     QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
     try {
-      call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS);
+      call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS,
+          "format");
     } catch (InterruptedException e) {
       throw new IOException("Interrupted waiting for format() response");
     } catch (TimeoutException e) {
@@ -160,7 +203,7 @@ public class QuorumJournalManager implem
         loggers.isFormatted();
 
     try {
-      call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS);
+      call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS, "hasSomeData");
     } catch (InterruptedException e) {
       throw new IOException("Interrupted while determining if JNs have data");
     } catch (TimeoutException e) {
@@ -206,7 +249,8 @@ public class QuorumJournalManager implem
     QuorumCall<AsyncLogger,PrepareRecoveryResponseProto> prepare =
         loggers.prepareRecovery(segmentTxId);
     Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses=
-        loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs);
+        loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs,
+            "prepareRecovery(" + segmentTxId + ")");
     LOG.info("Recovery prepare phase complete. Responses:\n" +
         QuorumCall.mapToString(prepareResponses));
 
@@ -283,7 +327,8 @@ public class QuorumJournalManager implem
     URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);
     
     QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
-    loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs);
+    loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
+        "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");
     
     // TODO:
     // we should only try to finalize loggers who successfully synced above
@@ -292,7 +337,10 @@ public class QuorumJournalManager implem
     
     QuorumCall<AsyncLogger, Void> finalize =
         loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId()); 
-    loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs);
+    loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
+        String.format("finalizeLogSegment(%s-%s)",
+            logToSync.getStartTxId(),
+            logToSync.getEndTxId()));
   }
   
   static List<AsyncLogger> createLoggers(Configuration conf,
@@ -336,7 +384,8 @@ public class QuorumJournalManager implem
     Preconditions.checkState(isActiveWriter,
         "must recover segments before starting a new one");
     QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
-    loggers.waitForWriteQuorum(q, startSegmentTimeoutMs);
+    loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
+        "startLogSegment(" + txId + ")");
     return new QuorumOutputStream(loggers, txId);
   }
 
@@ -345,7 +394,8 @@ public class QuorumJournalManager implem
       throws IOException {
     QuorumCall<AsyncLogger,Void> q = loggers.finalizeLogSegment(
         firstTxId, lastTxId);
-    loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs);
+    loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs,
+        String.format("finalizeLogSegment(%s-%s)", firstTxId, lastTxId));
   }
 
   @Override
@@ -366,8 +416,7 @@ public class QuorumJournalManager implem
   public void recoverUnfinalizedSegments() throws IOException {
     Preconditions.checkState(!isActiveWriter, "already active writer");
     
-    Map<AsyncLogger, NewEpochResponseProto> resps =
-        loggers.createNewUniqueEpoch(nsInfo);
+    Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
     LOG.info("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
         QuorumCall.mapToString(resps));
     
@@ -399,7 +448,8 @@ public class QuorumJournalManager implem
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
         loggers.getEditLogManifest(fromTxnId);
     Map<AsyncLogger, RemoteEditLogManifest> resps =
-        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs);
+        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
+            "selectInputStreams");
     
     LOG.debug("selectInputStream manifests:\n" +
         Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
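
For intuition on the epoch negotiation that now lives in
createNewUniqueEpoch(): the writer asks a quorum of JournalNodes for their
last promised epochs, then proposes a number strictly greater than all of
them, which fences any previous writer. An illustrative run (the response
values below are hypothetical):

    public class EpochExample {
      public static void main(String[] args) {
        long[] lastPromised = {3, 5, 4};  // hypothetical quorum responses
        long maxPromised = Long.MIN_VALUE;
        for (long p : lastPromised) {
          maxPromised = Math.max(maxPromised, p);
        }
        long myEpoch = maxPromised + 1;
        // 6: greater than every prior promise, so older writers are fenced.
        System.out.println("new epoch = " + myEpoch);
      }
    }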

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Tue Sep 11 06:31:42 2012
@@ -101,7 +101,7 @@ class QuorumOutputStream extends EditLog
       QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
           segmentTxId, firstTxToFlush,
           numReadyTxns, data);
-      loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout
+      loggers.waitForWriteQuorum(qcall, 20000, "sendEdits"); // TODO: configurable timeout
       
       // Since we successfully wrote this batch, let the loggers know. Any future
       // RPCs will thus let the loggers know of the most recent transaction, even

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Tue Sep 11 06:31:42 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal.
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.List;
 import java.util.Random;
@@ -56,35 +57,43 @@ public class TestEpochsAreUnique {
     URI uri = cluster.getQuorumJournalURI(JID);
     QuorumJournalManager qjm = new QuorumJournalManager(
         conf, uri, FAKE_NSINFO);
-    qjm.format(FAKE_NSINFO);
+    try {
+      qjm.format(FAKE_NSINFO);
+    } finally {
+      qjm.close();
+    }
     
     try {
       // With no failures or contention, epochs should increase one-by-one
       for (int i = 0; i < 5; i++) {
-        AsyncLoggerSet als = new AsyncLoggerSet(
-            QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
-                IPCLoggerChannel.FACTORY));
-        als.createNewUniqueEpoch(FAKE_NSINFO);
-        assertEquals(i + 1, als.getEpoch());
+        qjm = new QuorumJournalManager(
+            conf, uri, FAKE_NSINFO);
+        try {
+          qjm.createNewUniqueEpoch();
+          assertEquals(i + 1, qjm.getLoggerSetForTests().getEpoch());
+        } finally {
+          qjm.close();
+        }
       }
       
       long prevEpoch = 5;
       // With some failures injected, it should still always increase, perhaps
       // skipping some
       for (int i = 0; i < 20; i++) {
-        AsyncLoggerSet als = new AsyncLoggerSet(
-            makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
-                IPCLoggerChannel.FACTORY)));
         long newEpoch = -1;
         while (true) {
+          qjm = new QuorumJournalManager(
+              conf, uri, FAKE_NSINFO, new FaultyLoggerFactory());
           try {
-            als.createNewUniqueEpoch(FAKE_NSINFO);
-            newEpoch = als.getEpoch();
+            qjm.createNewUniqueEpoch();
+            newEpoch = qjm.getLoggerSetForTests().getEpoch();
             break;
           } catch (IOException ioe) {
             // It's OK to fail to create an epoch, since we randomly inject
             // faults. It's possible we'll inject faults in too many of the
             // underlying nodes, and a failure is expected in that case
+          } finally {
+            qjm.close();
           }
         }
         LOG.info("Created epoch " + newEpoch);
@@ -97,20 +106,23 @@ public class TestEpochsAreUnique {
     }
   }
 
-
-  private List<AsyncLogger> makeFaulty(List<AsyncLogger> loggers) {
-    List<AsyncLogger> ret = Lists.newArrayList();
-    for (AsyncLogger l : loggers) {
-      AsyncLogger spy = Mockito.spy(l);
+  private class FaultyLoggerFactory implements AsyncLogger.Factory {
+    @Override
+    public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      AsyncLogger ch = IPCLoggerChannel.FACTORY.createLogger(
+          conf, nsInfo, journalId, addr);
+      AsyncLogger spy = Mockito.spy(ch);
       Mockito.doAnswer(new SometimesFaulty<Long>(0.10f))
           .when(spy).getJournalState();
       Mockito.doAnswer(new SometimesFaulty<Void>(0.40f))
           .when(spy).newEpoch(Mockito.anyLong());
-      ret.add(spy);
+
+      return spy;
     }
-    return ret;
+    
   }
-  
+
   private class SometimesFaulty<T> implements Answer<ListenableFuture<T>> {
     private float faultProbability;
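
The body of SometimesFaulty is truncated in this hunk. For readers following
along, a sketch of how such a Mockito Answer is typically written; this is an
assumption about its shape, not the committed code, and it assumes Guava and
Mockito on the classpath:

    import java.io.IOException;
    import java.util.Random;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;

    // Hypothetical reconstruction: fail the call's future with the given
    // probability, otherwise delegate to the real (spied) logger method.
    class SometimesFaultySketch<T> implements Answer<ListenableFuture<T>> {
      private final Random random = new Random();
      private final float faultProbability;

      SometimesFaultySketch(float faultProbability) {
        this.faultProbability = faultProbability;
      }

      @SuppressWarnings("unchecked")
      @Override
      public ListenableFuture<T> answer(InvocationOnMock invocation)
          throws Throwable {
        if (random.nextFloat() < faultProbability) {
          // Fail the RPC future instead of invoking the real logger.
          return Futures.<T>immediateFailedFuture(
              new IOException("Injected fault"));
        }
        return (ListenableFuture<T>) invocation.callRealMethod();
      }
    }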
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java Tue Sep 11 06:31:42 2012
@@ -42,8 +42,8 @@ public class TestQuorumCall {
     assertEquals(0, q.countResponses());
     
     futures.get("f1").set("first future");
-    q.waitFor(1, 0, 0, 100000); // wait for 1 response
-    q.waitFor(0, 1, 0, 100000); // wait for 1 success
+    q.waitFor(1, 0, 0, 100000, "test"); // wait for 1 response
+    q.waitFor(0, 1, 0, 100000, "test"); // wait for 1 success
     assertEquals(1, q.countResponses());
     
     
@@ -51,8 +51,8 @@ public class TestQuorumCall {
     assertEquals(2, q.countResponses());
     
     futures.get("f3").set("second future");
-    q.waitFor(3, 0, 100, 100000); // wait for 3 responses
-    q.waitFor(0, 2, 100, 100000); // 2 successes
+    q.waitFor(3, 0, 100, 100000, "test"); // wait for 3 responses
+    q.waitFor(0, 2, 100, 100000, "test"); // 2 successes
 
     assertEquals(3, q.countResponses());
     assertEquals("f1=first future,f3=second future",
@@ -60,7 +60,7 @@ public class TestQuorumCall {
             new TreeMap<String, String>(q.getResults())));
     
     try {
-      q.waitFor(0, 4, 100, 10);
+      q.waitFor(0, 4, 100, 10, "test");
       fail("Didn't time out waiting for more responses than came back");
     } catch (TimeoutException te) {
       // expected