You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/05/09 05:47:16 UTC
[33/50] [abbrv] hadoop git commit: HDFS-11448. JN log segment syncing
should support HA upgrade. Contributed by Hanisha Koneru.
HDFS-11448. JN log segment syncing should support HA upgrade. Contributed by Hanisha Koneru.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/07761af3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/07761af3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/07761af3
Branch: refs/heads/HDFS-7240
Commit: 07761af357ef4da791df2972d7d3f049d6011c8d
Parents: 54e2b9e
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu May 4 15:57:44 2017 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu May 4 15:57:44 2017 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/qjournal/server/JNStorage.java | 25 ++++-
.../hadoop/hdfs/qjournal/server/Journal.java | 21 +++-
.../hdfs/qjournal/server/JournalNodeSyncer.java | 104 ++++++++++++-------
.../hdfs/qjournal/TestJournalNodeSync.java | 1 +
4 files changed, 105 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/07761af3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 8f40f6b..7226cae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -58,6 +58,8 @@ class JNStorage extends Storage {
private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES =
ImmutableList.of(Pattern.compile("(\\d+)"));
+ private static final String STORAGE_EDITS_SYNC = "edits.sync";
+
/**
* @param conf Configuration object
* @param logDir the path to the directory in which data will be stored
@@ -120,12 +122,29 @@ class JNStorage extends Storage {
return new File(sd.getCurrentDir(), name);
}
- File getTemporaryEditsFile(long startTxId, long endTxId, long timestamp) {
- return NNStorage.getTemporaryEditsFile(sd, startTxId, endTxId, timestamp);
+ File getCurrentDir() {
+ return sd.getCurrentDir();
+ }
+
+ /**
+ * Directory {@code edits.sync} temporarily holds the log segments
+ * downloaded through {@link JournalNodeSyncer} before they are moved to
+ * {@code current} directory.
+ *
+ * @return the directory path
+ */
+ File getEditsSyncDir() {
+ return new File(sd.getRoot(), STORAGE_EDITS_SYNC);
+ }
+
+ File getTemporaryEditsFile(long startTxId, long endTxId) {
+ return new File(getEditsSyncDir(), String.format("%s_%019d-%019d",
+ NNStorage.NameNodeFile.EDITS.getName(), startTxId, endTxId));
}
File getFinalizedEditsFile(long startTxId, long endTxId) {
- return NNStorage.getFinalizedEditsFile(sd, startTxId, endTxId);
+ return new File(sd.getCurrentDir(), String.format("%s_%019d-%019d",
+ NNStorage.NameNodeFile.EDITS.getName(), startTxId, endTxId));
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/07761af3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index ca21373..0041d5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.List;
@@ -1092,19 +1094,28 @@ public class Journal implements Closeable {
committedTxnId.set(startTxId - 1);
}
- synchronized boolean renameTmpSegment(File tmpFile, File finalFile,
+ synchronized boolean moveTmpSegmentToCurrent(File tmpFile, File finalFile,
long endTxId) throws IOException {
final boolean success;
if (endTxId <= committedTxnId.get()) {
- success = tmpFile.renameTo(finalFile);
- if (!success) {
- LOG.warn("Unable to rename edits file from " + tmpFile + " to " +
+ if (!finalFile.getParentFile().exists()) {
+ LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " +
+ "segment move to current directory");
+ return false;
+ }
+ Files.move(tmpFile.toPath(), finalFile.toPath(),
+ StandardCopyOption.ATOMIC_MOVE);
+ if (finalFile.exists() && FileUtil.canRead(finalFile)) {
+ success = true;
+ } else {
+ success = false;
+ LOG.warn("Unable to move edits file from " + tmpFile + " to " +
finalFile);
}
} else {
success = false;
LOG.error("The endTxId of the temporary file is not less than the " +
- "last committed transaction id. Aborting renaming to final file" +
+ "last committed transaction id. Aborting move to final file" +
finalFile);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/07761af3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index f195c00..788c5de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,6 +97,11 @@ public class JournalNodeSyncer {
void stopSync() {
shouldSync = false;
+ // Delete the edits.sync directory
+ File editsSyncDir = journal.getStorage().getEditsSyncDir();
+ if (editsSyncDir.exists()) {
+ FileUtil.fullyDelete(editsSyncDir);
+ }
if (syncJournalDaemon != null) {
syncJournalDaemon.interrupt();
}
@@ -112,6 +116,15 @@ public class JournalNodeSyncer {
}
}
+ private boolean createEditsSyncDir() {
+ File editsSyncDir = journal.getStorage().getEditsSyncDir();
+ if (editsSyncDir.exists()) {
+ LOG.info(editsSyncDir + " directory already exists.");
+ return true;
+ }
+ return editsSyncDir.mkdir();
+ }
+
private boolean getOtherJournalNodeProxies() {
List<InetSocketAddress> otherJournalNodes = getOtherJournalNodeAddrs();
if (otherJournalNodes == null || otherJournalNodes.isEmpty()) {
@@ -135,35 +148,51 @@ public class JournalNodeSyncer {
}
private void startSyncJournalsDaemon() {
- syncJournalDaemon = new Daemon(new Runnable() {
- @Override
- public void run() {
- while(shouldSync) {
- try {
- if (!journal.isFormatted()) {
- LOG.warn("Journal not formatted. Cannot sync.");
+ syncJournalDaemon = new Daemon(() -> {
+ // Wait for journal to be formatted to create edits.sync directory
+ while(!journal.isFormatted()) {
+ try {
+ Thread.sleep(journalSyncInterval);
+ } catch (InterruptedException e) {
+ LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ if (!createEditsSyncDir()) {
+ LOG.error("Failed to create directory for downloading log " +
+ "segments: %s. Stopping Journal Node Sync.",
+ journal.getStorage().getEditsSyncDir());
+ return;
+ }
+ while(shouldSync) {
+ try {
+ if (!journal.isFormatted()) {
+ LOG.warn("Journal cannot sync. Not formatted.");
+ } else {
+ syncJournals();
+ }
+ Thread.sleep(journalSyncInterval);
+ } catch (Throwable t) {
+ if (!shouldSync) {
+ if (t instanceof InterruptedException) {
+ LOG.info("Stopping JournalNode Sync.");
+ Thread.currentThread().interrupt();
+ return;
} else {
- syncJournals();
+ LOG.warn("JournalNodeSyncer received an exception while " +
+ "shutting down.", t);
}
- Thread.sleep(journalSyncInterval);
- } catch (Throwable t) {
- if (!shouldSync) {
- if (t instanceof InterruptedException) {
- LOG.info("Stopping JournalNode Sync.");
- } else {
- LOG.warn("JournalNodeSyncer received an exception while " +
- "shutting down.", t);
- }
- break;
- } else {
- if (t instanceof InterruptedException) {
- LOG.warn("JournalNodeSyncer interrupted", t);
- break;
- }
+ break;
+ } else {
+ if (t instanceof InterruptedException) {
+ LOG.warn("JournalNodeSyncer interrupted", t);
+ Thread.currentThread().interrupt();
+ return;
}
- LOG.error(
- "JournalNodeSyncer daemon received Runtime exception. ", t);
}
+ LOG.error(
+ "JournalNodeSyncer daemon received Runtime exception. ", t);
}
}
});
@@ -335,8 +364,8 @@ public class JournalNodeSyncer {
/**
* Transfer an edit log from one journal node to another for sync-up.
*/
- private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) throws
- IOException {
+ private boolean downloadMissingLogSegment(URL url, RemoteEditLog log)
+ throws IOException {
LOG.info("Downloading missing Edit Log from " + url + " to " + jnStorage
.getRoot());
@@ -350,9 +379,10 @@ public class JournalNodeSyncer {
return true;
}
- final long milliTime = Time.monotonicNow();
- File tmpEditsFile = jnStorage.getTemporaryEditsFile(log.getStartTxId(), log
- .getEndTxId(), milliTime);
+ // Download the log segment to the edits.sync directory first.
+ File tmpEditsFile = jnStorage.getTemporaryEditsFile(
+ log.getStartTxId(), log.getEndTxId());
+
try {
Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), jnStorage, false,
logSegmentTransferTimeout, throttler);
@@ -367,14 +397,12 @@ public class JournalNodeSyncer {
LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " +
tmpEditsFile.length() + " bytes.");
- LOG.debug("Renaming " + tmpEditsFile.getName() + " to "
- + finalEditsFile.getName());
- boolean renameSuccess = journal.renameTmpSegment(tmpEditsFile,
+ final boolean moveSuccess = journal.moveTmpSegmentToCurrent(tmpEditsFile,
finalEditsFile, log.getEndTxId());
- if (!renameSuccess) {
- //If rename is not successful, delete the tmpFile
- LOG.debug("Renaming unsuccessful. Deleting temporary file: "
- + tmpEditsFile);
+ if (!moveSuccess) {
+ // If move is not successful, delete the tmpFile
+ LOG.debug("Move to current directory unsuccessful. Deleting temporary " +
+ "file: " + tmpEditsFile);
if (!tmpEditsFile.delete()) {
LOG.warn("Deleting " + tmpEditsFile + " has failed");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/07761af3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
index 5375b02..8415a6f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
@@ -57,6 +57,7 @@ public class TestJournalNodeSync {
@Before
public void setUpMiniCluster() throws IOException {
final Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
.build();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org