You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/09/11 00:30:53 UTC

svn commit: r1383137 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/ src/main/java/org/apache/hadoop/hdfs/qjournal/...

Author: todd
Date: Mon Sep 10 22:30:52 2012
New Revision: 1383137

URL: http://svn.apache.org/viewvc?rev=1383137&view=rev
Log:
HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are out-of-sync. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Mon Sep 10 22:30:52 2012
@@ -58,3 +58,5 @@ HDFS-3898. QJM: enable TCP_NODELAY for I
 HDFS-3885. QJM: optimize log sync when JN is lagging behind (todd)
 
 HDFS-3900. QJM: avoid validating log segments on log rolls (todd)
+
+HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are out-of-sync (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Mon Sep 10 22:30:52 2012
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -52,6 +53,7 @@ import org.apache.hadoop.security.Securi
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -92,6 +94,19 @@ public class IPCLoggerChannel implements
    * The highest txid that has been successfully logged on the remote JN.
    */
   private long highestAckedTxId = 0;
+
+  /**
+   * Nanotime of the last time we successfully journaled some edits
+   * to the remote node.
+   */
+  private long lastAckNanos = 0;
+
+  /**
+   * Nanotime of the last time that committedTxId was update. Used
+   * to calculate the lag in terms of time, rather than just a number
+   * of txns.
+   */
+  private long lastCommitNanos = 0;
   
   /**
    * The maximum number of bytes that can be pending in the queue.
@@ -109,6 +124,13 @@ public class IPCLoggerChannel implements
    */
   private boolean outOfSync = false;
   
+  /**
+   * Stopwatch which starts counting on each heartbeat that is sent
+   */
+  private Stopwatch lastHeartbeatStopwatch = new Stopwatch();
+  
+  private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
+  
   static final Factory FACTORY = new AsyncLogger.Factory() {
     @Override
     public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
@@ -145,6 +167,7 @@ public class IPCLoggerChannel implements
         "Trying to move committed txid backwards in client " +
          "old: %s new: %s", committedTxId, txid);
     this.committedTxId = txid;
+    this.lastCommitNanos = System.nanoTime();
   }
   
   @Override
@@ -295,6 +318,11 @@ public class IPCLoggerChannel implements
     } catch (LoggerTooFarBehindException e) {
       return Futures.immediateFailedFuture(e);
     }
+    
+    // When this batch is acked, we use its submission time in order
+    // to calculate how far we are lagging.
+    final long submitNanos = System.nanoTime();
+    
     ListenableFuture<Void> ret = null;
     try {
       ret = executor.submit(new Callable<Void>() {
@@ -318,6 +346,7 @@ public class IPCLoggerChannel implements
           }
           synchronized (IPCLoggerChannel.this) {
             highestAckedTxId = firstTxnId + numTxns - 1;
+            lastAckNanos = submitNanos;
           }
           return null;
         }
@@ -347,15 +376,40 @@ public class IPCLoggerChannel implements
     return ret;
   }
 
