You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/02/24 19:34:34 UTC

[07/50] [abbrv] hadoop git commit: HDFS-4025. QJM: Sychronize past log segments to JNs that missed them. Contributed by Hanisha Koneru.

HDFS-4025. QJM: Sychronize past log segments to JNs that missed them. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13d4bcfe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13d4bcfe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13d4bcfe

Branch: refs/heads/YARN-2915
Commit: 13d4bcfe3535a2df79c2a56e7578716d15497ff4
Parents: b10e962
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Feb 22 16:33:38 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Feb 22 16:33:38 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  16 +
 .../qjournal/client/QuorumJournalManager.java   |  38 +-
 .../hadoop/hdfs/qjournal/server/JNStorage.java  |   9 +-
 .../hadoop/hdfs/qjournal/server/Journal.java    |  19 +
 .../hdfs/qjournal/server/JournalNode.java       |  23 +-
 .../hdfs/qjournal/server/JournalNodeSyncer.java | 413 +++++++++++++++++++
 .../hadoop/hdfs/server/common/Storage.java      |   9 +
 .../apache/hadoop/hdfs/server/common/Util.java  |  46 ++-
 .../hadoop/hdfs/server/namenode/NNStorage.java  |   5 +-
 .../hdfs/server/namenode/TransferFsImage.java   |   3 +-
 .../src/main/resources/hdfs-default.xml         |  41 ++
 .../hdfs/qjournal/MiniJournalCluster.java       |   8 +
 .../hadoop/hdfs/qjournal/MiniQJMHACluster.java  |   1 +
 .../hdfs/qjournal/TestJournalNodeSync.java      | 264 ++++++++++++
 14 files changed, 853 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index cf1d21a..cfd16aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -707,6 +707,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
   public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
 
+  // Edit Log segment transfer timeout
+  public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY =
+      "dfs.edit.log.transfer.timeout";
+  public static final int DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT = 30 * 1000;
+
+  // Throttling Edit Log Segment transfer for Journal Sync
+  public static final String DFS_EDIT_LOG_TRANSFER_RATE_KEY =
+      "dfs.edit.log.transfer.bandwidthPerSec";
+  public static final long DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT = 0; //no throttling
+
   // Datanode File IO Stats
   public static final String DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY =
       "dfs.datanode.enable.fileio.profiling";
@@ -891,6 +901,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
   public static final String  DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY = "dfs.journalnode.kerberos.principal";
   public static final String  DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";
+  public static final String DFS_JOURNALNODE_ENABLE_SYNC_KEY =
+      "dfs.journalnode.enable.sync";
+  public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT = false;
+  public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
+      "dfs.journalnode.sync.interval";
+  public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
 
   // Journal-node related configs for the client side.
   public static final String  DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index ae3358b..97c0050 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
-import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRe
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
@@ -51,8 +51,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -362,41 +360,17 @@ public class QuorumJournalManager implements JournalManager {
       URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
           throws IOException {
     List<AsyncLogger> ret = Lists.newArrayList();
-    List<InetSocketAddress> addrs = getLoggerAddresses(uri);
+    List<InetSocketAddress> addrs = Util.getAddressesList(uri);
+    if (addrs.size() % 2 == 0) {
+      LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
+          "of Journal Nodes specified. This is not recommended!");
+    }
     String jid = parseJournalId(uri);
     for (InetSocketAddress addr : addrs) {
       ret.add(factory.createLogger(conf, nsInfo, jid, addr));
     }
     return ret;
   }
