You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/09/11 00:30:53 UTC
svn commit: r1383137 - in
/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/qjournal/client/
src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/
src/main/java/org/apache/hadoop/hdfs/qjournal/...
Author: todd
Date: Mon Sep 10 22:30:52 2012
New Revision: 1383137
URL: http://svn.apache.org/viewvc?rev=1383137&view=rev
Log:
HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are out-of-sync. Contributed by Todd Lipcon.
Modified:
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Mon Sep 10 22:30:52 2012
@@ -58,3 +58,5 @@ HDFS-3898. QJM: enable TCP_NODELAY for I
HDFS-3885. QJM: optimize log sync when JN is lagging behind (todd)
HDFS-3900. QJM: avoid validating log segments on log rolls (todd)
+
+HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are out-of-sync (todd)
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Mon Sep 10 22:30:52 2012
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -52,6 +53,7 @@ import org.apache.hadoop.security.Securi
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -92,6 +94,19 @@ public class IPCLoggerChannel implements
* The highest txid that has been successfully logged on the remote JN.
*/
private long highestAckedTxId = 0;
+
+ /**
+ * Nanotime of the last time we successfully journaled some edits
+ * to the remote node.
+ */
+ private long lastAckNanos = 0;
+
+ /**
+ * Nanotime of the last time that committedTxId was updated. Used
+ * to calculate the lag in terms of time, rather than just a number
+ * of txns.
+ */
+ private long lastCommitNanos = 0;
/**
* The maximum number of bytes that can be pending in the queue.
@@ -109,6 +124,13 @@ public class IPCLoggerChannel implements
*/
private boolean outOfSync = false;
+ /**
+ * Stopwatch which starts counting on each heartbeat that is sent
+ */
+ private Stopwatch lastHeartbeatStopwatch = new Stopwatch();
+
+ private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
+
static final Factory FACTORY = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
@@ -145,6 +167,7 @@ public class IPCLoggerChannel implements
"Trying to move committed txid backwards in client " +
"old: %s new: %s", committedTxId, txid);
this.committedTxId = txid;
+ this.lastCommitNanos = System.nanoTime();
}
@Override
@@ -295,6 +318,11 @@ public class IPCLoggerChannel implements
} catch (LoggerTooFarBehindException e) {
return Futures.immediateFailedFuture(e);
}
+
+ // When this batch is acked, we use its submission time in order
+ // to calculate how far we are lagging.
+ final long submitNanos = System.nanoTime();
+
ListenableFuture<Void> ret = null;
try {
ret = executor.submit(new Callable<Void>() {
@@ -318,6 +346,7 @@ public class IPCLoggerChannel implements
}
synchronized (IPCLoggerChannel.this) {
highestAckedTxId = firstTxnId + numTxns - 1;
+ lastAckNanos = submitNanos;
}
return null;
}
@@ -347,15 +376,40 @@ public class IPCLoggerChannel implements
return ret;
}
- private synchronized void throwIfOutOfSync() throws JournalOutOfSyncException {
- if (outOfSync) {
- // TODO: send a "heartbeat" here so that the remote node knows the newest
- // committed txid, for metrics purposes
+ private void throwIfOutOfSync()
+ throws JournalOutOfSyncException, IOException {
+ if (isOutOfSync()) {
+ // Even if we're out of sync, it's useful to send an RPC
+ // to the remote node in order to update its lag metrics, etc.
+ heartbeatIfNecessary();
throw new JournalOutOfSyncException(
"Journal disabled until next roll");
}
}
+ /**
+ * When we've entered an out-of-sync state, it's still useful to periodically
+ * send an empty RPC to the server, such that it has the up-to-date
+ * committedTxId. This acts as a sanity check during recovery, and also allows
+ * that node's metrics to be up-to-date about its lag.
+ *
+ * In the future, this method may also be used in order to check that the
+ * current node is still the current writer, even if no edits are being
+ * written.
+ */
+ private void heartbeatIfNecessary() throws IOException {
+ if (lastHeartbeatStopwatch.elapsedMillis() > HEARTBEAT_INTERVAL_MILLIS ||
+ !lastHeartbeatStopwatch.isRunning()) {
+ try {
+ getProxy().heartbeat(createReqInfo());
+ } finally {
+ // Don't send heartbeats more often than the configured interval,
+ // even if they fail.
+ lastHeartbeatStopwatch.reset().start();
+ }
+ }
+ }
+
private synchronized void reserveQueueSpace(int size)
throws LoggerTooFarBehindException {
Preconditions.checkArgument(size >= 0);
@@ -479,13 +533,27 @@ public class IPCLoggerChannel implements
@Override
public synchronized void appendHtmlReport(StringBuilder sb) {
sb.append("Written txid ").append(highestAckedTxId);
- long behind = committedTxId - highestAckedTxId;
- assert behind >= 0;
+ long behind = getLagTxns();
if (behind > 0) {
- sb.append(" (" + behind + " behind)");
+ if (lastAckNanos != 0) {
+ long lagMillis = getLagTimeMillis();
+ sb.append(" (" + behind + " txns/" + lagMillis + "ms behind)");
+ } else {
+ sb.append(" (never written");
+ }
}
if (outOfSync) {
- sb.append(" (will re-join on next segment)");
+ sb.append(" (will try to re-sync on next segment)");
}
}
+
+ private long getLagTxns() {
+ return Math.max(committedTxId - highestAckedTxId, 0);
+ }
+
+ private long getLagTimeMillis() {
+ return TimeUnit.MILLISECONDS.convert(
+ Math.max(lastCommitNanos - lastAckNanos, 0),
+ TimeUnit.NANOSECONDS);
+ }
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Mon Sep 10 22:30:52 2012
@@ -77,6 +77,15 @@ public interface QJournalProtocol {
int numTxns,
byte[] records) throws IOException;
+
+ /**
+ * Heartbeat.
+ * This is a no-op on the server, except that it verifies that the
+ * caller is in fact still the active writer, and provides up-to-date
+ * information on the most recently committed txid.
+ */
+ public void heartbeat(RequestInfo reqInfo) throws IOException;
+
/**
* Start writing to a new log segment on the JournalNode.
* Before calling this, one should finalize the previous segment
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Mon Sep 10 22:30:52 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.qjournal.p
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
@@ -118,6 +120,18 @@ public class QJournalProtocolServerSideT
return JournalResponseProto.newBuilder().build();
}
+ /** @see JournalProtocol#heartbeat */
+ @Override
+ public HeartbeatResponseProto heartbeat(RpcController controller,
+ HeartbeatRequestProto req) throws ServiceException {
+ try {
+ impl.heartbeat(convert(req.getReqInfo()));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ return HeartbeatResponseProto.getDefaultInstance();
+ }
+
/** @see JournalProtocol#startLogSegment */
@Override
public StartLogSegmentResponseProto startLogSegment(RpcController controller,
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Mon Sep 10 22:30:52 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.qjournal.p
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
@@ -141,6 +142,17 @@ public class QJournalProtocolTranslatorP
throw ProtobufHelper.getRemoteException(e);
}
}
+
+ @Override
+ public void heartbeat(RequestInfo reqInfo) throws IOException {
+ try {
+ rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
private QJournalProtocolProtos.RequestInfoProto convert(
RequestInfo reqInfo) {
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Mon Sep 10 22:30:52 2012
@@ -75,6 +75,7 @@ class Journal implements Closeable {
private EditLogOutputStream curSegment;
private long curSegmentTxId = HdfsConstants.INVALID_TXID;
private long nextTxId = HdfsConstants.INVALID_TXID;
+ private long highestWrittenTxId = 0;
private final String journalId;
@@ -123,6 +124,11 @@ class Journal implements Closeable {
this.fjm = storage.getJournalManager();
this.metrics = JournalMetrics.create(this);
+
+ EditLogFile latest = scanStorageForLatestEdits();
+ if (latest != null) {
+ highestWrittenTxId = latest.getLastTxId();
+ }
}
/**
@@ -224,6 +230,19 @@ class Journal implements Closeable {
return committedTxnId.get();
}
+ synchronized long getCurrentLagTxns() throws IOException {
+ long committed = committedTxnId.get();
+ if (committed == 0) {
+ return 0;
+ }
+
+ return Math.max(committed - highestWrittenTxId, 0L);
+ }
+
+ synchronized long getHighestWrittenTxId() {
+ return highestWrittenTxId;
+ }
+
@VisibleForTesting
JournalMetrics getMetricsForTests() {
return metrics;
@@ -329,19 +348,20 @@ class Journal implements Closeable {
// This batch of edits has already been committed on a quorum of other
// nodes. So, we are in "catch up" mode. This gets its own metric.
metrics.batchesWrittenWhileLagging.incr(1);
- metrics.currentLagTxns.set(committedTxnId.get() - lastTxnId);
- } else {
- metrics.currentLagTxns.set(0L);
}
metrics.batchesWritten.incr(1);
metrics.bytesWritten.incr(records.length);
metrics.txnsWritten.incr(numTxns);
- metrics.lastWrittenTxId.set(lastTxnId);
- nextTxId += numTxns;
+ highestWrittenTxId = lastTxnId;
+ nextTxId = lastTxnId + 1;
}
+ public void heartbeat(RequestInfo reqInfo) throws IOException {
+ checkRequest(reqInfo);
+ }
+
/**
* Ensure that the given request is coming from the correct writer and in-order.
* @param reqInfo the request info
@@ -690,6 +710,10 @@ class Journal implements Closeable {
if (currentSegment == null) {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": no current segment in place");
+
+ // Update the highest txid for lag metrics
+ highestWrittenTxId = Math.max(segment.getEndTxId(),
+ highestWrittenTxId);
} else {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": old segment " + TextFormat.shortDebugString(currentSegment) +
@@ -708,8 +732,15 @@ class Journal implements Closeable {
": would discard already-committed txn " +
committedTxnId.get());
}
+
+ // If we're shortening the log, update our highest txid
+ // used for lag metrics.
+ if (txnRange(currentSegment).contains(highestWrittenTxId)) {
+ highestWrittenTxId = segment.getEndTxId();
+ }
}
syncLog(reqInfo, segment, fromUrl);
+
} else {
LOG.info("Skipping download of log " +
TextFormat.shortDebugString(segment) +
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java Mon Sep 10 22:30:52 2012
@@ -51,12 +51,6 @@ class JournalMetrics {
MutableQuantiles[] syncsQuantiles;
- @Metric("Transaction lag behind the most recent commit")
- MutableGaugeLong currentLagTxns;
-
- @Metric("Last written txid")
- MutableGaugeLong lastWrittenTxId;
-
private final Journal journal;
JournalMetrics(Journal journal) {
@@ -99,6 +93,20 @@ class JournalMetrics {
}
}
+ @Metric("The highest txid stored on this JN")
+ public long getLastWrittenTxId() {
+ return journal.getHighestWrittenTxId();
+ }
+
+ @Metric("Number of transactions that this JN is lagging")
+ public long getCurrentLagTxns() {
+ try {
+ return journal.getCurrentLagTxns();
+ } catch (IOException e) {
+ return -1L;
+ }
+ }
+
void addSync(long us) {
for (MutableQuantiles q : syncsQuantiles) {
q.add(us);
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Mon Sep 10 22:30:52 2012
@@ -137,6 +137,12 @@ class JournalNodeRpcServer implements QJ
jn.getOrCreateJournal(reqInfo.getJournalId())
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}
+
+ @Override
+ public void heartbeat(RequestInfo reqInfo) throws IOException {
+ jn.getOrCreateJournal(reqInfo.getJournalId())
+ .heartbeat(reqInfo);
+ }
@Override
public void startLogSegment(RequestInfo reqInfo, long txid)
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Mon Sep 10 22:30:52 2012
@@ -72,6 +72,17 @@ message JournalResponseProto {
}
/**
+ * heartbeat()
+ */
+
+message HeartbeatRequestProto {
+ required RequestInfoProto reqInfo = 1;
+}
+
+message HeartbeatResponseProto { // void response
+}
+
+/**
* startLogSegment()
*/
message StartLogSegmentRequestProto {
@@ -207,6 +218,8 @@ service QJournalProtocolService {
rpc journal(JournalRequestProto) returns (JournalResponseProto);
+ rpc heartbeat(HeartbeatRequestProto) returns (HeartbeatResponseProto);
+
rpc startLogSegment(StartLogSegmentRequestProto)
returns (StartLogSegmentResponseProto);
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Mon Sep 10 22:30:52 2012
@@ -22,8 +22,10 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -193,6 +195,9 @@ public class TestNNWithQJM {
MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
mjc.getQuorumJournalURI("myjournal").toString());
+ // Speed up the test
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
@@ -217,7 +222,18 @@ public class TestNNWithQJM {
contents = DFSTestUtil.urlGet(url);
System.out.println(contents);
- assertTrue(contents.contains("(1 behind)"));
+ assertTrue(Pattern.compile("1 txns/\\d+ms behind").matcher(contents)
+ .find());
+
+ // Restart NN while JN0 is still down.
+ cluster.restartNameNode();
+
+ contents = DFSTestUtil.urlGet(url);
+ System.out.println(contents);
+ assertTrue(Pattern.compile("never written").matcher(contents)
+ .find());
+
+
} finally {
cluster.shutdown();
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java Mon Sep 10 22:30:52 2012
@@ -163,11 +163,14 @@ public class TestIPCLoggerChannel {
ee.getCause());
}
- // It should have failed without even sending an RPC, since it was not sync.
+ // It should have failed without even sending the edits, since it was not sync.
Mockito.verify(mockProxy, Mockito.never()).journal(
Mockito.<RequestInfo>any(),
Mockito.eq(1L), Mockito.eq(2L),
Mockito.eq(1), Mockito.same(FAKE_DATA));
+ // It should have sent a heartbeat instead.
+ Mockito.verify(mockProxy).heartbeat(
+ Mockito.<RequestInfo>any());
// After a roll, sending new edits should not fail.
ch.startLogSegment(3L).get();
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Mon Sep 10 22:30:52 2012
@@ -101,6 +101,7 @@ public class TestJournalNode {
journal.getMetricsForTests().getName());
MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+ MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
IPCLoggerChannel ch = new IPCLoggerChannel(
conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress());
@@ -113,6 +114,17 @@ public class TestJournalNode {
journal.getMetricsForTests().getName());
MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+ MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
+
+ ch.setCommittedTxId(100L);
+ ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
+
+ metrics = MetricsAsserts.getMetrics(
+ journal.getMetricsForTests().getName());
+ MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
+ MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
+ MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
+
}