-  private synchronized void throwIfOutOfSync() throws JournalOutOfSyncException {
-    if (outOfSync) {
-      // TODO: send a "heartbeat" here so that the remote node knows the newest
-      // committed txid, for metrics purposes
+  private void throwIfOutOfSync()
+      throws JournalOutOfSyncException, IOException {
+    if (isOutOfSync()) {
+      // Even if we're out of sync, it's useful to send an RPC
+      // to the remote node in order to update its lag metrics, etc.
+      heartbeatIfNecessary();
       throw new JournalOutOfSyncException(
           "Journal disabled until next roll");
     }
   }
 
+  /**
+   * When we've entered an out-of-sync state, it's still useful to periodically
+   * send an empty RPC to the server, such that it has the up to date
+   * committedTxId. This acts as a sanity check during recovery, and also allows
+   * that node's metrics to be up-to-date about its lag.
+   * 
+   * In the future, this method may also be used in order to check that the
+   * current node is still the current writer, even if no edits are being
+   * written.
+   */
+  private void heartbeatIfNecessary() throws IOException {
+    if (lastHeartbeatStopwatch.elapsedMillis() > HEARTBEAT_INTERVAL_MILLIS ||
+        !lastHeartbeatStopwatch.isRunning()) {
+      try {
+        getProxy().heartbeat(createReqInfo());
+      } finally {
+        // Don't send heartbeats more often than the configured interval,
+        // even if they fail.
+        lastHeartbeatStopwatch.reset().start();
+      }
+    }
+  }
+
   private synchronized void reserveQueueSpace(int size)
       throws LoggerTooFarBehindException {
     Preconditions.checkArgument(size >= 0);
@@ -479,13 +533,27 @@ public class IPCLoggerChannel implements
   @Override
   public synchronized void appendHtmlReport(StringBuilder sb) {
     sb.append("Written txid ").append(highestAckedTxId);
-    long behind = committedTxId - highestAckedTxId;
-    assert behind >= 0;
+    long behind = getLagTxns();
     if (behind > 0) {
-      sb.append(" (" + behind + " behind)");
+      if (lastAckNanos != 0) {
+        long lagMillis = getLagTimeMillis();
+        sb.append(" (" + behind + " txns/" + lagMillis + "ms behind)");
+      } else {
+        sb.append(" (never written");
+      }
     }
     if (outOfSync) {
-      sb.append(" (will re-join on next segment)");
+      sb.append(" (will try to re-sync on next segment)");
     }
   }
+  
+  private long getLagTxns() {
+    return Math.max(committedTxId - highestAckedTxId, 0);
+  }
+  
+  private long getLagTimeMillis() {
+    return TimeUnit.MILLISECONDS.convert(
+        Math.max(lastCommitNanos - lastAckNanos, 0),
+        TimeUnit.NANOSECONDS);
+  }
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Mon Sep 10 22:30:52 2012
@@ -77,6 +77,15 @@ public interface QJournalProtocol {
                       int numTxns,
                       byte[] records) throws IOException;
 
+  
+  /**
+   * Heartbeat.
+   * This is a no-op on the server, except that it verifies that the
+   * caller is in fact still the active writer, and provides up-to-date
+   * information on the most recently committed txid.
+   */
+  public void heartbeat(RequestInfo reqInfo) throws IOException;
+  
   /**
    * Start writing to a new log segment on the JournalNode.
    * Before calling this, one should finalize the previous segment

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Mon Sep 10 22:30:52 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
@@ -118,6 +120,18 @@ public class QJournalProtocolServerSideT
     return JournalResponseProto.newBuilder().build();
   }
 
+  /** @see JournalProtocol#heartbeat */
+  @Override
+  public HeartbeatResponseProto heartbeat(RpcController controller,
+      HeartbeatRequestProto req) throws ServiceException {
+    try {
+      impl.heartbeat(convert(req.getReqInfo()));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return HeartbeatResponseProto.getDefaultInstance();
+  }
+
   /** @see JournalProtocol#startLogSegment */
   @Override
   public StartLogSegmentResponseProto startLogSegment(RpcController controller,

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Mon Sep 10 22:30:52 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
@@ -141,6 +142,17 @@ public class QJournalProtocolTranslatorP
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+  
+  @Override
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    try {
+      rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   private QJournalProtocolProtos.RequestInfoProto convert(
       RequestInfo reqInfo) {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Mon Sep 10 22:30:52 2012
@@ -75,6 +75,7 @@ class Journal implements Closeable {
   private EditLogOutputStream curSegment;
   private long curSegmentTxId = HdfsConstants.INVALID_TXID;
   private long nextTxId = HdfsConstants.INVALID_TXID;
+  private long highestWrittenTxId = 0;
   
   private final String journalId;
   
@@ -123,6 +124,11 @@ class Journal implements Closeable {
     this.fjm = storage.getJournalManager();
     
     this.metrics = JournalMetrics.create(this);
+    
+    EditLogFile latest = scanStorageForLatestEdits();
+    if (latest != null) {
+      highestWrittenTxId = latest.getLastTxId();
+    }
   }
 
   /**
@@ -224,6 +230,19 @@ class Journal implements Closeable {
     return committedTxnId.get();
   }
   
+  synchronized long getCurrentLagTxns() throws IOException {
+    long committed = committedTxnId.get();
+    if (committed == 0) {
+      return 0;
+    }
+    
+    return Math.max(committed - highestWrittenTxId, 0L);
+  }
+  
+  synchronized long getHighestWrittenTxId() {
+    return highestWrittenTxId;
+  }
+  
   @VisibleForTesting
   JournalMetrics getMetricsForTests() {
     return metrics;
@@ -329,19 +348,20 @@ class Journal implements Closeable {
       // This batch of edits has already been committed on a quorum of other
       // nodes. So, we are in "catch up" mode. This gets its own metric.
       metrics.batchesWrittenWhileLagging.incr(1);
-      metrics.currentLagTxns.set(committedTxnId.get() - lastTxnId);
-    } else {
-      metrics.currentLagTxns.set(0L);
     }
     
     metrics.batchesWritten.incr(1);
     metrics.bytesWritten.incr(records.length);
     metrics.txnsWritten.incr(numTxns);
-    metrics.lastWrittenTxId.set(lastTxnId);
     
-    nextTxId += numTxns;
+    highestWrittenTxId = lastTxnId;
+    nextTxId = lastTxnId + 1;
   }
 
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    checkRequest(reqInfo);
+  }
+  
   /**
    * Ensure that the given request is coming from the correct writer and in-order.
    * @param reqInfo the request info
@@ -690,6 +710,10 @@ class Journal implements Closeable {
       if (currentSegment == null) {
         LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
             ": no current segment in place");
+        
+        // Update the highest txid for lag metrics
+        highestWrittenTxId = Math.max(segment.getEndTxId(),
+            highestWrittenTxId);
       } else {
         LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
             ": old segment " + TextFormat.shortDebugString(currentSegment) +
@@ -708,8 +732,15 @@ class Journal implements Closeable {
               ": would discard already-committed txn " +
               committedTxnId.get());
         }
+        
+        // If we're shortening the log, update our highest txid
+        // used for lag metrics.
+        if (txnRange(currentSegment).contains(highestWrittenTxId)) {
+          highestWrittenTxId = segment.getEndTxId();
+        }
       }
       syncLog(reqInfo, segment, fromUrl);
+      
     } else {
       LOG.info("Skipping download of log " +
           TextFormat.shortDebugString(segment) +

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java Mon Sep 10 22:30:52 2012
@@ -51,12 +51,6 @@ class JournalMetrics {
   
   MutableQuantiles[] syncsQuantiles;
   
-  @Metric("Transaction lag behind the most recent commit")
-  MutableGaugeLong currentLagTxns;
-  
-  @Metric("Last written txid")
-  MutableGaugeLong lastWrittenTxId;
-  
   private final Journal journal;
 
   JournalMetrics(Journal journal) {
@@ -99,6 +93,20 @@ class JournalMetrics {
     }
   }
   
+  @Metric("The highest txid stored on this JN")
+  public long getLastWrittenTxId() {
+    return journal.getHighestWrittenTxId();
+  }
+  
+  @Metric("Number of transactions that this JN is lagging")
+  public long getCurrentLagTxns() {
+    try {
+      return journal.getCurrentLagTxns();
+    } catch (IOException e) {
+      return -1L;
+    }
+  }
+  
   void addSync(long us) {
     for (MutableQuantiles q : syncsQuantiles) {
       q.add(us);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Mon Sep 10 22:30:52 2012
@@ -137,6 +137,12 @@ class JournalNodeRpcServer implements QJ
     jn.getOrCreateJournal(reqInfo.getJournalId())
        .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
   }
+  
+  @Override
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+      .heartbeat(reqInfo);
+  }
 
   @Override
   public void startLogSegment(RequestInfo reqInfo, long txid)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Mon Sep 10 22:30:52 2012
@@ -72,6 +72,17 @@ message JournalResponseProto { 
 }
 
 /**
+ * heartbeat()
+ */
+
+message HeartbeatRequestProto {
+  required RequestInfoProto reqInfo = 1;
+}
+
+message HeartbeatResponseProto { // void response
+}
+
+/**
  * startLogSegment()
  */
 message StartLogSegmentRequestProto {
@@ -207,6 +218,8 @@ service QJournalProtocolService {
 
   rpc journal(JournalRequestProto) returns (JournalResponseProto);
 
+  rpc heartbeat(HeartbeatRequestProto) returns (HeartbeatResponseProto);
+
   rpc startLogSegment(StartLogSegmentRequestProto) 
       returns (StartLogSegmentResponseProto);
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Mon Sep 10 22:30:52 2012
@@ -22,8 +22,10 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -193,6 +195,9 @@ public class TestNNWithQJM {
         MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
         mjc.getQuorumJournalURI("myjournal").toString());
+    // Speed up the test
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
     
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(0)
@@ -217,7 +222,18 @@ public class TestNNWithQJM {
       
       contents = DFSTestUtil.urlGet(url); 
       System.out.println(contents);
-      assertTrue(contents.contains("(1 behind)"));
+      assertTrue(Pattern.compile("1 txns/\\d+ms behind").matcher(contents)
+          .find());
+
+      // Restart NN while JN0 is still down.
+      cluster.restartNameNode();
+
+      contents = DFSTestUtil.urlGet(url); 
+      System.out.println(contents);
+      assertTrue(Pattern.compile("never written").matcher(contents)
+          .find());
+      
+
     } finally {
       cluster.shutdown();
     }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java Mon Sep 10 22:30:52 2012
@@ -163,11 +163,14 @@ public class TestIPCLoggerChannel {
           ee.getCause());
     }
     
-    // It should have failed without even sending an RPC, since it was not sync.
+    // It should have failed without even sending the edits, since it was not sync.
     Mockito.verify(mockProxy, Mockito.never()).journal(
         Mockito.<RequestInfo>any(),
         Mockito.eq(1L), Mockito.eq(2L),
         Mockito.eq(1), Mockito.same(FAKE_DATA));
+    // It should have sent a heartbeat instead.
+    Mockito.verify(mockProxy).heartbeat(
+        Mockito.<RequestInfo>any());
     
     // After a roll, sending new edits should not fail.
     ch.startLogSegment(3L).get();

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Mon Sep 10 22:30:52 2012
@@ -101,6 +101,7 @@ public class TestJournalNode {
         journal.getMetricsForTests().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+    MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
 
     IPCLoggerChannel ch = new IPCLoggerChannel(
         conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress());
@@ -113,6 +114,17 @@ public class TestJournalNode {
         journal.getMetricsForTests().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+    MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
+
+    ch.setCommittedTxId(100L);
+    ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
+
+    metrics = MetricsAsserts.getMetrics(
+        journal.getMetricsForTests().getName());
+    MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
+    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
+    MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
+
   }