Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/09/11 08:31:43 UTC
svn commit: r1383251 - in
/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/qjournal/client/
src/test/java/org/apache/hadoop/hdfs/qjournal/client/
Author: todd
Date: Tue Sep 11 06:31:42 2012
New Revision: 1383251
URL: http://svn.apache.org/viewvc?rev=1383251&view=rev
Log:
HDFS-3906. QJM: quorum timeout on failover with large log segment. Contributed by Todd Lipcon.
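In short: when failing over with a large in-progress log segment, recovery could outlast the old quorum timeouts. This change raises the default prepare-recovery, accept-recovery, and finalize-segment timeouts to 120 seconds; replaces the hard-coded 10-second epoch-negotiation timeout in AsyncLoggerSet with two new configurable settings (dfs.qjournal.get-journal-state.timeout.ms and dfs.qjournal.new-epoch.timeout.ms, also defaulting to 120 seconds); moves createNewUniqueEpoch() from AsyncLoggerSet into QuorumJournalManager, where those settings are read; and threads an operation name through QuorumCall.waitFor() so that slow quorum calls log progress rather than timing out silently.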
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Tue Sep 11 06:31:42 2012
@@ -66,3 +66,5 @@ HDFS-3899. QJM: Add client-side metrics
HDFS-3914. QJM: acceptRecovery should abort current segment (todd)
HDFS-3915. QJM: Failover fails with auth error in secure cluster (todd)
+
+HDFS-3906. QJM: quorum timeout on failover with large log segment (todd)
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Sep 11 06:31:42 2012
@@ -413,10 +413,14 @@ public class DFSConfigKeys extends Commo
public static final String DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
public static final String DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
public static final String DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
+ public static final String DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
+ public static final String DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
public static final int DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
- public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 20000;
- public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 60000;
- public static final int DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 20000;
+ public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
+ public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
+ public static final int DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000;
public static final int DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
+ public static final int DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
+ public static final int DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
}
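A minimal sketch of overriding these timeouts from client code, for deployments where even two minutes is too tight; the 180000 ms values below are illustrative, not a recommendation from this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class QjmTimeoutTuning {
  public static Configuration withLongerQjmTimeouts() {
    Configuration conf = new Configuration();
    // New keys introduced by this commit (defaults: 120000 ms).
    conf.setInt(DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY, 180000);
    conf.setInt(DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY, 180000);
    // Existing keys whose defaults this commit raises to 120000 ms.
    conf.setInt(DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY, 180000);
    conf.setInt(DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY, 180000);
    return conf;
  }
}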
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Tue Sep 11 06:31:42 2012
@@ -52,8 +52,6 @@ import com.google.common.util.concurrent
class AsyncLoggerSet {
static final Log LOG = LogFactory.getLog(AsyncLoggerSet.class);
- private static final int NEWEPOCH_TIMEOUT_MS = 10000;
-
private final List<AsyncLogger> loggers;
private static final long INVALID_EPOCH = -1;
@@ -63,44 +61,15 @@ class AsyncLoggerSet {
this.loggers = ImmutableList.copyOf(loggers);
}
- /**
- * Fence any previous writers, and obtain a unique epoch number
- * for write-access to the journal nodes.
- *
- * @param nsInfo the expected namespace information. If the remote
- * node does not match with this namespace, the request will be rejected.
- * @return the new, unique epoch number
- * @throws IOException
- */
- Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch(
- NamespaceInfo nsInfo) throws IOException {
- Preconditions.checkState(myEpoch == -1,
- "epoch already created: epoch=" + myEpoch);
-
- Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
- waitForWriteQuorum(getJournalState(), NEWEPOCH_TIMEOUT_MS);
-
- long maxPromised = Long.MIN_VALUE;
- for (GetJournalStateResponseProto resp : lastPromises.values()) {
- maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
- }
- assert maxPromised >= 0;
-
- long myEpoch = maxPromised + 1;
- Map<AsyncLogger, NewEpochResponseProto> resps =
- waitForWriteQuorum(newEpoch(nsInfo, myEpoch), NEWEPOCH_TIMEOUT_MS);
- this.myEpoch = myEpoch;
- setEpoch(myEpoch);
- return resps;
- }
-
- private void setEpoch(long e) {
+ void setEpoch(long e) {
+ Preconditions.checkState(!isEpochEstablished(),
+ "Epoch already established: epoch=%s", myEpoch);
+ myEpoch = e;
for (AsyncLogger l : loggers) {
l.setEpoch(e);
}
}
-
/**
* Set the highest successfully committed txid seen by the writer.
* This should be called after a successful write to a quorum, and is used
@@ -113,6 +82,13 @@ class AsyncLoggerSet {
}
/**
+ * @return true if an epoch has been established.
+ */
+ boolean isEpochEstablished() {
+ return myEpoch != INVALID_EPOCH;
+ }
+
+ /**
* @return the epoch number for this writer. This may only be called after
* a successful call to {@link #createNewUniqueEpoch(NamespaceInfo)}.
*/
@@ -143,19 +119,20 @@ class AsyncLoggerSet {
* can't be achieved, throws a QuorumException.
* @param q the quorum call
* @param timeoutMs the number of millis to wait
+ * @param operationName textual description of the operation, for logging
* @return a map of successful results
* @throws QuorumException if a quorum doesn't respond with success
* @throws IOException if the thread is interrupted or times out
*/
<V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
- int timeoutMs) throws IOException {
+ int timeoutMs, String operationName) throws IOException {
int majority = getMajoritySize();
try {
q.waitFor(
loggers.size(), // either all respond
majority, // or we get a majority successes
majority, // or we get a majority failures,
- timeoutMs);
+ timeoutMs, operationName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
@@ -227,7 +204,7 @@ class AsyncLoggerSet {
// in a QuorumCall.
///////////////////////////////////////////////////////////////////////////
- private QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
+ public QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
@@ -266,7 +243,7 @@ class AsyncLoggerSet {
return QuorumCall.create(calls);
}
- private QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
+ public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
NamespaceInfo nsInfo,
long epoch) {
Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls =
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java Tue Sep 11 06:31:42 2012
@@ -39,6 +39,23 @@ import com.google.protobuf.TextFormat;
class QuorumCall<KEY, RESULT> {
private final Map<KEY, RESULT> successes = Maps.newHashMap();
private final Map<KEY, Throwable> exceptions = Maps.newHashMap();
+
+ /**
+ * Interval, in milliseconds, at which a log message will be made
+ * while waiting for a quorum call.
+ */
+ private static final int WAIT_PROGRESS_INTERVAL_MILLIS = 1000;
+
+ /**
+ * Start logging messages at INFO level periodically after waiting for
+ * this fraction of the configured timeout for any call.
+ */
+ private static final float WAIT_PROGRESS_INFO_THRESHOLD = 0.3f;
+ /**
+ * Start logging messages at WARN level after waiting for this
+ * fraction of the configured timeout for any call.
+ */
+ private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
@@ -85,17 +102,35 @@ class QuorumCall<KEY, RESULT> {
*/
public synchronized void waitFor(
int minResponses, int minSuccesses, int maxExceptions,
- int millis)
+ int millis, String operationName)
throws InterruptedException, TimeoutException {
- long et = Time.monotonicNow() + millis;
+ long st = Time.monotonicNow();
+ long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
+ long et = st + millis;
while (true) {
if (minResponses > 0 && countResponses() >= minResponses) return;
if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
- long rem = et - Time.monotonicNow();
+ long now = Time.monotonicNow();
+
+ if (now > nextLogTime) {
+ long waited = now - st;
+ String msg = String.format(
+ "Waited %s ms (timeout=%s ms) for a response for %s",
+ waited, millis, operationName);
+ if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
+ QuorumJournalManager.LOG.warn(msg);
+ } else {
+ QuorumJournalManager.LOG.info(msg);
+ }
+ nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
+ }
+ long rem = et - now;
if (rem <= 0) {
throw new TimeoutException();
}
+ rem = Math.min(rem, nextLogTime - now);
+ rem = Math.max(rem, 1);
wait(rem);
}
}
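For readers skimming the diff: the new wait loop above sleeps only until the earlier of the deadline and the next logging checkpoint, so a stalled call emits a message every second once 30% of the timeout has elapsed, escalating from INFO to WARN at 70%. A self-contained sketch of the same pattern (a standalone class, not the committed code; System.err stands in for QuorumJournalManager.LOG):

public class WaitProgressSketch {
  private static final int WAIT_PROGRESS_INTERVAL_MILLIS = 1000;
  private static final float WAIT_PROGRESS_INFO_THRESHOLD = 0.3f;
  private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;

  /** Wait until done() flips to true, or throw TimeoutException at the deadline. */
  public synchronized void waitFor(int millis, String operationName)
      throws InterruptedException, java.util.concurrent.TimeoutException {
    long st = monotonicNowMs();
    long nextLogTime = st + (long) (millis * WAIT_PROGRESS_INFO_THRESHOLD);
    long et = st + millis;
    while (!done()) {
      long now = monotonicNowMs();
      if (now > nextLogTime) {
        long waited = now - st;
        String msg = String.format(
            "Waited %s ms (timeout=%s ms) for a response for %s",
            waited, millis, operationName);
        // The commit logs via QuorumJournalManager.LOG; System.err stands in here.
        System.err.println((waited > millis * WAIT_PROGRESS_WARN_THRESHOLD
            ? "WARN: " : "INFO: ") + msg);
        nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
      }
      long rem = et - now;
      if (rem <= 0) {
        throw new java.util.concurrent.TimeoutException();
      }
      // Cap the sleep at the next logging checkpoint so progress messages
      // keep flowing even when no notify() arrives.
      rem = Math.min(rem, nextLogTime - now);
      rem = Math.max(rem, 1);
      wait(rem);
    }
  }

  // Placeholder for QuorumCall's response/success/exception checks.
  private boolean done() { return false; }

  // Stand-in for org.apache.hadoop.util.Time.monotonicNow().
  private static long monotonicNowMs() { return System.nanoTime() / 1_000_000L; }
}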
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Tue Sep 11 06:31:42 2012
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -52,6 +53,7 @@ import com.google.common.annotations.Vis
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.protobuf.TextFormat;
/**
* A JournalManager that writes to a set of remote JournalNodes,
@@ -67,6 +69,8 @@ public class QuorumJournalManager implem
private final int acceptRecoveryTimeoutMs;
private final int finalizeSegmentTimeoutMs;
private final int selectInputStreamsTimeoutMs;
+ private final int getJournalStateTimeoutMs;
+ private final int newEpochTimeoutMs;
// Since these don't occur during normal operation, we can
// use rather lengthy timeouts, and don't need to make them
@@ -112,6 +116,13 @@ public class QuorumJournalManager implem
this.selectInputStreamsTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT);
+ this.getJournalStateTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT);
+ this.newEpochTimeoutMs = conf.getInt(
+ DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
+
}
@@ -138,11 +149,43 @@ public class QuorumJournalManager implem
"bad journal id: " + jid);
}
+
+ /**
+ * Fence any previous writers, and obtain a unique epoch number
+ * for write-access to the journal nodes.
+ *
+ * @return the new, unique epoch number
+ */
+ Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
+ throws IOException {
+ Preconditions.checkState(!loggers.isEpochEstablished(),
+ "epoch already created");
+
+ Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
+ loggers.waitForWriteQuorum(loggers.getJournalState(),
+ getJournalStateTimeoutMs, "getJournalState()");
+
+ long maxPromised = Long.MIN_VALUE;
+ for (GetJournalStateResponseProto resp : lastPromises.values()) {
+ maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
+ }
+ assert maxPromised >= 0;
+
+ long myEpoch = maxPromised + 1;
+ Map<AsyncLogger, NewEpochResponseProto> resps =
+ loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, myEpoch),
+ newEpochTimeoutMs, "newEpoch(" + myEpoch + ")");
+
+ loggers.setEpoch(myEpoch);
+ return resps;
+ }
+
@Override
public void format(NamespaceInfo nsInfo) throws IOException {
QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
try {
- call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS);
+ call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS,
+ "format");
} catch (InterruptedException e) {
throw new IOException("Interrupted waiting for format() response");
} catch (TimeoutException e) {
@@ -160,7 +203,7 @@ public class QuorumJournalManager implem
loggers.isFormatted();
try {
- call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS);
+ call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS, "hasSomeData");
} catch (InterruptedException e) {
throw new IOException("Interrupted while determining if JNs have data");
} catch (TimeoutException e) {
@@ -206,7 +249,8 @@ public class QuorumJournalManager implem
QuorumCall<AsyncLogger,PrepareRecoveryResponseProto> prepare =
loggers.prepareRecovery(segmentTxId);
Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses=
- loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs);
+ loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs,
+ "prepareRecovery(" + segmentTxId + ")");
LOG.info("Recovery prepare phase complete. Responses:\n" +
QuorumCall.mapToString(prepareResponses));
@@ -283,7 +327,8 @@ public class QuorumJournalManager implem
URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);
QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
- loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs);
+ loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
+ "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");
// TODO:
// we should only try to finalize loggers who successfully synced above
@@ -292,7 +337,10 @@ public class QuorumJournalManager implem
QuorumCall<AsyncLogger, Void> finalize =
loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId());
- loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs);
+ loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
+ String.format("finalizeLogSegment(%s-%s)",
+ logToSync.getStartTxId(),
+ logToSync.getEndTxId()));
}
static List<AsyncLogger> createLoggers(Configuration conf,
@@ -336,7 +384,8 @@ public class QuorumJournalManager implem
Preconditions.checkState(isActiveWriter,
"must recover segments before starting a new one");
QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
- loggers.waitForWriteQuorum(q, startSegmentTimeoutMs);
+ loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
+ "startLogSegment(" + txId + ")");
return new QuorumOutputStream(loggers, txId);
}
@@ -345,7 +394,8 @@ public class QuorumJournalManager implem
throws IOException {
QuorumCall<AsyncLogger,Void> q = loggers.finalizeLogSegment(
firstTxId, lastTxId);
- loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs);
+ loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs,
+ String.format("finalizeLogSegment(%s-%s)", firstTxId, lastTxId));
}
@Override
@@ -366,8 +416,7 @@ public class QuorumJournalManager implem
public void recoverUnfinalizedSegments() throws IOException {
Preconditions.checkState(!isActiveWriter, "already active writer");
- Map<AsyncLogger, NewEpochResponseProto> resps =
- loggers.createNewUniqueEpoch(nsInfo);
+ Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
LOG.info("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
QuorumCall.mapToString(resps));
@@ -399,7 +448,8 @@ public class QuorumJournalManager implem
QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
loggers.getEditLogManifest(fromTxnId);
Map<AsyncLogger, RemoteEditLogManifest> resps =
- loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs);
+ loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
+ "selectInputStreams");
LOG.debug("selectInputStream manifests:\n" +
Joiner.on("\n").withKeyValueSeparator(": ").join(resps));
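The relocated createNewUniqueEpoch() above is the Paxos-style fencing step: poll a quorum for the highest epoch each journal node has promised, then claim one greater. A simplified sketch of just the numbering rule (a standalone helper for illustration, not the committed code):

import java.util.Collection;

final class EpochNumbering {
  /** New writer epoch = 1 + the highest epoch promised to any prior writer. */
  static long nextEpoch(Collection<Long> lastPromisedEpochs) {
    long maxPromised = Long.MIN_VALUE;
    for (long promised : lastPromisedEpochs) {
      maxPromised = Math.max(maxPromised, promised);
    }
    assert maxPromised >= 0;  // freshly formatted journals report epoch 0
    return maxPromised + 1;
  }
}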
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Tue Sep 11 06:31:42 2012
@@ -101,7 +101,7 @@ class QuorumOutputStream extends EditLog
QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
segmentTxId, firstTxToFlush,
numReadyTxns, data);
- loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout
+ loggers.waitForWriteQuorum(qcall, 20000, "sendEdits"); // TODO: configurable timeout
// Since we successfully wrote this batch, let the loggers know. Any future
// RPCs will thus let the loggers know of the most recent transaction, even
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Tue Sep 11 06:31:42 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal.
import static org.junit.Assert.*;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import java.util.Random;
@@ -56,35 +57,43 @@ public class TestEpochsAreUnique {
URI uri = cluster.getQuorumJournalURI(JID);
QuorumJournalManager qjm = new QuorumJournalManager(
conf, uri, FAKE_NSINFO);
- qjm.format(FAKE_NSINFO);
+ try {
+ qjm.format(FAKE_NSINFO);
+ } finally {
+ qjm.close();
+ }
try {
// With no failures or contention, epochs should increase one-by-one
for (int i = 0; i < 5; i++) {
- AsyncLoggerSet als = new AsyncLoggerSet(
- QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
- IPCLoggerChannel.FACTORY));
- als.createNewUniqueEpoch(FAKE_NSINFO);
- assertEquals(i + 1, als.getEpoch());
+ qjm = new QuorumJournalManager(
+ conf, uri, FAKE_NSINFO);
+ try {
+ qjm.createNewUniqueEpoch();
+ assertEquals(i + 1, qjm.getLoggerSetForTests().getEpoch());
+ } finally {
+ qjm.close();
+ }
}
long prevEpoch = 5;
// With some failures injected, it should still always increase, perhaps
// skipping some
for (int i = 0; i < 20; i++) {
- AsyncLoggerSet als = new AsyncLoggerSet(
- makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
- IPCLoggerChannel.FACTORY)));
long newEpoch = -1;
while (true) {
+ qjm = new QuorumJournalManager(
+ conf, uri, FAKE_NSINFO, new FaultyLoggerFactory());
try {
- als.createNewUniqueEpoch(FAKE_NSINFO);
- newEpoch = als.getEpoch();
+ qjm.createNewUniqueEpoch();
+ newEpoch = qjm.getLoggerSetForTests().getEpoch();
break;
} catch (IOException ioe) {
// It's OK to fail to create an epoch, since we randomly inject
// faults. It's possible we'll inject faults in too many of the
// underlying nodes, and a failure is expected in that case
+ } finally {
+ qjm.close();
}
}
LOG.info("Created epoch " + newEpoch);
@@ -97,20 +106,23 @@ public class TestEpochsAreUnique {
}
}
-
- private List<AsyncLogger> makeFaulty(List<AsyncLogger> loggers) {
- List<AsyncLogger> ret = Lists.newArrayList();
- for (AsyncLogger l : loggers) {
- AsyncLogger spy = Mockito.spy(l);
+ private class FaultyLoggerFactory implements AsyncLogger.Factory {
+ @Override
+ public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+ String journalId, InetSocketAddress addr) {
+ AsyncLogger ch = IPCLoggerChannel.FACTORY.createLogger(
+ conf, nsInfo, journalId, addr);
+ AsyncLogger spy = Mockito.spy(ch);
Mockito.doAnswer(new SometimesFaulty<Long>(0.10f))
.when(spy).getJournalState();
Mockito.doAnswer(new SometimesFaulty<Void>(0.40f))
.when(spy).newEpoch(Mockito.anyLong());
- ret.add(spy);
+
+ return spy;
}
- return ret;
+
}
-
+
private class SometimesFaulty<T> implements Answer<ListenableFuture<T>> {
private float faultProbability;
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java Tue Sep 11 06:31:42 2012
@@ -42,8 +42,8 @@ public class TestQuorumCall {
assertEquals(0, q.countResponses());
futures.get("f1").set("first future");
- q.waitFor(1, 0, 0, 100000); // wait for 1 response
- q.waitFor(0, 1, 0, 100000); // wait for 1 success
+ q.waitFor(1, 0, 0, 100000, "test"); // wait for 1 response
+ q.waitFor(0, 1, 0, 100000, "test"); // wait for 1 success
assertEquals(1, q.countResponses());
@@ -51,8 +51,8 @@ public class TestQuorumCall {
assertEquals(2, q.countResponses());
futures.get("f3").set("second future");
- q.waitFor(3, 0, 100, 100000); // wait for 3 responses
- q.waitFor(0, 2, 100, 100000); // 2 successes
+ q.waitFor(3, 0, 100, 100000, "test"); // wait for 3 responses
+ q.waitFor(0, 2, 100, 100000, "test"); // 2 successes
assertEquals(3, q.countResponses());
assertEquals("f1=first future,f3=second future",
@@ -60,7 +60,7 @@ public class TestQuorumCall {
new TreeMap<String, String>(q.getResults())));
try {
- q.waitFor(0, 4, 100, 10);
+ q.waitFor(0, 4, 100, 10, "test");
fail("Didn't time out waiting for more responses than came back");
} catch (TimeoutException te) {
// expected