Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/02/24 19:34:34 UTC
[07/50] [abbrv] hadoop git commit: HDFS-4025. QJM: Synchronize past log segments to JNs that missed them. Contributed by Hanisha Koneru.
HDFS-4025. QJM: Synchronize past log segments to JNs that missed them. Contributed by Hanisha Koneru.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13d4bcfe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13d4bcfe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13d4bcfe
Branch: refs/heads/YARN-2915
Commit: 13d4bcfe3535a2df79c2a56e7578716d15497ff4
Parents: b10e962
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Feb 22 16:33:38 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Feb 22 16:33:38 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 16 +
.../qjournal/client/QuorumJournalManager.java | 38 +-
.../hadoop/hdfs/qjournal/server/JNStorage.java | 9 +-
.../hadoop/hdfs/qjournal/server/Journal.java | 19 +
.../hdfs/qjournal/server/JournalNode.java | 23 +-
.../hdfs/qjournal/server/JournalNodeSyncer.java | 413 +++++++++++++++++++
.../hadoop/hdfs/server/common/Storage.java | 9 +
.../apache/hadoop/hdfs/server/common/Util.java | 46 ++-
.../hadoop/hdfs/server/namenode/NNStorage.java | 5 +-
.../hdfs/server/namenode/TransferFsImage.java | 3 +-
.../src/main/resources/hdfs-default.xml | 41 ++
.../hdfs/qjournal/MiniJournalCluster.java | 8 +
.../hadoop/hdfs/qjournal/MiniQJMHACluster.java | 1 +
.../hdfs/qjournal/TestJournalNodeSync.java | 264 ++++++++++++
14 files changed, 853 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
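This patch adds a JournalNodeSyncer daemon that periodically downloads edit log segments a JournalNode is missing from its peers. A minimal sketch of enabling it from code, using the keys the patch introduces (the class name here is hypothetical; note that the bundled hdfs-default.xml sets dfs.journalnode.enable.sync to true, which takes precedence over the Java-side fallback of false whenever hdfs-default.xml is on the classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class EnableJournalNodeSync {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Turn on periodic JournalNode syncing.
        conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
        // Gossip with one peer every two minutes (the patch's default).
        conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY,
            2 * 60 * 1000L);
      }
    }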
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index cf1d21a..cfd16aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -707,6 +707,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
+ // Edit Log segment transfer timeout
+ public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY =
+ "dfs.edit.log.transfer.timeout";
+ public static final int DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT = 30 * 1000;
+
+ // Throttling Edit Log Segment transfer for Journal Sync
+ public static final String DFS_EDIT_LOG_TRANSFER_RATE_KEY =
+ "dfs.edit.log.transfer.bandwidthPerSec";
+ public static final long DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT = 0; //no throttling
+
// Datanode File IO Stats
public static final String DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY =
"dfs.datanode.enable.fileio.profiling";
@@ -891,6 +901,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
public static final String DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY = "dfs.journalnode.kerberos.principal";
public static final String DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";
+ public static final String DFS_JOURNALNODE_ENABLE_SYNC_KEY =
+ "dfs.journalnode.enable.sync";
+ public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT = false;
+ public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
+ "dfs.journalnode.sync.interval";
+ public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
// Journal-node related configs for the client side.
public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
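The two transfer knobs added above control the segment download itself. A hedged sketch of tuning them (class name hypothetical; the keys and defaults are the ones defined in this hunk):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class TuneEditLogTransfer {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Give slow links a 60s socket timeout (patch default: 30s).
        conf.setInt(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY, 60 * 1000);
        // Cap segment-sync bandwidth at 1 MiB/s; 0 (the default) disables
        // throttling, i.e. no DataTransferThrottler is created.
        conf.setLong(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_KEY, 1024 * 1024);
      }
    }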
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index ae3358b..97c0050 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
-import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRe
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
@@ -51,8 +51,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -362,41 +360,17 @@ public class QuorumJournalManager implements JournalManager {
URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
throws IOException {
List<AsyncLogger> ret = Lists.newArrayList();
- List<InetSocketAddress> addrs = getLoggerAddresses(uri);
+ List<InetSocketAddress> addrs = Util.getAddressesList(uri);
+ if (addrs.size() % 2 == 0) {
+ LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
+ "of Journal Nodes specified. This is not recommended!");
+ }
String jid = parseJournalId(uri);
for (InetSocketAddress addr : addrs) {
ret.add(factory.createLogger(conf, nsInfo, jid, addr));
}
return ret;
}
-
- private static List<InetSocketAddress> getLoggerAddresses(URI uri)
- throws IOException {
- String authority = uri.getAuthority();
- Preconditions.checkArgument(authority != null && !authority.isEmpty(),
- "URI has no authority: " + uri);
-
- String[] parts = StringUtils.split(authority, ';');
- for (int i = 0; i < parts.length; i++) {
- parts[i] = parts[i].trim();
- }
-
- if (parts.length % 2 == 0) {
- LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
- "of Journal Nodes specified. This is not recommended!");
- }
-
- List<InetSocketAddress> addrs = Lists.newArrayList();
- for (String addr : parts) {
- InetSocketAddress isa = NetUtils.createSocketAddr(
- addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
- if (isa.isUnresolved()) {
- throw new UnknownHostException(addr);
- }
- addrs.add(isa);
- }
- return addrs;
- }
@Override
public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
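The address parsing formerly private to this class moves to Util.getAddressesList (see the Util.java hunk below) so the JournalNode syncer can reuse it. A sketch of what it returns for a typical quorum URI (hostnames hypothetical; they must resolve or an UnknownHostException is thrown):

    import java.net.InetSocketAddress;
    import java.net.URI;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.common.Util;

    public class ParseQuorumUri {
      public static void main(String[] args) throws Exception {
        URI uri = new URI("qjournal://jn1:8485;jn2:8485;jn3:8485/mycluster");
        // Splits the authority on ';' and resolves each host:port.
        List<InetSocketAddress> addrs = Util.getAddressesList(uri);
        // createLoggers() warns when this size is even: a quorum of 2k nodes
        // tolerates no more failures than one of 2k-1 nodes, so the extra
        // node adds no fault tolerance.
        System.out.println(addrs.size()); // 3
      }
    }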
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 07c9286..8f40f6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -49,7 +49,6 @@ class JNStorage extends Storage {
private final FileJournalManager fjm;
private final StorageDirectory sd;
private StorageState state;
-
private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
ImmutableList.of(
@@ -121,6 +120,14 @@ class JNStorage extends Storage {
return new File(sd.getCurrentDir(), name);
}
+ File getTemporaryEditsFile(long startTxId, long endTxId, long timestamp) {
+ return NNStorage.getTemporaryEditsFile(sd, startTxId, endTxId, timestamp);
+ }
+
+ File getFinalizedEditsFile(long startTxId, long endTxId) {
+ return NNStorage.getFinalizedEditsFile(sd, startTxId, endTxId);
+ }
+
/**
* @return the path for the file which contains persisted data for the
* paxos-like recovery process for the given log segment.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 3760641..ca21373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -1092,6 +1092,25 @@ public class Journal implements Closeable {
committedTxnId.set(startTxId - 1);
}
+ synchronized boolean renameTmpSegment(File tmpFile, File finalFile,
+ long endTxId) throws IOException {
+ final boolean success;
+ if (endTxId <= committedTxnId.get()) {
+ success = tmpFile.renameTo(finalFile);
+ if (!success) {
+ LOG.warn("Unable to rename edits file from " + tmpFile + " to " +
+ finalFile);
+ }
+ } else {
+ success = false;
+ LOG.error("The endTxId of the temporary file is not less than the " +
+ "last committed transaction id. Aborting renaming to final file" +
+ finalFile);
+ }
+
+ return success;
+ }
+
public Long getJournalCTime() throws IOException {
return storage.getJournalManager().getJournalCTime();
}
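renameTmpSegment only finalizes a downloaded segment once its endTxId is at or behind the quorum-committed transaction id; otherwise a segment could be finalized whose tail was never committed. A standalone sketch of that guard with illustrative values:

    public class RenameGuardSketch {
      public static void main(String[] args) {
        long committedTxnId = 100;  // highest txid known committed by a quorum
        long endTxId = 95;          // last txid of the downloaded segment
        // Finalize only segments lying entirely behind the commit point;
        // otherwise the caller deletes the temporary file and aborts.
        boolean mayFinalize = endTxId <= committedTxnId;
        System.out.println(mayFinalize); // true
      }
    }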
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
index cde0112..42e9be7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
@@ -68,6 +68,8 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
private JournalNodeRpcServer rpcServer;
private JournalNodeHttpServer httpServer;
private final Map<String, Journal> journalsById = Maps.newHashMap();
+ private final Map<String, JournalNodeSyncer> journalSyncersById = Maps
+ .newHashMap();
private ObjectName journalNodeInfoBeanName;
private String httpServerURI;
private File localDir;
@@ -92,11 +94,24 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
LOG.info("Initializing journal in directory " + logDir);
journal = new Journal(conf, logDir, jid, startOpt, new ErrorReporter());
journalsById.put(jid, journal);
+
+ // Start the SyncJournal thread if JournalNode sync is enabled
+ if (conf.getBoolean(
+ DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
+ DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT)) {
+ startSyncer(journal, jid);
+ }
}
-
+
return journal;
}
+ private void startSyncer(Journal journal, String jid) {
+ JournalNodeSyncer jSyncer = new JournalNodeSyncer(this, journal, jid, conf);
+ journalSyncersById.put(jid, jSyncer);
+ jSyncer.start();
+ }
+
@VisibleForTesting
public Journal getOrCreateJournal(String jid) throws IOException {
return getOrCreateJournal(jid, StartupOption.REGULAR);
@@ -190,7 +205,11 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
*/
public void stop(int rc) {
this.resultCode = rc;
-
+
+ for (JournalNodeSyncer jSyncer : journalSyncersById.values()) {
+ jSyncer.stopSync();
+ }
+
if (rpcServer != null) {
rpcServer.stop();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
new file mode 100644
index 0000000..f195c00
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
+ .JournalIdProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
+ .GetEditLogManifestRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos
+ .GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * A Journal Sync thread runs through the lifetime of the JN. It periodically
+ * gossips with other JournalNodes to compare edit log manifests and, if it
+ * detects a missing log segment, downloads it from one of the other
+ * JournalNodes.
+ */
+@InterfaceAudience.Private
+public class JournalNodeSyncer {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ JournalNodeSyncer.class);
+ private final JournalNode jn;
+ private final Journal journal;
+ private final String jid;
+ private final JournalIdProto jidProto;
+ private final JNStorage jnStorage;
+ private final Configuration conf;
+ private volatile Daemon syncJournalDaemon;
+ private volatile boolean shouldSync = true;
+
+ private List<JournalNodeProxy> otherJNProxies = Lists.newArrayList();
+ private int numOtherJNs;
+ private int journalNodeIndexForSync = 0;
+ private final long journalSyncInterval;
+ private final int logSegmentTransferTimeout;
+ private final DataTransferThrottler throttler;
+
+ JournalNodeSyncer(JournalNode journalNode, Journal journal, String jid,
+ Configuration conf) {
+ this.jn = journalNode;
+ this.journal = journal;
+ this.jid = jid;
+ this.jidProto = convertJournalId(this.jid);
+ this.jnStorage = journal.getStorage();
+ this.conf = conf;
+ journalSyncInterval = conf.getLong(
+ DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY,
+ DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT);
+ logSegmentTransferTimeout = conf.getInt(
+ DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
+ DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
+ throttler = getThrottler(conf);
+ }
+
+ void stopSync() {
+ shouldSync = false;
+ if (syncJournalDaemon != null) {
+ syncJournalDaemon.interrupt();
+ }
+ }
+
+ public void start() {
+ LOG.info("Starting SyncJournal daemon for journal " + jid);
+ if (getOtherJournalNodeProxies()) {
+ startSyncJournalsDaemon();
+ } else {
+ LOG.warn("Failed to start SyncJournal daemon for journal " + jid);
+ }
+ }
+
+ private boolean getOtherJournalNodeProxies() {
+ List<InetSocketAddress> otherJournalNodes = getOtherJournalNodeAddrs();
+ if (otherJournalNodes == null || otherJournalNodes.isEmpty()) {
+ LOG.warn("Other JournalNode addresses not available. Journal Syncing " +
+ "cannot be done");
+ return false;
+ }
+ for (InetSocketAddress addr : otherJournalNodes) {
+ try {
+ otherJNProxies.add(new JournalNodeProxy(addr));
+ } catch (IOException e) {
+ LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
+ }
+ }
+ if (otherJNProxies.isEmpty()) {
+ LOG.error("Cannot sync as there is no other JN available for sync.");
+ return false;
+ }
+ numOtherJNs = otherJNProxies.size();
+ return true;
+ }
+
+ private void startSyncJournalsDaemon() {
+ syncJournalDaemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ while(shouldSync) {
+ try {
+ if (!journal.isFormatted()) {
+ LOG.warn("Journal not formatted. Cannot sync.");
+ } else {
+ syncJournals();
+ }
+ Thread.sleep(journalSyncInterval);
+ } catch (Throwable t) {
+ if (!shouldSync) {
+ if (t instanceof InterruptedException) {
+ LOG.info("Stopping JournalNode Sync.");
+ } else {
+ LOG.warn("JournalNodeSyncer received an exception while " +
+ "shutting down.", t);
+ }
+ break;
+ } else {
+ if (t instanceof InterruptedException) {
+ LOG.warn("JournalNodeSyncer interrupted", t);
+ break;
+ }
+ }
+ LOG.error(
+ "JournalNodeSyncer daemon received a runtime exception.", t);
+ }
+ }
+ }
+ });
+ syncJournalDaemon.start();
+ }
+
+ private void syncJournals() {
+ syncWithJournalAtIndex(journalNodeIndexForSync);
+ journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
+ }
+
+ private void syncWithJournalAtIndex(int index) {
+ LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+ + jn.getBoundIpcAddress().getPort() + " with "
+ + otherJNProxies.get(index) + ", journal id: " + jid);
+ final QJournalProtocolPB jnProxy = otherJNProxies.get(index).jnProxy;
+ if (jnProxy == null) {
+ LOG.error("JournalNode Proxy not found.");
+ return;
+ }
+
+ List<RemoteEditLog> thisJournalEditLogs;
+ try {
+ thisJournalEditLogs = journal.getEditLogManifest(0, false).getLogs();
+ } catch (IOException e) {
+ LOG.error("Exception in getting local edit log manifest", e);
+ return;
+ }
+
+ GetEditLogManifestResponseProto editLogManifest;
+ try {
+ editLogManifest = jnProxy.getEditLogManifest(null,
+ GetEditLogManifestRequestProto.newBuilder().setJid(jidProto)
+ .setSinceTxId(0)
+ .setInProgressOk(false).build());
+ } catch (ServiceException e) {
+ LOG.error("Could not sync with Journal at " +
+ otherJNProxies.get(index), e);
+ return;
+ }
+
+ getMissingLogSegments(thisJournalEditLogs, editLogManifest,
+ otherJNProxies.get(index));
+ }
+
+ private List<InetSocketAddress> getOtherJournalNodeAddrs() {
+ URI uri = null;
+ try {
+ String uriStr = conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+ if (uriStr == null || uriStr.isEmpty()) {
+ LOG.warn("Could not construct Shared Edits Uri");
+ return null;
+ }
+ uri = new URI(uriStr);
+ return Util.getLoggerAddresses(uri,
+ Sets.newHashSet(jn.getBoundIpcAddress()));
+ } catch (URISyntaxException e) {
+ LOG.error("The conf property " + DFSConfigKeys
+ .DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " is not set properly.");
+ } catch (IOException e) {
+ LOG.error("Could not parse JournalNode addresses: " + uri);
+ }
+ return null;
+ }
+
+ private JournalIdProto convertJournalId(String journalId) {
+ return QJournalProtocolProtos.JournalIdProto.newBuilder()
+ .setIdentifier(journalId)
+ .build();
+ }
+
+ private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
+ GetEditLogManifestResponseProto response,
+ JournalNodeProxy remoteJNproxy) {
+
+ List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
+ response.getManifest()).getLogs();
+ if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) {
+ LOG.warn("Journal at " + remoteJNproxy.jnAddr + " has no edit logs");
+ return;
+ }
+ List<RemoteEditLog> missingLogs = getMissingLogList(thisJournalEditLogs,
+ otherJournalEditLogs);
+
+ if (!missingLogs.isEmpty()) {
+ NamespaceInfo nsInfo = jnStorage.getNamespaceInfo();
+
+ for (RemoteEditLog missingLog : missingLogs) {
+ URL url = null;
+ boolean success = false;
+ try {
+ if (remoteJNproxy.httpServerUrl == null) {
+ if (response.hasFromURL()) {
+ URI uri = URI.create(response.getFromURL());
+ remoteJNproxy.httpServerUrl = getHttpServerURI(uri.getScheme(),
+ uri.getHost(), uri.getPort());
+ } else {
+ remoteJNproxy.httpServerUrl = getHttpServerURI("http",
+ remoteJNproxy.jnAddr.getHostName(), response.getHttpPort());
+ }
+ }
+
+ String urlPath = GetJournalEditServlet.buildPath(jid, missingLog
+ .getStartTxId(), nsInfo);
+ url = new URL(remoteJNproxy.httpServerUrl, urlPath);
+ success = downloadMissingLogSegment(url, missingLog);
+ } catch (MalformedURLException e) {
+ LOG.error("MalformedURL when download missing log segment", e);
+ } catch (Exception e) {
+ LOG.error("Exception in downloading missing log segment from url " +
+ url, e);
+ }
+ if (!success) {
+ LOG.error("Aborting current sync attempt.");
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the logs present in otherJournalEditLogs and missing from
+ * thisJournalEditLogs.
+ */
+ private List<RemoteEditLog> getMissingLogList(
+ List<RemoteEditLog> thisJournalEditLogs,
+ List<RemoteEditLog> otherJournalEditLogs) {
+ if (thisJournalEditLogs.isEmpty()) {
+ return otherJournalEditLogs;
+ }
+
+ List<RemoteEditLog> missingEditLogs = Lists.newArrayList();
+
+ int thisJnIndex = 0, otherJnIndex = 0;
+ int thisJnNumLogs = thisJournalEditLogs.size();
+ int otherJnNumLogs = otherJournalEditLogs.size();
+
+ while (thisJnIndex < thisJnNumLogs && otherJnIndex < otherJnNumLogs) {
+ long localJNstartTxId = thisJournalEditLogs.get(thisJnIndex)
+ .getStartTxId();
+ long remoteJNstartTxId = otherJournalEditLogs.get(otherJnIndex)
+ .getStartTxId();
+
+ if (localJNstartTxId == remoteJNstartTxId) {
+ thisJnIndex++;
+ otherJnIndex++;
+ } else if (localJNstartTxId > remoteJNstartTxId) {
+ missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex));
+ otherJnIndex++;
+ } else {
+ thisJnIndex++;
+ }
+ }
+
+ if (otherJnIndex < otherJnNumLogs) {
+ for (; otherJnIndex < otherJnNumLogs; otherJnIndex++) {
+ missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex));
+ }
+ }
+
+ return missingEditLogs;
+ }
+
+ private URL getHttpServerURI(String scheme, String hostname, int port)
+ throws MalformedURLException {
+ return new URL(scheme, hostname, port, "");
+ }
+
+ /**
+ * Transfer an edit log from one journal node to another for sync-up.
+ */
+ private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) throws
+ IOException {
+ LOG.info("Downloading missing Edit Log from " + url + " to " + jnStorage
+ .getRoot());
+
+ assert log.getStartTxId() > 0 && log.getEndTxId() > 0 : "bad log: " + log;
+ File finalEditsFile = jnStorage.getFinalizedEditsFile(log.getStartTxId(),
+ log.getEndTxId());
+
+ if (finalEditsFile.exists() && FileUtil.canRead(finalEditsFile)) {
+ LOG.info("Skipping download of remote edit log " + log + " since it's" +
+ " already stored locally at " + finalEditsFile);
+ return true;
+ }
+
+ final long milliTime = Time.monotonicNow();
+ File tmpEditsFile = jnStorage.getTemporaryEditsFile(log.getStartTxId(), log
+ .getEndTxId(), milliTime);
+ try {
+ Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), jnStorage, false,
+ logSegmentTransferTimeout, throttler);
+ } catch (IOException e) {
+ LOG.error("Download of Edit Log file for Syncing failed. Deleting temp " +
+ "file: " + tmpEditsFile);
+ if (!tmpEditsFile.delete()) {
+ LOG.warn("Deleting " + tmpEditsFile + " has failed");
+ }
+ return false;
+ }
+ LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " +
+ tmpEditsFile.length() + " bytes.");
+
+ LOG.debug("Renaming " + tmpEditsFile.getName() + " to "
+ + finalEditsFile.getName());
+ boolean renameSuccess = journal.renameTmpSegment(tmpEditsFile,
+ finalEditsFile, log.getEndTxId());
+ if (!renameSuccess) {
+ //If rename is not successful, delete the tmpFile
+ LOG.debug("Renaming unsuccessful. Deleting temporary file: "
+ + tmpEditsFile);
+ if (!tmpEditsFile.delete()) {
+ LOG.warn("Deleting " + tmpEditsFile + " has failed");
+ }
+ return false;
+ }
+ return true;
+ }
+
+ private static DataTransferThrottler getThrottler(Configuration conf) {
+ long transferBandwidth =
+ conf.getLong(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_KEY,
+ DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT);
+ DataTransferThrottler throttler = null;
+ if (transferBandwidth > 0) {
+ throttler = new DataTransferThrottler(transferBandwidth);
+ }
+ return throttler;
+ }
+
+ private class JournalNodeProxy {
+ private final InetSocketAddress jnAddr;
+ private final QJournalProtocolPB jnProxy;
+ private URL httpServerUrl;
+
+ JournalNodeProxy(InetSocketAddress jnAddr) throws IOException {
+ this.jnAddr = jnAddr;
+ this.jnProxy = RPC.getProxy(QJournalProtocolPB.class,
+ RPC.getProtocolVersion(QJournalProtocolPB.class), jnAddr, conf);
+ }
+
+ @Override
+ public String toString() {
+ return jnAddr.toString();
+ }
+ }
+}
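To make the two-pointer comparison in getMissingLogList above concrete, here is a standalone sketch over hypothetical segment start txids; the real method walks two sorted RemoteEditLog lists the same way:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MissingLogSketch {
      /** Start txids present in remote but absent from local (both sorted). */
      static List<Long> missing(List<Long> local, List<Long> remote) {
        List<Long> result = new ArrayList<>();
        int i = 0, j = 0;
        while (i < local.size() && j < remote.size()) {
          long l = local.get(i);
          long r = remote.get(j);
          if (l == r) {           // segment exists on both sides
            i++; j++;
          } else if (l > r) {     // remote has a segment this node lacks
            result.add(r); j++;
          } else {                // local-only segment: skip it
            i++;
          }
        }
        while (j < remote.size()) {  // trailing remote-only segments
          result.add(remote.get(j++));
        }
        return result;
      }

      public static void main(String[] args) {
        List<Long> local = Arrays.asList(1L, 101L, 301L);
        List<Long> remote = Arrays.asList(1L, 101L, 201L, 301L, 401L);
        System.out.println(missing(local, remote)); // prints [201, 401]
      }
    }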
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 1af7877..4493772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.ToolRunner;
@@ -1010,6 +1011,14 @@ public abstract class Storage extends StorageInfo {
return false;
}
+ public NamespaceInfo getNamespaceInfo() {
+ return new NamespaceInfo(
+ getNamespaceID(),
+ getClusterID(),
+ null,
+ getCTime());
+ }
+
/**
* Return true if the layout of the given storage directory is from a version
* of Hadoop prior to the introduction of the "current" and "previous"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
index f08c3fa..9c67f0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
@@ -22,9 +22,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import java.net.UnknownHostException;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.ArrayList;
@@ -32,18 +34,23 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
@@ -143,7 +150,8 @@ public final class Util {
* storage.
*/
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
- Storage dstStorage, boolean getChecksum, int timeout) throws IOException {
+ Storage dstStorage, boolean getChecksum, int timeout,
+ DataTransferThrottler throttler) throws IOException {
HttpURLConnection connection;
try {
connection = (HttpURLConnection)
@@ -176,7 +184,7 @@ public final class Util {
return receiveFile(url.toExternalForm(), localPaths, dstStorage,
getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
- null);
+ throttler);
}
/**
@@ -268,7 +276,7 @@ public final class Util {
long xferKb = received / 1024;
xferCombined += xferSec;
xferStats.append(
- String.format(" The fsimage download took %.2fs at %.2f KB/s.",
+ String.format(" The file download took %.2fs at %.2f KB/s.",
xferSec, xferKb / xferSec));
} finally {
stream.close();
@@ -301,7 +309,7 @@ public final class Util {
advertisedSize);
}
}
- xferStats.insert(0, String.format("Combined time for fsimage download and" +
+ xferStats.insert(0, String.format("Combined time for file download and" +
" fsync to all disks took %.2fs.", xferCombined));
LOG.info(xferStats.toString());
@@ -350,4 +358,34 @@ public final class Util {
String header = connection.getHeaderField(MD5_HEADER);
return (header != null) ? new MD5Hash(header) : null;
}
+
+ public static List<InetSocketAddress> getAddressesList(URI uri)
+ throws IOException {
+ String authority = uri.getAuthority();
+ Preconditions.checkArgument(authority != null && !authority.isEmpty(),
+ "URI has no authority: " + uri);
+
+ String[] parts = StringUtils.split(authority, ';');
+ for (int i = 0; i < parts.length; i++) {
+ parts[i] = parts[i].trim();
+ }
+
+ List<InetSocketAddress> addrs = Lists.newArrayList();
+ for (String addr : parts) {
+ InetSocketAddress isa = NetUtils.createSocketAddr(
+ addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
+ if (isa.isUnresolved()) {
+ throw new UnknownHostException(addr);
+ }
+ addrs.add(isa);
+ }
+ return addrs;
+ }
+
+ public static List<InetSocketAddress> getLoggerAddresses(URI uri,
+ Set<InetSocketAddress> addrsToExclude) throws IOException {
+ List<InetSocketAddress> addrsList = getAddressesList(uri);
+ addrsList.removeAll(addrsToExclude);
+ return addrsList;
+ }
}
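getLoggerAddresses is what JournalNodeSyncer uses to find its peers: parse the shared-edits URI, then drop the node's own bound address. A sketch with hypothetical hosts (they must resolve, and InetSocketAddress equality is what removes the local entry):

    import java.net.InetSocketAddress;
    import java.net.URI;
    import java.util.List;
    import com.google.common.collect.Sets;
    import org.apache.hadoop.hdfs.server.common.Util;

    public class OtherJournalNodes {
      public static void main(String[] args) throws Exception {
        URI uri = new URI("qjournal://jn1:8485;jn2:8485;jn3:8485/mycluster");
        // Pretend this process is jn1; excluding its own address leaves only
        // the peers it should gossip with (jn2 and jn3).
        InetSocketAddress self = new InetSocketAddress("jn1", 8485);
        List<InetSocketAddress> others =
            Util.getLoggerAddresses(uri, Sets.newHashSet(self));
        System.out.println(others);
      }
    }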
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
index c79ba4a..63d1a28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -763,13 +763,13 @@ public class NNStorage extends Storage implements Closeable,
return new File(sd.getCurrentDir(), getInProgressEditsFileName(startTxId));
}
- static File getFinalizedEditsFile(StorageDirectory sd,
+ public static File getFinalizedEditsFile(StorageDirectory sd,
long startTxId, long endTxId) {
return new File(sd.getCurrentDir(),
getFinalizedEditsFileName(startTxId, endTxId));
}
- static File getTemporaryEditsFile(StorageDirectory sd,
+ public static File getTemporaryEditsFile(StorageDirectory sd,
long startTxId, long endTxId, long timestamp) {
return new File(sd.getCurrentDir(),
getTemporaryEditsFileName(startTxId, endTxId, timestamp));
@@ -1106,6 +1106,7 @@ public class NNStorage extends Storage implements Closeable,
return inspector;
}
+ @Override
public NamespaceInfo getNamespaceInfo() {
return new NamespaceInfo(
getNamespaceID(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 5821353..7316414 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -401,7 +401,8 @@ public class TransferFsImage {
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
Storage dstStorage, boolean getChecksum) throws IOException {
- return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout);
+ return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout,
+ null);
}
private static MD5Hash parseMD5Header(HttpServletRequest request) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 03f1a08..652b216 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1279,6 +1279,26 @@
</property>
<property>
+ <name>dfs.edit.log.transfer.timeout</name>
+ <value>30000</value>
+ <description>
+ Socket timeout for edit log transfer in milliseconds. This timeout
+ should be configured such that normal edit log transfer for journal
+ node syncing can complete successfully.
+ </description>
+</property>
+
+<property>
+ <name>dfs.edit.log.transfer.bandwidthPerSec</name>
+ <value>0</value>
+ <description>
+ Maximum bandwidth used for transferring edit logs between journal nodes
+ for syncing, in bytes per second.
+ The default value of 0 indicates that throttling is disabled.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.support.allow.format</name>
<value>true</value>
<description>Does HDFS namenode allow itself to be formatted?
@@ -3785,6 +3805,27 @@
</property>
<property>
+ <name>dfs.journalnode.enable.sync</name>
+ <value>true</value>
+ <description>
+ If true, the journal nodes will sync with each other. The journal nodes
+ will periodically gossip with other journal nodes to compare edit log
+ manifests and, if they detect any missing log segments, they will
+ download them from the other journal nodes.
+ </description>
+</property>
+
+<property>
+ <name>dfs.journalnode.sync.interval</name>
+ <value>120000</value>
+ <description>
+ Time interval, in milliseconds, between two Journal Node syncs.
+ This configuration takes effect only if JournalNode sync is enabled by
+ setting the parameter dfs.journalnode.enable.sync to true.
+ </description>
+</property>
+
+<property>
<name>dfs.journalnode.kerberos.internal.spnego.principal</name>
<value></value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
index 7b974c3..2314e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
@@ -255,4 +255,12 @@ public class MiniJournalCluster {
}
}
}
+
+ public void setNamenodeSharedEditsConf(String jid) {
+ URI quorumJournalURI = getQuorumJournalURI(jid);
+ for (int i = 0; i < nodes.length; i++) {
+ nodes[i].node.getConf().set(DFSConfigKeys
+ .DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
index 0764f12..c163894 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
@@ -101,6 +101,7 @@ public class MiniQJMHACluster {
journalCluster = new MiniJournalCluster.Builder(conf).format(true)
.build();
journalCluster.waitActive();
+ journalCluster.setNamenodeSharedEditsConf(NAMESERVICE);
URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE);
// start cluster with specified NameNodes
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
new file mode 100644
index 0000000..5375b02
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
+ .getLogFile;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Unit tests for JournalNode sync of missing edit log segments.
+ */
+public class TestJournalNodeSync {
+ private MiniQJMHACluster qjmhaCluster;
+ private MiniDFSCluster dfsCluster;
+ private MiniJournalCluster jCluster;
+ private FileSystem fs;
+ private FSNamesystem namesystem;
+ private int editsPerformed = 0;
+ private final String jid = "ns1";
+
+ @Before
+ public void setUpMiniCluster() throws IOException {
+ final Configuration conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
+ qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2)
+ .build();
+ dfsCluster = qjmhaCluster.getDfsCluster();
+ jCluster = qjmhaCluster.getJournalCluster();
+
+ dfsCluster.transitionToActive(0);
+ fs = dfsCluster.getFileSystem(0);
+ namesystem = dfsCluster.getNamesystem(0);
+ }
+
+ @After
+ public void shutDownMiniCluster() throws IOException {
+ if (qjmhaCluster != null) {
+ qjmhaCluster.shutdown();
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testJournalNodeSync() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ // Generate some edit logs and delete one.
+ long firstTxId = generateEditLog();
+ generateEditLog();
+
+ File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId);
+
+ GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)),
+ 500, 10000);
+ }
+
+ @Test(timeout=30000)
+ public void testSyncForMultipleMissingLogs() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ // Generate some edit logs and delete two.
+ long firstTxId = generateEditLog();
+ long nextTxId = generateEditLog();
+
+ List<File> missingLogs = Lists.newArrayList();
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+ }
+
+ @Test(timeout=30000)
+ public void testSyncForDiscontinuousMissingLogs() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ // Generate some edit logs and delete two discontinuous logs.
+ long firstTxId = generateEditLog();
+ generateEditLog();
+ long nextTxId = generateEditLog();
+
+ List<File> missingLogs = Lists.newArrayList();
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId));
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+ }
+
+ @Test(timeout=30000)
+ public void testMultipleJournalsMissingLogs() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ File secondJournalDir = jCluster.getJournalDir(1, jid);
+ StorageDirectory sd = new StorageDirectory(secondJournalDir);
+ File secondJournalCurrentDir = sd.getCurrentDir();
+
+ // Generate some edit logs and delete one log from two journals.
+ long firstTxId = generateEditLog();
+ generateEditLog();
+
+ List<File> missingLogs = Lists.newArrayList();
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000);
+ }
+
+ @Test(timeout=60000)
+ public void testMultipleJournalsMultipleMissingLogs() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ File secondJournalDir = jCluster.getJournalDir(1, jid);
+ File secondJournalCurrentDir = new StorageDirectory(secondJournalDir)
+ .getCurrentDir();
+
+ File thirdJournalDir = jCluster.getJournalDir(2, jid);
+ File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir)
+ .getCurrentDir();
+
+ // Generate some edit logs and delete multiple logs in multiple journals.
+ long firstTxId = generateEditLog();
+ long secondTxId = generateEditLog();
+ long thirdTxId = generateEditLog();
+
+ List<File> missingLogs = Lists.newArrayList();
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId));
+ missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId));
+ missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId));
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+ }
+
+ // Test JournalNode Sync by randomly deleting edit logs from one or two of
+ // the journals.
+ @Test(timeout=60000)
+ public void testRandomJournalMissingLogs() throws Exception {
+ Random randomJournal = new Random();
+
+ List<File> journalCurrentDirs = Lists.newArrayList();
+
+ for (int i = 0; i < 3; i++) {
+ journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i,
+ jid)).getCurrentDir());
+ }
+
+ int count = 0;
+ long lastStartTxId;
+ int journalIndex;
+ List<File> missingLogs = Lists.newArrayList();
+ while (count < 5) {
+ lastStartTxId = generateEditLog();
+
+ // Delete the last edit log segment from a randomly selected journal node
+ journalIndex = randomJournal.nextInt(3);
+ missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+ lastStartTxId));
+
+ // Delete the last edit log segment from two journals for some logs
+ if (count % 2 == 0) {
+ journalIndex = (journalIndex + 1) % 3;
+ missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex),
+ lastStartTxId));
+ }
+
+ count++;
+ }
+
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+ }
+
+ private File deleteEditLog(File currentDir, long startTxId)
+ throws IOException {
+ EditLogFile logFile = getLogFile(currentDir, startTxId);
+ while (logFile.isInProgress()) {
+ dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+ logFile = getLogFile(currentDir, startTxId);
+ }
+ File deleteFile = logFile.getFile();
+ Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete());
+
+ return deleteFile;
+ }
+
+ /**
+ * Do a mutative metadata operation on the file system.
+ *
+ * @return true if the operation was successful, false otherwise.
+ */
+ private boolean doAnEdit() throws IOException {
+ return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
+ }
+
+ /**
+ * Does an edit and rolls the Edit Log.
+ *
+ * @return the startTxId of the edit log segment finalized by the roll.
+ */
+ private long generateEditLog() throws IOException {
+ long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId();
+ Assert.assertTrue("Failed to do an edit", doAnEdit());
+ dfsCluster.getNameNode(0).getRpcServer().rollEditLog();
+ return startTxId;
+ }
+
+ private Supplier<Boolean> editLogExists(List<File> editLogs) {
+ Supplier<Boolean> supplier = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ for (File editLog : editLogs) {
+ if (!editLog.exists()) {
+ return false;
+ }
+ }
+ return true;
+ }
+ };
+ return supplier;
+ }
+}