- 
-  private static List<InetSocketAddress> getLoggerAddresses(URI uri)
-      throws IOException {
-    String authority = uri.getAuthority();
-    Preconditions.checkArgument(authority != null && !authority.isEmpty(),
-        "URI has no authority: " + uri);
-    
-    String[] parts = StringUtils.split(authority, ';');
-    for (int i = 0; i < parts.length; i++) {
-      parts[i] = parts[i].trim();
-    }
-
-    if (parts.length % 2 == 0) {
-      LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
-          "of Journal Nodes specified. This is not recommended!");
-    }
-    
-    List<InetSocketAddress> addrs = Lists.newArrayList();
-    for (String addr : parts) {
-      InetSocketAddress isa = NetUtils.createSocketAddr(
-          addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
-      if (isa.isUnresolved()) {
-        throw new UnknownHostException(addr);
-      }
-      addrs.add(isa);
-    }
-    return addrs;
-  }
   
   @Override
   public EditLogOutputStream startLogSegment(long txId, int layoutVersion)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 07c9286..8f40f6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -49,7 +49,6 @@ class JNStorage extends Storage {
   private final FileJournalManager fjm;
   private final StorageDirectory sd;
   private StorageState state;
-  
 
   private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
       ImmutableList.of(
@@ -121,6 +120,14 @@ class JNStorage extends Storage {
     return new File(sd.getCurrentDir(), name);
   }
 
+  File getTemporaryEditsFile(long startTxId, long endTxId, long timestamp) {
+    return NNStorage.getTemporaryEditsFile(sd, startTxId, endTxId, timestamp);
+  }
+
+  File getFinalizedEditsFile(long startTxId, long endTxId) {
+    return NNStorage.getFinalizedEditsFile(sd, startTxId, endTxId);
+  }
+
   /**
    * @return the path for the file which contains persisted data for the
    * paxos-like recovery process for the given log segment.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 3760641..ca21373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -1092,6 +1092,25 @@ public class Journal implements Closeable {
     committedTxnId.set(startTxId - 1);
   }
 
+  synchronized boolean renameTmpSegment(File tmpFile, File finalFile,
+      long endTxId) throws IOException {
+    final boolean success;
+    if (endTxId <= committedTxnId.get()) {
+      success = tmpFile.renameTo(finalFile);
+      if (!success) {
+        LOG.warn("Unable to rename edits file from " + tmpFile + " to " +
+            finalFile);
+      }
+    } else {
+      success = false;
+      LOG.error("The endTxId of the temporary file is not less than the " +
+          "last committed transaction id. Aborting renaming to final file" +
+          finalFile);
+    }
+
+    return success;
+  }
+
   public Long getJournalCTime() throws IOException {
     return storage.getJournalManager().getJournalCTime();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
index cde0112..42e9be7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
@@ -68,6 +68,8 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
   private JournalNodeRpcServer rpcServer;
   private JournalNodeHttpServer httpServer;
   private final Map<String, Journal> journalsById = Maps.newHashMap();
+  private final Map<String, JournalNodeSyncer> journalSyncersById = Maps
+      .newHashMap();
   private ObjectName journalNodeInfoBeanName;
   private String httpServerURI;
   private File localDir;
@@ -92,11 +94,24 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
       LOG.info("Initializing journal in directory " + logDir);      
       journal = new Journal(conf, logDir, jid, startOpt, new ErrorReporter());
       journalsById.put(jid, journal);
+
+      // Start SyncJouranl thread, if JournalNode Sync is enabled
+      if (conf.getBoolean(
+          DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
+          DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT)) {
+        startSyncer(journal, jid);
+      }
     }
-    
+
     return journal;
   }
 
+  private void startSyncer(Journal journal, String jid) {
+    JournalNodeSyncer jSyncer = new JournalNodeSyncer(this, journal, jid, conf);
+    journalSyncersById.put(jid, jSyncer);
+    jSyncer.start();
+  }
+
   @VisibleForTesting
   public Journal getOrCreateJournal(String jid) throws IOException {
     return getOrCreateJournal(jid, StartupOption.REGULAR);
@@ -190,7 +205,11 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
    */
   public void stop(int rc) {
     this.resultCode = rc;
-    
+
+    for (JournalNodeSyncer jSyncer : journalSyncersById.values()) {
+      jSyncer.stopSync();
+    }
+
     if (rpcServer != null) { 
       rpcServer.stop();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
new file mode 100644
index 0000000..f195c00
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
+  .JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
+  .GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
+  .GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * A Journal Sync thread runs through the lifetime of the JN. It periodically
+ * gossips with other journal nodes to compare edit log manifests and if it
+ * detects any missing log segment, it downloads it from the other journal node
+ */
+@InterfaceAudience.Private
+public class JournalNodeSyncer {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      JournalNodeSyncer.class);
+  private final JournalNode jn;
+  private final Journal journal;
+  private final String jid;
+  private final JournalIdProto jidProto;
+  private final JNStorage jnStorage;
+  private final Configuration conf;
+  private volatile Daemon syncJournalDaemon;
+  private volatile boolean shouldSync = true;
+
+  private List<JournalNodeProxy> otherJNProxies = Lists.newArrayList();
+  private int numOtherJNs;
+  private int journalNodeIndexForSync = 0;
+  private final long journalSyncInterval;
+  private final int logSegmentTransferTimeout;
+  private final DataTransferThrottler throttler;
+
+  JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid,
+      Configuration conf) {
+    this.jn = jouranlNode;
+    this.journal = journal;
+    this.jid = jid;
+    this.jidProto = convertJournalId(this.jid);
+    this.jnStorage = journal.getStorage();
+    this.conf = conf;
+    journalSyncInterval = conf.getLong(
+        DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT);
+    logSegmentTransferTimeout = conf.getInt(
+        DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
+    throttler = getThrottler(conf);
+  }
+
+  void stopSync() {
+    shouldSync = false;
+    if (syncJournalDaemon != null) {
+      syncJournalDaemon.interrupt();
+    }
+  }
+
+  public void start() {
+    LOG.info("Starting SyncJournal daemon for journal " + jid);
+    if (getOtherJournalNodeProxies()) {
+      startSyncJournalsDaemon();
+    } else {
+      LOG.warn("Failed to start SyncJournal daemon for journal " + jid);
+    }
+  }
+
+  private boolean getOtherJournalNodeProxies() {
+    List<InetSocketAddress> otherJournalNodes = getOtherJournalNodeAddrs();
+    if (otherJournalNodes == null || otherJournalNodes.isEmpty()) {
+      LOG.warn("Other JournalNode addresses not available. Journal Syncing " +
+          "cannot be done");
+      return false;
+    }
+    for (InetSocketAddress addr : otherJournalNodes) {
+      try {
+        otherJNProxies.add(new JournalNodeProxy(addr));
+      } catch (IOException e) {
+        LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
+      }
+    }
+    if (otherJNProxies.isEmpty()) {
+      LOG.error("Cannot sync as there is no other JN available for sync.");
+      return false;
+    }
+    numOtherJNs = otherJNProxies.size();
+    return true;
+  }
+
+  private void startSyncJournalsDaemon() {
+    syncJournalDaemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        while(shouldSync) {
+          try {
+            if (!journal.isFormatted()) {
+              LOG.warn("Journal not formatted. Cannot sync.");
+            } else {
+              syncJournals();
+            }
+            Thread.sleep(journalSyncInterval);
+          } catch (Throwable t) {
+            if (!shouldSync) {
+              if (t instanceof InterruptedException) {
+                LOG.info("Stopping JournalNode Sync.");
+              } else {
+                LOG.warn("JournalNodeSyncer received an exception while " +
+                    "shutting down.", t);
+              }
+              break;
+            } else {
+              if (t instanceof InterruptedException) {
+                LOG.warn("JournalNodeSyncer interrupted", t);
+                break;
+              }
+            }
+            LOG.error(
+                "JournalNodeSyncer daemon received Runtime exception. ", t);
+          }
+        }
+      }
+    });
+    syncJournalDaemon.start();
+  }
+
+  private void syncJournals() {
+    syncWithJournalAtIndex(journalNodeIndexForSync);
+    journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
+  }
+
+  private void syncWithJournalAtIndex(int index) {
+    LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+        + jn.getBoundIpcAddress().getPort() + " with "
+        + otherJNProxies.get(index) + ", journal id: " + jid);
+    final QJournalProtocolPB jnProxy = otherJNProxies.get(index).jnProxy;
+    if (jnProxy == null) {
+      LOG.error("JournalNode Proxy not found.");
+      return;
+    }
+
+    List<RemoteEditLog> thisJournalEditLogs;
+    try {
+      thisJournalEditLogs = journal.getEditLogManifest(0, false).getLogs();
+    } catch (IOException e) {
+      LOG.error("Exception in getting local edit log manifest", e);
+      return;
+    }
+
+    GetEditLogManifestResponseProto editLogManifest;
+    try {
+      editLogManifest = jnProxy.getEditLogManifest(null,
+          GetEditLogManifestRequestProto.newBuilder().setJid(jidProto)
+              .setSinceTxId(0)
+              .setInProgressOk(false).build());
+    } catch (ServiceException e) {
+      LOG.error("Could not sync with Journal at " +
+          otherJNProxies.get(journalNodeIndexForSync), e);
+      return;
+    }
+
+    getMissingLogSegments(thisJournalEditLogs, editLogManifest,
+        otherJNProxies.get(index));
+  }
+
+  private List<InetSocketAddress> getOtherJournalNodeAddrs() {
+    URI uri = null;
+    try {
+      String uriStr = conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+      if (uriStr == null || uriStr.isEmpty()) {
+        LOG.warn("Could not construct Shared Edits Uri");
+        return null;
+      }
+      uri = new URI(uriStr);
+      return Util.getLoggerAddresses(uri,
+          Sets.newHashSet(jn.getBoundIpcAddress()));
+    } catch (URISyntaxException e) {
+      LOG.error("The conf property " + DFSConfigKeys
+          .DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly.");
+    } catch (IOException e) {
+      LOG.error("Could not parse JournalNode addresses: " + uri);
+    }
+    return null;
+  }
+
+  private JournalIdProto convertJournalId(String journalId) {
+    return QJournalProtocolProtos.JournalIdProto.newBuilder()
+      .setIdentifier(journalId)
+      .build();
+  }
+
+  private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
+      GetEditLogManifestResponseProto response,
+      JournalNodeProxy remoteJNproxy) {
+
+    List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
+        response.getManifest()).getLogs();
+    if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) {
+      LOG.warn("Journal at " + remoteJNproxy.jnAddr + " has no edit logs");
+      return;
+    }
+    List<RemoteEditLog> missingLogs = getMissingLogList(thisJournalEditLogs,
+        otherJournalEditLogs);
+
+    if (!missingLogs.isEmpty()) {
+      NamespaceInfo nsInfo = jnStorage.getNamespaceInfo();
+
+      for (RemoteEditLog missingLog : missingLogs) {
+        URL url = null;
+        boolean success = false;
+        try {
+          if (remoteJNproxy.httpServerUrl == null) {
+            if (response.hasFromURL()) {
+              URI uri = URI.create(response.getFromURL());
+              remoteJNproxy.httpServerUrl = getHttpServerURI(uri.getScheme(),
+                  uri.getHost(), uri.getPort());
+            } else {
+              remoteJNproxy.httpServerUrl = getHttpServerURI("http",
+                  remoteJNproxy.jnAddr.getHostName(), response.getHttpPort());
+            }
+          }
+
+          String urlPath = GetJournalEditServlet.buildPath(jid, missingLog
+              .getStartTxId(), nsInfo);
+          url = new URL(remoteJNproxy.httpServerUrl, urlPath);
+          success = downloadMissingLogSegment(url, missingLog);
+        } catch (MalformedURLException e) {
+          LOG.error("MalformedURL when download missing log segment", e);
+        } catch (Exception e) {
+          LOG.error("Exception in downloading missing log segment from url " +
+              url, e);
+        }
+        if (!success) {
+          LOG.error("Aborting current sync attempt.");
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   *  Returns the logs present in otherJournalEditLogs and missing from
+   *  thisJournalEditLogs.
+   */
+  private List<RemoteEditLog> getMissingLogList(
+      List<RemoteEditLog> thisJournalEditLogs,
+      List<RemoteEditLog> otherJournalEditLogs) {
+    if (thisJournalEditLogs.isEmpty()) {
+      return otherJournalEditLogs;
+    }
+
+    List<RemoteEditLog> missingEditLogs = Lists.newArrayList();
+
+    int thisJnIndex = 0, otherJnIndex = 0;
+    int thisJnNumLogs = thisJournalEditLogs.size();
+    int otherJnNumLogs = otherJournalEditLogs.size();
+
+    while (thisJnIndex < thisJnNumLogs && otherJnIndex < otherJnNumLogs) {
+      long localJNstartTxId = thisJournalEditLogs.get(thisJnIndex)
+          .getStartTxId();
+      long remoteJNstartTxId = otherJournalEditLogs.get(otherJnIndex)
+          .getStartTxId();
+
+      if (localJNstartTxId == remoteJNstartTxId) {
+        thisJnIndex++;
+        otherJnIndex++;
+      } else if (localJNstartTxId > remoteJNstartTxId) {
+        missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex));
+        otherJnIndex++;
+      } else {
+        thisJnIndex++;
+      }
+    }
+
+    if (otherJnIndex < otherJnNumLogs) {
+      for (; otherJnIndex < otherJnNumLogs; otherJnIndex++) {
+        missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex));
+      }
+    }
+
+    return missingEditLogs;
+  }
+
+  private URL getHttpServerURI(String scheme, String hostname, int port)
+    throws MalformedURLException {
+    return new URL(scheme, hostname, port, "");
+  }
+
+  /**
+   * Transfer an edit log from one journal node to another for sync-up.
+   */
+  private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) throws
+      IOException {
+    LOG.info("Downloading missing Edit Log from " + url + " to " + jnStorage
+        .getRoot());
+
+    assert log.getStartTxId() > 0 && log.getEndTxId() > 0 : "bad log: " + log;
+    File finalEditsFile = jnStorage.getFinalizedEditsFile(log.getStartTxId(),
+        log.getEndTxId());
+
+    if (finalEditsFile.exists() && FileUtil.canRead(finalEditsFile)) {
+      LOG.info("Skipping download of remote edit log " + log + " since it's" +
+          " already stored locally at " + finalEditsFile);
+      return true;
+    }
+
+    final long milliTime = Time.monotonicNow();
+    File tmpEditsFile = jnStorage.getTemporaryEditsFile(log.getStartTxId(), log
+        .getEndTxId(), milliTime);
+    try {
+      Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), jnStorage, false,
+          logSegmentTransferTimeout, throttler);
+    } catch (IOException e) {
+      LOG.error("Download of Edit Log file for Syncing failed. Deleting temp " +
+          "file: " + tmpEditsFile);
+      if (!tmpEditsFile.delete()) {
+        LOG.warn("Deleting " + tmpEditsFile + " has failed");
+      }
+      return false;
+    }
+    LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " +
+        tmpEditsFile.length() + " bytes.");
+
+    LOG.debug("Renaming " + tmpEditsFile.getName() + " to "
+        + finalEditsFile.getName());
+    boolean renameSuccess = journal.renameTmpSegment(tmpEditsFile,
+        finalEditsFile, log.getEndTxId());
+    if (!renameSuccess) {
+      //If rename is not successful, delete the tmpFile
+      LOG.debug("Renaming unsuccessful. Deleting temporary file: "
+          + tmpEditsFile);
+      if (!tmpEditsFile.delete()) {
+        LOG.warn("Deleting " + tmpEditsFile + " has failed");
+      }
+      return false;
+    }
+    return true;
+  }
+
+  private static DataTransferThrottler getThrottler(Configuration conf) {
+    long transferBandwidth =
+        conf.getLong(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_KEY,
+            DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT);
+    DataTransferThrottler throttler = null;
+    if (transferBandwidth > 0) {
+      throttler = new DataTransferThrottler(transferBandwidth);
+    }
+    return throttler;
+  }
+
+  private class JournalNodeProxy {
+    private final InetSocketAddress jnAddr;
+    private final QJournalProtocolPB jnProxy;
+    private URL httpServerUrl;
+
+    JournalNodeProxy(InetSocketAddress jnAddr) throws IOException {
+      this.jnAddr = jnAddr;
+      this.jnProxy = RPC.getProxy(QJournalProtocolPB.class,
+          RPC.getProtocolVersion(QJournalProtocolPB.class), jnAddr, conf);
+    }
+
+    @Override
+    public String toString() {
+      return jnAddr.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 1af7877..4493772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.apache.hadoop.util.ToolRunner;
@@ -1010,6 +1011,14 @@ public abstract class Storage extends StorageInfo {
     return false;
   }
 
+  public NamespaceInfo getNamespaceInfo() {
+    return new NamespaceInfo(
+        getNamespaceID(),
+        getClusterID(),
+        null,
+        getCTime());
+  }
+
   /**
    * Return true if the layout of the given storage directory is from a version
    * of Hadoop prior to the introduction of the "current" and "previous"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
index f08c3fa..9c67f0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
@@ -22,9 +22,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.net.UnknownHostException;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
 import java.util.ArrayList;
@@ -32,18 +34,23 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 
@@ -143,7 +150,8 @@ public final class Util {
    * storage.
    */
   public static MD5Hash doGetUrl(URL url, List<File> localPaths,
-      Storage dstStorage, boolean getChecksum, int timeout) throws IOException {
+      Storage dstStorage, boolean getChecksum, int timeout,
+      DataTransferThrottler throttler) throws IOException {
     HttpURLConnection connection;
     try {
       connection = (HttpURLConnection)
@@ -176,7 +184,7 @@ public final class Util {
 
     return receiveFile(url.toExternalForm(), localPaths, dstStorage,
         getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
-        null);
+        throttler);
   }
 
   /**
@@ -268,7 +276,7 @@ public final class Util {
       long xferKb = received / 1024;
       xferCombined += xferSec;
       xferStats.append(
-          String.format(" The fsimage download took %.2fs at %.2f KB/s.",
+          String.format(" The file download took %.2fs at %.2f KB/s.",
               xferSec, xferKb / xferSec));
     } finally {
       stream.close();
@@ -301,7 +309,7 @@ public final class Util {
             advertisedSize);
       }
     }
-    xferStats.insert(0, String.format("Combined time for fsimage download and" +
+    xferStats.insert(0, String.format("Combined time for file download and" +
         " fsync to all disks took %.2fs.", xferCombined));
     LOG.info(xferStats.toString());
 
@@ -350,4 +358,34 @@ public final class Util {
     String header = connection.getHeaderField(MD5_HEADER);
     return (header != null) ? new MD5Hash(header) : null;
   }
+
+  public static List<InetSocketAddress> getAddressesList(URI uri)
+      throws IOException{
+    String authority = uri.getAuthority();
+    Preconditions.checkArgument(authority != null && !authority.isEmpty(),
+        "URI has no authority: " + uri);
+
+    String[] parts = StringUtils.split(authority, ';');
+    for (int i = 0; i < parts.length; i++) {
+      parts[i] = parts[i].trim();
+    }
+
+    List<InetSocketAddress> addrs = Lists.newArrayList();
+    for (String addr : parts) {
+      InetSocketAddress isa = NetUtils.createSocketAddr(
+          addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
+      if (isa.isUnresolved()) {
+        throw new UnknownHostException(addr);
+      }
+      addrs.add(isa);
+    }
+    return addrs;
+  }
+
+  public static List<InetSocketAddress> getLoggerAddresses(URI uri,
+      Set<InetSocketAddress> addrsToExclude) throws IOException {
+    List<InetSocketAddress> addrsList = getAddressesList(uri);
+    addrsList.removeAll(addrsToExclude);
+    return addrsList;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
index c79ba4a..63d1a28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -763,13 +763,13 @@ public class NNStorage extends Storage implements Closeable,
     return new File(sd.getCurrentDir(), getInProgressEditsFileName(startTxId));
   }
   
-  static File getFinalizedEditsFile(StorageDirectory sd,
+  public static File getFinalizedEditsFile(StorageDirectory sd,
       long startTxId, long endTxId) {
     return new File(sd.getCurrentDir(),
         getFinalizedEditsFileName(startTxId, endTxId));
   }
 
-  static File getTemporaryEditsFile(StorageDirectory sd,
+  public static File getTemporaryEditsFile(StorageDirectory sd,
       long startTxId, long endTxId, long timestamp) {
     return new File(sd.getCurrentDir(),
         getTemporaryEditsFileName(startTxId, endTxId, timestamp));
@@ -1106,6 +1106,7 @@ public class NNStorage extends Storage implements Closeable,
     return inspector;
   }
 
+  @Override
   public NamespaceInfo getNamespaceInfo() {
     return new NamespaceInfo(
         getNamespaceID(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 5821353..7316414 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -401,7 +401,8 @@ public class TransferFsImage {
   
   public static MD5Hash doGetUrl(URL url, List<File> localPaths,
       Storage dstStorage, boolean getChecksum) throws IOException {
-    return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout);
+    return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout,
+        null);
   }
 
   private static MD5Hash parseMD5Header(HttpServletRequest request) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 03f1a08..652b216 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1279,6 +1279,26 @@
 </property>
 
 <property>
+  <name>dfs.edit.log.transfer.timeout</name>
+  <value>30000</value>
+  <description>
+    Socket timeout for edit log transfer in milliseconds. This timeout
+    should be configured such that normal edit log transfer for journal
+    node syncing can complete successfully.
+  </description>
+</property>
+
+<property>
+  <name>dfs.edit.log.transfer.bandwidthPerSec</name>
+  <value>0</value>
+  <description>
+    Maximum bandwidth used for transferring edit log to between journal nodes
+    for syncing, in bytes per second.
+    A default value of 0 indicates that throttling is disabled.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.support.allow.format</name>
   <value>true</value>
   <description>Does HDFS namenode allow itself to be formatted?
@@ -3785,6 +3805,27 @@
 </property>
 
 <property>
+  <name>dfs.journalnode.enable.sync</name>
+  <value>true</value>
+  <description>
+    If true, the journal nodes wil sync with each other. The journal nodes
+    will periodically gossip with other journal nodes to compare edit log
+    manifests and if they detect any missing log segment, they will download
+    it from the other journal nodes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.journalnode.sync.interval</name>
+  <value>120000</value>
+  <description>
+    Time interval, in milliseconds, between two Journal Node syncs.
+    This configuration takes effect only if the journalnode sync is enabled
+    by setting the configuration parameter dfs.journalnode.enable.sync to true.
+  </description>
+</property>
+
+<property>
   <name>dfs.journalnode.kerberos.internal.spnego.principal</name>
   <value></value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
index 7b974c3..2314e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
@@ -255,4 +255,12 @@ public class MiniJournalCluster {
       }
     }
   }
+
+  public void setNamenodeSharedEditsConf(String jid) {
+    URI quorumJournalURI = getQuorumJournalURI(jid);
+    for (int i = 0; i < nodes.length; i++) {
+      nodes[i].node.getConf().set(DFSConfigKeys
+          .DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
index 0764f12..c163894 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
@@ -101,6 +101,7 @@ public class MiniQJMHACluster {
         journalCluster = new MiniJournalCluster.Builder(conf).format(true)
             .build();
         journalCluster.waitActive();
+        journalCluster.setNamenodeSharedEditsConf(NAMESERVICE);
         URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
 
         // start cluster with specified NameNodes

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
new file mode 100644
index 0000000..5375b02
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
+    .getLogFile;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Unit test for Journal Node formatting upon re-installation and syncing.
+ */
+public class TestJournalNodeSync {
+  private MiniQJMHACluster qjmhaCluster;
+  private MiniDFSCluster dfsCluster;
+  private MiniJournalCluster jCluster;
+  private FileSystem fs;
+  private FSNamesystem namesystem;
+  private int editsPerformed = 0;
+  private final String jid = "ns1";
+
+  @Before
+  public void setUpMiniCluster() throws IOException {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
+      .build();
+    dfsCluster = qjmhaCluster.getDfsCluster();
+    jCluster = qjmhaCluster.getJournalCluster();
+
+    dfsCluster.transitionToActive(0);
+    fs = dfsCluster.getFileSystem(0);
+    namesystem = dfsCluster.getNamesystem(0);
+  }
+
+  @After
+  public void shutDownMiniCluster() throws IOException {
+    if (qjmhaCluster != null) {
+      qjmhaCluster.shutdown();
+    }
+  }
+
+  @Test(timeout=30000)
+  public void testJournalNodeSync() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs and delete one.
+    long firstTxId = generateEditLog();
+    generateEditLog();
+
+    File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId);
+
+    GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)),
+        500, 10000);
+  }
+
+  @Test(timeout=30000)
+  public void testSyncForMultipleMissingLogs() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs and delete two.
+    long firstTxId = generateEditLog();
+    long nextTxId = generateEditLog();
+
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+  }
+
+  @Test(timeout=30000)
+  public void testSyncForDiscontinuousMissingLogs() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs and delete two discontinuous logs.
+    long firstTxId = generateEditLog();
+    generateEditLog();
+    long nextTxId = generateEditLog();
+
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+  }
+
+  @Test(timeout=30000)
+  public void testMultipleJournalsMissingLogs() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    StorageDirectory sd = new StorageDirectory(secondJournalDir);
+    File secondJournalCurrentDir = sd.getCurrentDir();
+
+    // Generate some edit logs and delete one log from two journals.
+    long firstTxId = generateEditLog();
+    generateEditLog();
+
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+  }
+
+  @Test(timeout=60000)
+  public void testMultipleJournalsMultipleMissingLogs() throws Exception {
+    File firstJournalDir = jCluster.getJournalDir(0, jid);
+    File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+        .getCurrentDir();
+
+    File secondJournalDir = jCluster.getJournalDir(1, jid);
+    File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+        .getCurrentDir();
+
+    File thirdJournalDir = jCluster.getJournalDir(2, jid);
+    File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir)
+        .getCurrentDir();
+
+    // Generate some edit logs and delete multiple logs in multiple journals.
+    long firstTxId = generateEditLog();
+    long secondTxId = generateEditLog();
+    long thirdTxId = generateEditLog();
+
+    List<File> missingLogs = Lists.newArrayList();
+    missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
+    missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId));
+    missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId));
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
+
+  // Test JournalNode Sync by randomly deleting edit logs from one or two of
+  // the journals.
+  @Test(timeout=60000)
+  public void testRandomJournalMissingLogs() throws Exception {
+    Random randomJournal = new Random();
+
+    List<File> journalCurrentDirs = Lists.newArrayList();
+
+    for (int i = 0; i < 3; i++) {
+      journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
+          jid)).getCurrentDir());
+    }
+
+    int count = 0;
+    long lastStartTxId;
+    int journalIndex;
+    List<File> missingLogs = Lists.newArrayList();
+    while (count < 5) {
+      lastStartTxId = generateEditLog();
+
+      // Delete the last edit log segment from randomly selected journal node
+      journalIndex = randomJournal.nextInt(3);
+      missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+          lastStartTxId));
+
+      // Delete the last edit log segment from two journals for some logs
+      if (count % 2 == 0) {
+        journalIndex = (journalIndex + 1) % 3;
+        missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+            lastStartTxId));
+      }
+
+      count++;
+    }
+
+    GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+  }
+
+  private File deleteEditLog(File currentDir, long startTxId)
+      throws IOException {
+    EditLogFile logFile = getLogFile(currentDir, startTxId);
+    while (logFile.isInProgress()) {
+      dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+      logFile = getLogFile(currentDir, startTxId);
+    }
+    File deleteFile = logFile.getFile();
+    Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete());
+
+    return deleteFile;
+  }
+
+  /**
+   * Do a mutative metadata operation on the file system.
+   *
+   * @return true if the operation was successful, false otherwise.
+   */
+  private boolean doAnEdit() throws IOException {
+    return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
+  }
+
+  /**
+   * Does an edit and rolls the Edit Log.
+   *
+   * @return the startTxId of next segment after rolling edits.
+   */
+  private long generateEditLog() throws IOException {
+    long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
+    Assert.assertTrue("Failed to do an edit", doAnEdit());
+    dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+    return startTxId;
+  }
+
+  private Supplier<Boolean> editLogExists(List<File> editLogs) {
+    Supplier<Boolean> supplier = new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        for (File editLog : editLogs) {
+          if (!editLog.exists()) {
+            return false;
+          }
+        }
+        return true;
+      }
+    };
+    return supplier;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org