Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/09/19 20:52:16 UTC

svn commit: r1387704 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/main/java/org/apache/hadoop/hdfs/qjournal/server/ src/main/j...

Author: todd
Date: Wed Sep 19 18:52:15 2012
New Revision: 1387704

URL: http://svn.apache.org/viewvc?rev=1387704&view=rev
Log:
HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Wed Sep 19 18:52:15 2012
@@ -76,3 +76,5 @@ HDFS-3894. QJM: testRecoverAfterDoubleFa
 HDFS-3926. QJM: Add user documentation for QJM. (atm)
 
 HDFS-3943. QJM: remove currently-unused md5sum field (todd)
+
+HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Sep 19 18:52:15 2012
@@ -422,6 +422,7 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
   public static final String  DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
   public static final String  DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
+  public static final String  DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
   public static final int     DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
   public static final int     DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
   public static final int     DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
@@ -429,5 +430,6 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
   public static final int     DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
   public static final int     DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
 }
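
The new dfs.qjournal.write-txns.timeout.ms key replaces a hard-coded 20-second
sendEdits timeout (see the QuorumOutputStream hunk below). As a minimal sketch,
assuming only that a deployment wants a longer timeout (60 seconds is an
arbitrary example value), the key can be overridden programmatically before the
journal manager is constructed, or equivalently in hdfs-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class WriteTxnsTimeoutDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Raise the quorum write timeout from the 20s default to 60s.
        conf.setInt(DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY, 60000);
        System.out.println(conf.getInt(
            DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
            DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT));
      }
    }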
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Wed Sep 19 18:52:15 2012
@@ -25,24 +25,20 @@ import java.util.concurrent.TimeoutExcep
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.jasper.compiler.JspUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Wrapper around a set of Loggers, taking care of fanning out
@@ -171,6 +167,11 @@ class AsyncLoggerSet {
     return loggers.size();
   }
   
+  @Override
+  public String toString() {
+    return "[" + Joiner.on(", ").join(loggers) + "]";
+  }
+
   /**
    * Append an HTML-formatted status readout on the current
    * state of the underlying loggers.
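
The new toString() feeds QuorumJournalManager's own toString() ("QJM to [...]",
see below) and the updated web UI test. For reference, a minimal sketch of the
Guava Joiner idiom it uses; the addresses here are illustrative only:

    import java.util.Arrays;
    import java.util.List;
    import com.google.common.base.Joiner;

    public class JoinerDemo {
      public static void main(String[] args) {
        List<String> loggers = Arrays.asList("127.0.0.1:8485", "127.0.0.1:8486");
        // Prints: [127.0.0.1:8485, 127.0.0.1:8486]
        System.out.println("[" + Joiner.on(", ").join(loggers) + "]");
      }
    }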

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Wed Sep 19 18:52:15 2012
@@ -54,6 +54,7 @@ import org.apache.hadoop.security.Securi
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.net.InetAddresses;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -554,7 +555,8 @@ public class IPCLoggerChannel implements
 
   @Override
   public String toString() {
-    return "Channel to journal node " + addr; 
+    return InetAddresses.toAddrString(addr.getAddress()) + ':' +
+        addr.getPort();
   }
 
   @Override
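
Guava's InetAddresses.toAddrString() yields the bare IP literal, so each
channel now renders as "127.0.0.1:8485" rather than the older prose form. A
minimal sketch of the difference (the address is illustrative, and the exact
default toString() output can vary by JDK):

    import java.net.InetSocketAddress;
    import com.google.common.net.InetAddresses;

    public class AddrDemo {
      public static void main(String[] args) {
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 8485);
        // Default form, typically something like "/127.0.0.1:8485"
        System.out.println(addr);
        // New compact form: "127.0.0.1:8485"
        System.out.println(
            InetAddresses.toAddrString(addr.getAddress()) + ':' + addr.getPort());
      }
    }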

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Wed Sep 19 18:52:15 2012
@@ -71,6 +71,7 @@ public class QuorumJournalManager implem
   private final int selectInputStreamsTimeoutMs;
   private final int getJournalStateTimeoutMs;
   private final int newEpochTimeoutMs;
+  private final int writeTxnsTimeoutMs;
 
   // Since these don't occur during normal operation, we can
   // use rather lengthy timeouts, and don't need to make them
@@ -84,6 +85,8 @@ public class QuorumJournalManager implem
   private boolean isActiveWriter;
   
   private final AsyncLoggerSet loggers;
+
+  private int outputBufferCapacity = 512 * 1024;
   
   public QuorumJournalManager(Configuration conf,
       URI uri, NamespaceInfo nsInfo) throws IOException {
@@ -122,8 +125,9 @@ public class QuorumJournalManager implem
     this.newEpochTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
         DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
-    
-        
+    this.writeTxnsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
   }
   
   protected List<AsyncLogger> createLoggers(
@@ -303,9 +307,6 @@ public class QuorumJournalManager implem
       return;
     }
     
-    
-    // TODO: check that md5s match up between any "tied" logs
-    
     SegmentStateProto logToSync = bestResponse.getSegmentState();
     assert segmentTxId == logToSync.getStartTxId();
     
@@ -329,12 +330,11 @@ public class QuorumJournalManager implem
     QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
     loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
         "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");
-    
-    // TODO:
-    // we should only try to finalize loggers who successfully synced above
-    // eg if a logger was down, we don't want to send the finalize request.
-    // write a test for this!
-    
+
+    // If one of the loggers above missed the synchronization step above, but
+    // we send a finalize() here, that's OK. It validates the log before
+    // finalizing. Hence, even if it is not "in sync", it won't incorrectly
+    // finalize.
     QuorumCall<AsyncLogger, Void> finalize =
         loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId()); 
     loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
@@ -386,7 +386,8 @@ public class QuorumJournalManager implem
     QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
     loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
         "startLogSegment(" + txId + ")");
-    return new QuorumOutputStream(loggers, txId);
+    return new QuorumOutputStream(loggers, txId,
+        outputBufferCapacity, writeTxnsTimeoutMs);
   }
 
   @Override
@@ -400,8 +401,7 @@ public class QuorumJournalManager implem
 
   @Override
   public void setOutputBufferCapacity(int size) {
-    // TODO Auto-generated method stub
-    
+    outputBufferCapacity = size;
   }
 
   @Override
@@ -416,9 +416,14 @@ public class QuorumJournalManager implem
   public void recoverUnfinalizedSegments() throws IOException {
     Preconditions.checkState(!isActiveWriter, "already active writer");
     
+    LOG.info("Starting recovery process for unclosed journal segments...");
     Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
-    LOG.info("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
+    LOG.info("Successfully started new epoch " + loggers.getEpoch());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
         QuorumCall.mapToString(resps));
+    }
     
     long mostRecentSegmentTxId = Long.MIN_VALUE;
     for (NewEpochResponseProto r : resps.values()) {
@@ -476,7 +481,7 @@ public class QuorumJournalManager implem
   
   @Override
   public String toString() {
-    return "Quorum journal manager " + uri;
+    return "QJM to " + loggers;
   }
 
   @VisibleForTesting
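
With setOutputBufferCapacity() now honored, the capacity is captured when each
QuorumOutputStream is constructed, so it must be set before a segment is
started. A minimal sketch under that assumption (the helper and its arguments
are hypothetical; a real caller would pass a live journal URI and namespace
info):

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
    import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    public class BufferCapacityDemo {
      static EditLogOutputStream openSegment(Configuration conf, URI uri,
          NamespaceInfo nsInfo, long firstTxId) throws IOException {
        QuorumJournalManager qjm = new QuorumJournalManager(conf, uri, nsInfo);
        qjm.recoverUnfinalizedSegments();
        // Takes effect for the next startLogSegment(), not retroactively.
        qjm.setOutputBufferCapacity(1024 * 1024);  // 1 MB; default is 512 KB
        return qjm.startLogSegment(firstTxId);
      }
    }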

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Wed Sep 19 18:52:15 2012
@@ -32,13 +32,16 @@ class QuorumOutputStream extends EditLog
   private final AsyncLoggerSet loggers;
   private EditsDoubleBuffer buf;
   private final long segmentTxId;
+  private final int writeTimeoutMs;
 
   public QuorumOutputStream(AsyncLoggerSet loggers,
-      long txId) throws IOException {
+      long txId, int outputBufferCapacity,
+      int writeTimeoutMs) throws IOException {
     super();
-    this.buf = new EditsDoubleBuffer(256*1024); // TODO: conf
+    this.buf = new EditsDoubleBuffer(outputBufferCapacity);
     this.loggers = loggers;
     this.segmentTxId = txId;
+    this.writeTimeoutMs = writeTimeoutMs;
   }
 
   @Override
@@ -101,7 +104,7 @@ class QuorumOutputStream extends EditLog
       QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
           segmentTxId, firstTxToFlush,
           numReadyTxns, data);
-      loggers.waitForWriteQuorum(qcall, 20000, "sendEdits"); // TODO: configurable timeout
+      loggers.waitForWriteQuorum(qcall, writeTimeoutMs, "sendEdits");
       
       // Since we successfully wrote this batch, let the loggers know. Any future
       // RPCs will thus let the loggers know of the most recent transaction, even
@@ -118,4 +121,8 @@ class QuorumOutputStream extends EditLog
     return sb.toString();
   }
   
+  @Override
+  public String toString() {
+    return "QuorumOutputStream starting at txid " + segmentTxId;
+  }
 }
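
The stream batches edits in an EditsDoubleBuffer and ships each batch to the
quorum under the configured timeout. A minimal sketch of the double-buffer
idea (illustrative names, not the real EditsDoubleBuffer API): writers keep
appending to one buffer while the previous batch is being flushed.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    class TinyDoubleBuffer {
      private ByteArrayOutputStream current = new ByteArrayOutputStream();
      private ByteArrayOutputStream ready = new ByteArrayOutputStream();

      void write(byte[] edit) throws IOException {
        current.write(edit);         // writers always append here
      }

      byte[] setReadyToFlush() {
        ByteArrayOutputStream tmp = ready;
        ready = current;             // swap buffers; new writes go to the other one
        current = tmp;
        current.reset();
        return ready.toByteArray();  // this batch goes out to the quorum
      }
    }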

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java Wed Sep 19 18:52:15 2012
@@ -37,18 +37,15 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ServletUtil;

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java Wed Sep 19 18:52:15 2012
@@ -28,8 +28,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
-import com.google.common.base.Preconditions;
-
 /**
  * A {@link Storage} implementation for the {@link JournalNode}.
  * 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Wed Sep 19 18:52:15 2012
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.util.Atomi
 import org.apache.hadoop.hdfs.util.BestEffortLongFile;
 import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.SecurityUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -87,6 +88,17 @@ class Journal implements Closeable {
    * number of that writer is stored persistently on disk.
    */
   private PersistentLongFile lastPromisedEpoch;
+
+  /**
+   * Each IPC that comes from a given client contains a serial number
+   * which only increases from the client's perspective. Whenever
+   * we switch epochs, we reset this back to -1. Whenever an IPC
+   * comes from a client, we ensure that it is strictly higher
+   * than any previous IPC. This guards against any bugs in the IPC
+   * layer that would re-order IPCs or cause a stale retry from an old
+   * request to resurface and confuse things.
+   */
+  private long currentEpochIpcSerial = -1;
   
   /**
    * The epoch number of the last writer to actually write a transaction.
@@ -262,13 +274,15 @@ class Journal implements Closeable {
 
     checkFormatted();
     storage.checkConsistentNamespace(nsInfo);
-    
+
+    // Check that the new epoch being proposed is in fact newer than
+    // any other that we've promised. 
     if (epoch <= getLastPromisedEpoch()) {
       throw new IOException("Proposed epoch " + epoch + " <= last promise " +
           getLastPromisedEpoch());
     }
     
-    lastPromisedEpoch.set(epoch);
+    updateLastPromisedEpoch(epoch);
     abortCurSegment();
     
     NewEpochResponseProto.Builder builder =
@@ -283,6 +297,16 @@ class Journal implements Closeable {
     return builder.build();
   }
 
+  private void updateLastPromisedEpoch(long newEpoch) throws IOException {
+    LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
+        " to " + newEpoch + " for client " + Server.getRemoteIp());
+    lastPromisedEpoch.set(newEpoch);
+    
+    // Since we have a new writer, reset the IPC serial - it will start
+    // counting again from 0 for this writer.
+    currentEpochIpcSerial = -1;
+  }
+
   private void abortCurSegment() throws IOException {
     if (curSegment == null) {
       return;
@@ -372,14 +396,19 @@ class Journal implements Closeable {
       throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
           " is less than the last promised epoch " +
           lastPromisedEpoch.get());
+    } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
+      // A newer client has arrived. Fence any previous writers by updating
+      // the promise.
+      updateLastPromisedEpoch(reqInfo.getEpoch());
     }
     
-    // TODO: should other requests check the _exact_ epoch instead of
-    // the <= check? <= should probably only be necessary for the
-    // first calls
-    
-    // TODO: some check on serial number that they only increase from a given
-    // client
+    // Ensure that the IPCs are arriving in-order as expected.
+    checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
+        "IPC serial %s from client %s was not higher than prior highest " +
+        "IPC serial %s", reqInfo.getIpcSerialNumber(),
+        Server.getRemoteIp(),
+        currentEpochIpcSerial);
+    currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
 
     if (reqInfo.hasCommittedTxId()) {
       Preconditions.checkArgument(
@@ -425,6 +454,22 @@ class Journal implements Closeable {
   }
 
   /**
+   * @throws AssertionError if the given expression is not true.
+   * The message of the exception is formatted using the 'msg' and
+   * 'formatArgs' parameters.
+   * 
+   * This should be used in preference to Java's built-in assert in
+   * non-performance-critical paths, where a failure of this invariant
+   * might cause the protocol to lose data. 
+   */
+  private void alwaysAssert(boolean expression, String msg,
+      Object... formatArgs) {
+    if (!expression) {
+      throw new AssertionError(String.format(msg, formatArgs));
+    }
+  }
+  
+  /**
    * Start a new segment at the given txid. The previous segment
    * must have already been finalized.
    */
@@ -466,7 +511,9 @@ class Journal implements Closeable {
     
     long curLastWriterEpoch = lastWriterEpoch.get();
     if (curLastWriterEpoch != reqInfo.getEpoch()) {
-      LOG.info("Recording lastWriterEpoch = " + reqInfo.getEpoch());
+      LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
+          " to " + reqInfo.getEpoch() + " for client " +
+          Server.getRemoteIp());
       lastWriterEpoch.set(reqInfo.getEpoch());
     }
 
@@ -689,9 +736,8 @@ class Journal implements Closeable {
 
     long segmentTxId = segment.getStartTxId();
 
-    // TODO: right now, a recovery of a segment when the log is
-    // completely emtpy (ie startLogSegment() but no txns)
-    // will fail this assertion here, since endTxId < startTxId
+    // Basic sanity checks that the segment is well-formed and contains
+    // at least one transaction.
     Preconditions.checkArgument(segment.getEndTxId() > 0 &&
         segment.getEndTxId() >= segmentTxId,
         "bad recovery state for segment %s: %s",
@@ -702,8 +748,12 @@ class Journal implements Closeable {
         .setAcceptedInEpoch(reqInfo.getEpoch())
         .setSegmentState(segment)
         .build();
+    
+    // If we previously acted on acceptRecovery() from a higher-numbered writer,
+    // this call is out of sync. We should never actually trigger this, since the
+    // checkRequest() call above should filter non-increasing epoch numbers.
     if (oldData != null) {
-      Preconditions.checkState(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
+      alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
           "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
           oldData, newData);
     }
@@ -737,6 +787,12 @@ class Journal implements Closeable {
               committedTxnId.get());
         }
         
+        // Another paranoid check: we should not be asked to synchronize a log
+        // on top of a finalized segment.
+        alwaysAssert(currentSegment.getIsInProgress(),
+            "Should never be asked to synchronize a different log on top of an " +
+            "already-finalized segment");
+        
         // If we're shortening the log, update our highest txid
         // used for lag metrics.
         if (txnRange(currentSegment).contains(highestWrittenTxId)) {
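
A minimal standalone sketch of the new strictly-increasing serial check (in
the real Journal, checkSync() signals an out-of-sync journal to the client;
the plain IllegalStateException here is a stand-in for that exception type):

    class IpcSerialGuard {
      private long highestSeen = -1;  // reset to -1 whenever the epoch changes

      synchronized void check(long ipcSerial) {
        if (ipcSerial <= highestSeen) {
          throw new IllegalStateException("IPC serial " + ipcSerial +
              " was not higher than prior highest IPC serial " + highestSeen);
        }
        highestSeen = ipcSerial;
      }
    }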

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java Wed Sep 19 18:52:15 2012
@@ -24,7 +24,6 @@ import org.apache.hadoop.metrics2.annota
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 
 /**

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Wed Sep 19 18:52:15 2012
@@ -70,11 +70,14 @@ class JournalNodeRpcServer implements QJ
     BlockingService service = QJournalProtocolService
         .newReflectiveBlockingService(translator);
     
-    this.server = RPC.getServer(
-        QJournalProtocolPB.class,
-        service, addr.getHostName(),
-            addr.getPort(), HANDLER_COUNT, false, confCopy,
-            null /*secretManager*/);
+    this.server = new RPC.Builder(confCopy)
+      .setProtocol(QJournalProtocolPB.class)
+      .setInstance(service)
+      .setBindAddress(addr.getHostName())
+      .setPort(addr.getPort())
+      .setNumHandlers(HANDLER_COUNT)
+      .setVerbose(false)
+      .build();
 
     // set service-level authorization security policy
     if (confCopy.getBoolean(

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java Wed Sep 19 18:52:15 2012
@@ -57,7 +57,9 @@ public class PersistentLongFile {
   }
   
   public void set(long newVal) throws IOException {
-    writeFile(file, newVal);
+    if (value != newVal || !loaded) {
+      writeFile(file, newVal);
+    }
     value = newVal;
     loaded = true;
   }
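
set() now skips the disk write (and its sync cost) when the stored value is
already current. A minimal sketch of the pattern in isolation; writeFile()
here is a stand-in for the real atomic file write:

    import java.io.IOException;

    class CachedLongFile {
      private long value;
      private boolean loaded = false;

      void set(long newVal) throws IOException {
        // Only touch disk when the value changes (or was never written).
        if (value != newVal || !loaded) {
          writeFile(newVal);
        }
        value = newVal;
        loaded = true;
      }

      private void writeFile(long v) throws IOException {
        // atomic write-and-sync elided in this sketch
      }
    }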

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Wed Sep 19 18:52:15 2012
@@ -211,7 +211,7 @@ public class TestNNWithQJM {
       cluster.getFileSystem().mkdirs(TEST_PATH);
       
       String contents = DFSTestUtil.urlGet(url); 
-      assertTrue(contents.contains("Channel to journal node"));
+      assertTrue(contents.contains("QJM to ["));
       assertTrue(contents.contains("Written txid 2"));
 
       // Stop one JN, do another txn, and make sure it shows as behind

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Wed Sep 19 18:52:15 2012
@@ -22,7 +22,6 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -30,7 +29,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
-import org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.junit.Test;
@@ -38,7 +36,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java Wed Sep 19 18:52:15 2012
@@ -74,6 +74,8 @@ public class TestQJMWithFaults {
   private static final int SEGMENTS_PER_WRITER = 2;
 
   private static Configuration conf = new Configuration();
+
+
   static {
     // Don't retry connections - it just slows down the tests.
     conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Wed Sep 19 18:52:15 2012
@@ -799,6 +799,13 @@ public class TestQuorumJournalManager {
         "3");
   }
   
+  @Test
+  public void testToString() throws Exception {
+    GenericTestUtils.assertMatches(
+        qjm.toString(),
+        "QJM to \\[127.0.0.1:\\d+, 127.0.0.1:\\d+, 127.0.0.1:\\d+\\]");
+  }
+  
   
   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Wed Sep 19 18:52:15 2012
@@ -177,7 +177,7 @@ public class TestJournal {
     journal.journal(makeRI(2), 1, 1, 2, 
         QJMTestUtil.createTxnData(1, 2));
     journal.finalizeLogSegment(makeRI(3), 1, 2);
-    journal.startLogSegment(makeRI(3), 3);
+    journal.startLogSegment(makeRI(4), 3);
     NewEpochResponseProto resp = journal.newEpoch(FAKE_NSINFO, 2);
     assertEquals(1, resp.getLastSegmentTxId());
   }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1387704&r1=1387703&r2=1387704&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Wed Sep 19 18:52:15 2012
@@ -240,7 +240,6 @@ public class TestJournalNode {
     assertTrue(prep.hasSegmentState());
     
     // accept() should save the accepted value in persistent storage
-    // TODO: should be able to accept without a URL here
     ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
 
     // So another prepare() call from a new epoch would return this value
@@ -326,12 +325,7 @@ public class TestJournalNode {
         " bytes in " + time + "ms");
     float avgRtt = (float)time/(float)numEdits;
     long throughput = ((long)numEdits * editsSize * 1000L)/time;
-    System.err.println("Time per batch: " + avgRtt);
+    System.err.println("Time per batch: " + avgRtt + "ms");
     System.err.println("Throughput: " + throughput + " bytes/sec");
   }
-  
-  // TODO:
-  // - add test that checks formatting behavior
-  // - add test that checks rejects newEpoch if nsinfo doesn't match
-  
 }