Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/05/09 05:47:16 UTC

[33/50] [abbrv] hadoop git commit: HDFS-11448. JN log segment syncing should support HA upgrade. Contributed by Hanisha Koneru.

HDFS-11448. JN log segment syncing should support HA upgrade. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/07761af3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/07761af3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/07761af3

Branch: refs/heads/HDFS-7240
Commit: 07761af357ef4da791df2972d7d3f049d6011c8d
Parents: 54e2b9e
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu May 4 15:57:44 2017 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu May 4 15:57:44 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/qjournal/server/JNStorage.java  |  25 ++++-
 .../hadoop/hdfs/qjournal/server/Journal.java    |  21 +++-
 .../hdfs/qjournal/server/JournalNodeSyncer.java | 104 ++++++++++++-------
 .../hdfs/qjournal/TestJournalNodeSync.java      |   1 +
 4 files changed, 105 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/07761af3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 8f40f6b..7226cae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -58,6 +58,8 @@ class JNStorage extends Storage {
   private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES = 
       ImmutableList.of(Pattern.compile("(\\d+)"));
 
+  private static final String STORAGE_EDITS_SYNC = "edits.sync";
+
   /**
    * @param conf Configuration object
    * @param logDir the path to the directory in which data will be stored
@@ -120,12 +122,29 @@ class JNStorage extends Storage {
     return new File(sd.getCurrentDir(), name);
   }
 
-  File getTemporaryEditsFile(long startTxId, long endTxId, long timestamp) {
-    return NNStorage.getTemporaryEditsFile(sd, startTxId, endTxId, timestamp);
+  File getCurrentDir() {
+    return sd.getCurrentDir();
+  }
+
+  /**
+   * Directory {@code edits.sync} temporarily holds the log segments
+   * downloaded through {@link JournalNodeSyncer} before they are moved to
+   * {@code current} directory.
+   *
+   * @return the directory path
+   */
+  File getEditsSyncDir() {
+    return new File(sd.getRoot(), STORAGE_EDITS_SYNC);
+  }
+
+  File getTemporaryEditsFile(long startTxId, long endTxId) {
+    return new File(getEditsSyncDir(), String.format("%s_%019d-%019d",
+            NNStorage.NameNodeFile.EDITS.getName(), startTxId, endTxId));
   }
 
   File getFinalizedEditsFile(long startTxId, long endTxId) {
-    return NNStorage.getFinalizedEditsFile(sd, startTxId, endTxId);
+    return new File(sd.getCurrentDir(), String.format("%s_%019d-%019d",
+            NNStorage.NameNodeFile.EDITS.getName(), startTxId, endTxId));
   }
 
   /**

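The two rewritten helpers build segment file names from the same "%s_%019d-%019d" pattern, so a downloaded segment keeps its standard edits_<startTxId>-<endTxId> name whether it sits in edits.sync or in current. A small standalone sketch of that naming follows; the directory paths are hypothetical, and the literal "edits" prefix stands in for NNStorage.NameNodeFile.EDITS.getName().

    import java.io.File;

    public class EditsFileNameSketch {
      // Mirrors the "%s_%019d-%019d" pattern used in JNStorage above.
      static File segmentFile(File dir, String prefix, long startTxId, long endTxId) {
        return new File(dir, String.format("%s_%019d-%019d", prefix, startTxId, endTxId));
      }

      public static void main(String[] args) {
        // Hypothetical JournalNode storage layout; only the name pattern comes from the patch.
        File editsSyncDir = new File("/data/jn/ns1/edits.sync");
        File currentDir = new File("/data/jn/ns1/current");
        // The segment covering transactions 1..100 has the same name in both directories,
        // so finalizing it is a plain rename from edits.sync/ into current/.
        System.out.println(segmentFile(editsSyncDir, "edits", 1L, 100L));
        // /data/jn/ns1/edits.sync/edits_0000000000000000001-0000000000000000100
        System.out.println(segmentFile(currentDir, "edits", 1L, 100L));
        // /data/jn/ns1/current/edits_0000000000000000001-0000000000000000100
      }
    }
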
http://git-wip-us.apache.org/repos/asf/hadoop/blob/07761af3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index ca21373..0041d5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.security.PrivilegedExceptionAction;
 import java.util.Iterator;
 import java.util.List;
@@ -1092,19 +1094,28 @@ public class Journal implements Closeable {
     committedTxnId.set(startTxId - 1);
   }
 
-  synchronized boolean renameTmpSegment(File tmpFile, File finalFile,
+  synchronized boolean moveTmpSegmentToCurrent(File tmpFile, File finalFile,
       long endTxId) throws IOException {
     final boolean success;
     if (endTxId <= committedTxnId.get()) {
-      success = tmpFile.renameTo(finalFile);
-      if (!success) {
-        LOG.warn("Unable to rename edits file from " + tmpFile + " to " +
+      if (!finalFile.getParentFile().exists()) {
+        LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " +
+            "segment move to current directory");
+        return false;
+      }
+      Files.move(tmpFile.toPath(), finalFile.toPath(),
+          StandardCopyOption.ATOMIC_MOVE);
+      if (finalFile.exists() && FileUtil.canRead(finalFile)) {
+        success = true;
+      } else {
+        success = false;
+        LOG.warn("Unable to move edits file from " + tmpFile + " to " +
             finalFile);
       }
     } else {
       success = false;
       LOG.error("The endTxId of the temporary file is not less than the " +
-          "last committed transaction id. Aborting renaming to final file" +
+          "last committed transaction id. Aborting move to final file" +
           finalFile);
     }
 

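The key change in Journal is that the temporary segment is moved into the current directory with Files.move and ATOMIC_MOVE rather than File.renameTo. A minimal sketch of that pattern, assuming both directories sit on the same filesystem; the paths are hypothetical and the error handling here is illustrative, not the Journal's own.

    import java.io.IOException;
    import java.nio.file.AtomicMoveNotSupportedException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public class AtomicMoveSketch {
      public static void main(String[] args) {
        // Hypothetical paths; edits.sync and current live under the same JournalNode
        // storage root, so an atomic rename between them is possible.
        Path tmp = Paths.get("/data/jn/ns1/edits.sync/edits_0000000000000000001-0000000000000000100");
        Path dst = Paths.get("/data/jn/ns1/current/edits_0000000000000000001-0000000000000000100");
        try {
          // Either the finalized file appears fully formed in current/ or not at all;
          // readers never observe a partially written segment.
          Files.move(tmp, dst, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
          // Can happen if the two paths end up on different filesystems.
          System.err.println("Atomic move not supported: " + e);
        } catch (IOException e) {
          System.err.println("Move failed, leaving the temporary file for cleanup: " + e);
        }
      }
    }
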
http://git-wip-us.apache.org/repos/asf/hadoop/blob/07761af3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index f195c00..788c5de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,6 +97,11 @@ public class JournalNodeSyncer {
 
   void stopSync() {
     shouldSync = false;
+    // Delete the edits.sync directory
+    File editsSyncDir = journal.getStorage().getEditsSyncDir();
+    if (editsSyncDir.exists()) {
+      FileUtil.fullyDelete(editsSyncDir);
+    }
     if (syncJournalDaemon != null) {
       syncJournalDaemon.interrupt();
     }
@@ -112,6 +116,15 @@ public class JournalNodeSyncer {
     }
   }
 
+  private boolean createEditsSyncDir() {
+    File editsSyncDir = journal.getStorage().getEditsSyncDir();
+    if (editsSyncDir.exists()) {
+      LOG.info(editsSyncDir + " directory already exists.");
+      return true;
+    }
+    return editsSyncDir.mkdir();
+  }
+
   private boolean getOtherJournalNodeProxies() {
     List<InetSocketAddress> otherJournalNodes = getOtherJournalNodeAddrs();
     if (otherJournalNodes == null || otherJournalNodes.isEmpty()) {
@@ -135,35 +148,51 @@ public class JournalNodeSyncer {
   }
 
   private void startSyncJournalsDaemon() {
-    syncJournalDaemon = new Daemon(new Runnable() {
-      @Override
-      public void run() {
-        while(shouldSync) {
-          try {
-            if (!journal.isFormatted()) {
-              LOG.warn("Journal not formatted. Cannot sync.");
+    syncJournalDaemon = new Daemon(() -> {
+      // Wait for journal to be formatted to create edits.sync directory
+      while(!journal.isFormatted()) {
+        try {
+          Thread.sleep(journalSyncInterval);
+        } catch (InterruptedException e) {
+          LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+      if (!createEditsSyncDir()) {
+        LOG.error("Failed to create directory for downloading log " +
+                "segments: %s. Stopping Journal Node Sync.",
+            journal.getStorage().getEditsSyncDir());
+        return;
+      }
+      while(shouldSync) {
+        try {
+          if (!journal.isFormatted()) {
+            LOG.warn("Journal cannot sync. Not formatted.");
+          } else {
+            syncJournals();
+          }
+          Thread.sleep(journalSyncInterval);
+        } catch (Throwable t) {
+          if (!shouldSync) {
+            if (t instanceof InterruptedException) {
+              LOG.info("Stopping JournalNode Sync.");
+              Thread.currentThread().interrupt();
+              return;
             } else {
-              syncJournals();
+              LOG.warn("JournalNodeSyncer received an exception while " +
+                  "shutting down.", t);
             }
-            Thread.sleep(journalSyncInterval);
-          } catch (Throwable t) {
-            if (!shouldSync) {
-              if (t instanceof InterruptedException) {
-                LOG.info("Stopping JournalNode Sync.");
-              } else {
-                LOG.warn("JournalNodeSyncer received an exception while " +
-                    "shutting down.", t);
-              }
-              break;
-            } else {
-              if (t instanceof InterruptedException) {
-                LOG.warn("JournalNodeSyncer interrupted", t);
-                break;
-              }
+            break;
+          } else {
+            if (t instanceof InterruptedException) {
+              LOG.warn("JournalNodeSyncer interrupted", t);
+              Thread.currentThread().interrupt();
+              return;
             }
-            LOG.error(
-                "JournalNodeSyncer daemon received Runtime exception. ", t);
           }
+          LOG.error(
+              "JournalNodeSyncer daemon received Runtime exception. ", t);
         }
       }
     });
@@ -335,8 +364,8 @@ public class JournalNodeSyncer {
   /**
    * Transfer an edit log from one journal node to another for sync-up.
    */
-  private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) throws
-      IOException {
+  private boolean downloadMissingLogSegment(URL url, RemoteEditLog log)
+      throws IOException {
     LOG.info("Downloading missing Edit Log from " + url + " to " + jnStorage
         .getRoot());
 
@@ -350,9 +379,10 @@ public class JournalNodeSyncer {
       return true;
     }
 
-    final long milliTime = Time.monotonicNow();
-    File tmpEditsFile = jnStorage.getTemporaryEditsFile(log.getStartTxId(), log
-        .getEndTxId(), milliTime);
+    // Download the log segment to current.tmp directory first.
+    File tmpEditsFile = jnStorage.getTemporaryEditsFile(
+        log.getStartTxId(), log.getEndTxId());
+
     try {
       Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), jnStorage, false,
           logSegmentTransferTimeout, throttler);
@@ -367,14 +397,12 @@ public class JournalNodeSyncer {
     LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " +
         tmpEditsFile.length() + " bytes.");
 
-    LOG.debug("Renaming " + tmpEditsFile.getName() + " to "
-        + finalEditsFile.getName());
-    boolean renameSuccess = journal.renameTmpSegment(tmpEditsFile,
+    final boolean moveSuccess = journal.moveTmpSegmentToCurrent(tmpEditsFile,
         finalEditsFile, log.getEndTxId());
-    if (!renameSuccess) {
-      //If rename is not successful, delete the tmpFile
-      LOG.debug("Renaming unsuccessful. Deleting temporary file: "
-          + tmpEditsFile);
+    if (!moveSuccess) {
+      // If move is not successful, delete the tmpFile
+      LOG.debug("Move to current directory unsuccessful. Deleting temporary " +
+          "file: " + tmpEditsFile);
       if (!tmpEditsFile.delete()) {
         LOG.warn("Deleting " + tmpEditsFile + " has failed");
       }

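The daemon body is rewritten as a lambda that waits for the journal to be formatted, creates edits.sync, and then loops: on interruption it restores the thread's interrupt status and returns, and on any other throwable it logs and keeps going. A stripped-down sketch of that loop shape, independent of the JournalNode classes; syncOnce(), stop(), and the interval field are placeholders.

    public class SyncLoopSketch implements Runnable {
      private volatile boolean shouldSync = true;
      private final long intervalMs = 1000L; // stands in for DFS_JOURNALNODE_SYNC_INTERVAL_KEY

      // Stand-in for the real work (syncJournals() in JournalNodeSyncer).
      private void syncOnce() {
        // ...
      }

      @Override
      public void run() {
        while (shouldSync) {
          try {
            syncOnce();
            Thread.sleep(intervalMs);
          } catch (InterruptedException e) {
            // Restore the interrupt status so the thread's owner can observe it,
            // then leave the loop instead of swallowing the interruption.
            Thread.currentThread().interrupt();
            return;
          } catch (Throwable t) {
            // Any other failure is reported and the loop continues.
            System.err.println("Sync iteration failed: " + t);
          }
        }
      }

      public void stop() {
        shouldSync = false;
        // The owning thread is typically interrupted here as well, as stopSync() does above.
      }
    }
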
http://git-wip-us.apache.org/repos/asf/hadoop/blob/07761af3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
index 5375b02..8415a6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
@@ -57,6 +57,7 @@ public class TestJournalNodeSync {
   @Before
   public void setUpMiniCluster() throws IOException {
     final Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
     qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
       .build();


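The test only exercises the sync path when it is switched on, hence the new DFS_JOURNALNODE_ENABLE_SYNC_KEY line. A minimal, self-contained sketch of a similar setup is below; MiniQJMHACluster is a test-scope class, cluster.shutdown() is assumed to be the usual teardown, and the one-second interval mirrors the test rather than a production value.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;

    public class JournalNodeSyncSetupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Turn the JournalNode syncer on and poll every second, as the test above does.
        conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
        conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);

        MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf)
            .setNumNameNodes(2)
            .build();
        try {
          // ... exercise the cluster ...
        } finally {
          cluster.shutdown();
        }
      }
    }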