Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:16 UTC
[hadoop] 13/50: HDFS-13610. [SBN read] Edit Tail Fast Path Part 4: Cleanup. Integration test, documentation, remove unnecessary dummy sync, minor fixups. Contributed by Erik Krogen.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit ea402854aa63e10a086460f812afd2c3c9fbf8e8
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Fri May 11 13:23:38 2018 -0700
HDFS-13610. [SBN read] Edit Tail Fast Path Part 4: Cleanup. Integration test, documentation, remove unnecessary dummy sync, minor fixups. Contributed by Erik Krogen.
---
.../hdfs/qjournal/client/QuorumJournalManager.java | 15 +-
.../hdfs/qjournal/client/QuorumOutputStream.java | 13 +-
.../hadoop/hdfs/qjournal/server/Journal.java | 6 +
.../hdfs/qjournal/server/JournaledEditsCache.java | 63 +++++---
.../site/markdown/HDFSHighAvailabilityWithQJM.md | 28 ++++
.../qjournal/client/TestQuorumJournalManager.java | 4 +
.../client/TestQuorumJournalManagerUnit.java | 2 +
.../hdfs/qjournal/server/JournalTestUtil.java | 48 ++++++
.../namenode/ha/TestStandbyInProgressTail.java | 164 +++++++++++++++++----
9 files changed, 275 insertions(+), 68 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index 91d4995..5525e92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -77,6 +77,8 @@ public class QuorumJournalManager implements JournalManager {
// Maximum number of transactions to fetch at a time when using the
// RPC edit fetch mechanism
private final int maxTxnsPerRpc;
+ // Whether or not in-progress tailing is enabled in the configuration
+ private final boolean inProgressTailingEnabled;
// Timeouts for which the QJM will wait for each of the following actions.
private final int startSegmentTimeoutMs;
private final int prepareRecoveryTimeoutMs;
@@ -139,6 +141,9 @@ public class QuorumJournalManager implements JournalManager {
conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
Preconditions.checkArgument(maxTxnsPerRpc > 0,
"Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY);
+ this.inProgressTailingEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+ DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
// Configure timeouts.
this.startSegmentTimeoutMs = conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
@@ -420,11 +425,8 @@ public class QuorumJournalManager implements JournalManager {
layoutVersion);
loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
"startLogSegment(" + txId + ")");
- boolean updateCommittedTxId = conf.getBoolean(
- DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
- DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
return new QuorumOutputStream(loggers, txId, outputBufferCapacity,
- writeTxnsTimeoutMs, updateCommittedTxId);
+ writeTxnsTimeoutMs);
}
@Override
@@ -493,7 +495,10 @@ public class QuorumJournalManager implements JournalManager {
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk,
boolean onlyDurableTxns) throws IOException {
- if (inProgressOk) {
+ // Some calls will use inProgressOk to get in-progress edits even if
+ // the cache used for RPC calls is not enabled; fall back to using the
+ // streaming mechanism to serve such requests
+ if (inProgressOk && inProgressTailingEnabled) {
LOG.info("Tailing edits starting from txn ID " + fromTxnId +
" via RPC mechanism");
try {
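The hunk above is truncated at the try block. As a minimal sketch of the overall control flow it implements (the helper names selectRpcInputStreams and selectStreamingInputStreams are illustrative placeholders and may not exactly match the private methods in QuorumJournalManager):

    public void selectInputStreams(Collection<EditLogInputStream> streams,
        long fromTxnId, boolean inProgressOk,
        boolean onlyDurableTxns) throws IOException {
      if (inProgressOk && inProgressTailingEnabled) {
        try {
          // Fast path: fetch recent edits from the JournalNodes'
          // in-memory cache via RPC.
          selectRpcInputStreams(streams, fromTxnId, onlyDurableTxns);
          return;
        } catch (IOException ioe) {
          // Cache miss or RPC failure; fall back to the streaming
          // mechanism below.
        }
      }
      // Slow path: stream edit log segments from the JournalNodes.
      selectStreamingInputStreams(streams, fromTxnId, inProgressOk,
          onlyDurableTxns);
    }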
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
index 3ffcd3e..e094b21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
@@ -33,17 +33,15 @@ class QuorumOutputStream extends EditLogOutputStream {
private EditsDoubleBuffer buf;
private final long segmentTxId;
private final int writeTimeoutMs;
- private final boolean updateCommittedTxId;
public QuorumOutputStream(AsyncLoggerSet loggers,
long txId, int outputBufferCapacity,
- int writeTimeoutMs, boolean updateCommittedTxId) throws IOException {
+ int writeTimeoutMs) throws IOException {
super();
this.buf = new EditsDoubleBuffer(outputBufferCapacity);
this.loggers = loggers;
this.segmentTxId = txId;
this.writeTimeoutMs = writeTimeoutMs;
- this.updateCommittedTxId = updateCommittedTxId;
}
@Override
@@ -112,15 +110,6 @@ class QuorumOutputStream extends EditLogOutputStream {
// RPCs will thus let the loggers know of the most recent transaction, even
// if a logger has fallen behind.
loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
-
- // If we don't have this dummy send, committed TxId might be one-batch
- // stale on the Journal Nodes
- if (updateCommittedTxId) {
- QuorumCall<AsyncLogger, Void> fakeCall = loggers.sendEdits(
- segmentTxId, firstTxToFlush,
- 0, new byte[0]);
- loggers.waitForWriteQuorum(fakeCall, writeTimeoutMs, "sendEdits");
- }
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index f95da45..ffbb682 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -1209,4 +1209,10 @@ public class Journal implements Closeable {
public Long getJournalCTime() throws IOException {
return storage.getJournalManager().getJournalCTime();
}
+
+ @VisibleForTesting
+ JournaledEditsCache getJournaledEditsCache() {
+ return cache;
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java
index 1151069..387caa1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.util.AutoCloseableLock;
-
/**
* An in-memory cache of edits in their serialized form. This is used to serve
* the {@link Journal#getJournaledEdits(long, int)} call, used by the
@@ -70,6 +69,9 @@ import org.apache.hadoop.util.AutoCloseableLock;
*/
class JournaledEditsCache {
+ private static final int INVALID_LAYOUT_VERSION = 0;
+ private static final long INVALID_TXN_ID = -1;
+
/** The capacity, in bytes, of this cache. */
private final int capacity;
@@ -91,13 +93,13 @@ class JournaledEditsCache {
*/
private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
/** Stores the layout version currently present in the cache. */
- private int layoutVersion = Integer.MAX_VALUE;
+ private int layoutVersion = INVALID_LAYOUT_VERSION;
/** Stores the serialized version of the header for the current version. */
private ByteBuffer layoutHeader;
/**
- * The lowest/highest transaction IDs present in the cache. -1 if there are no
- * transactions in the cache.
+ * The lowest/highest transaction IDs present in the cache.
+ * {@value INVALID_TXN_ID} if there are no transactions in the cache.
*/
private long lowestTxnId;
private long highestTxnId;
@@ -127,7 +129,7 @@ class JournaledEditsCache {
ReadWriteLock lock = new ReentrantReadWriteLock(true);
readLock = new AutoCloseableLock(lock.readLock());
writeLock = new AutoCloseableLock(lock.writeLock());
- initialize(-1);
+ initialize(INVALID_TXN_ID);
}
/**
@@ -144,6 +146,7 @@ class JournaledEditsCache {
* transaction count of 0 will be returned. If {@code requestedStartTxn} is
* lower than the lowest transaction currently contained in this cache, or no
* transactions have yet been added to the cache, an exception will be thrown.
+ *
* @param requestedStartTxn The ID of the first transaction to return. If any
* transactions are returned, it is guaranteed that
* the first one will have this ID.
@@ -160,7 +163,7 @@ class JournaledEditsCache {
int txnCount = 0;
try (AutoCloseableLock l = readLock.acquire()) {
- if (lowestTxnId < 0 || requestedStartTxn < lowestTxnId) {
+ if (lowestTxnId == INVALID_TXN_ID || requestedStartTxn < lowestTxnId) {
throw getCacheMissException(requestedStartTxn);
} else if (requestedStartTxn > highestTxnId) {
return 0;
@@ -222,6 +225,7 @@ class JournaledEditsCache {
* This attempts to always handle malformed inputs gracefully rather than
* throwing an exception, to allow the rest of the Journal's operations
* to proceed normally.
+ *
* @param inputData A buffer containing edits in serialized form
* @param newStartTxn The txn ID of the first edit in {@code inputData}
* @param newEndTxn The txn ID of the last edit in {@code inputData}
@@ -246,15 +250,16 @@ class JournaledEditsCache {
newStartTxn, newEndTxn, newLayoutVersion), ioe);
return;
}
- }
- if (lowestTxnId < 0 || (highestTxnId + 1) != newStartTxn) {
- // Cache initialization step
- if (lowestTxnId >= 0) {
- // Cache is out of sync; clear to avoid storing noncontiguous regions
- Journal.LOG.error(String.format("Edits cache is out of sync; " +
- "looked for next txn id at %d but got start txn id for " +
- "cache put request at %d", highestTxnId + 1, newStartTxn));
- }
+ } else if (lowestTxnId == INVALID_TXN_ID) {
+ Journal.LOG.info("Initializing edits cache starting from txn ID " +
+ newStartTxn);
+ initialize(newStartTxn);
+ } else if (highestTxnId + 1 != newStartTxn) {
+ // Cache is out of sync; clear to avoid storing noncontiguous regions
+ Journal.LOG.error(String.format("Edits cache is out of sync; " +
+ "looked for next txn id at %d but got start txn id for " +
+ "cache put request at %d. Reinitializing at new request.",
+ highestTxnId + 1, newStartTxn));
initialize(newStartTxn);
}
@@ -264,11 +269,12 @@ class JournaledEditsCache {
totalSize -= lowest.getValue().length;
}
if (inputData.length > capacity) {
- initialize(-1);
+ initialize(INVALID_TXN_ID);
Journal.LOG.warn(String.format("A single batch of edits was too " +
"large to fit into the cache: startTxn = %d, endTxn = %d, " +
"input length = %d. The capacity of the cache (%s) must be " +
- "increased for it to work properly (current capacity %d)",
+ "increased for it to work properly (current capacity %d)." +
+ "Cache is now empty.",
newStartTxn, newEndTxn, inputData.length,
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity));
return;
@@ -289,6 +295,7 @@ class JournaledEditsCache {
* Skip through a given stream of edits until the given transaction ID is
* found. Return the number of bytes that appear prior to the given
* transaction.
+ *
* @param buf A buffer containing a stream of serialized edits
* @param txnId The transaction ID to search for
* @return The number of bytes appearing in {@code buf} <i>before</i>
@@ -312,13 +319,22 @@ class JournaledEditsCache {
/**
* Update the layout version of the cache. This clears out all existing
* entries, and populates the new layout version and header for that version.
+ *
* @param newLayoutVersion The new layout version to be stored in the cache
* @param newStartTxn The new lowest transaction in the cache
*/
private void updateLayoutVersion(int newLayoutVersion, long newStartTxn)
throws IOException {
- Journal.LOG.info("Updating edits cache to use layout version " +
- newLayoutVersion + "; previously was " + layoutVersion);
+ StringBuilder logMsg = new StringBuilder()
+ .append("Updating edits cache to use layout version ")
+ .append(newLayoutVersion)
+ .append(" starting from txn ID ")
+ .append(newStartTxn);
+ if (layoutVersion != INVALID_LAYOUT_VERSION) {
+ logMsg.append("; previous version was ").append(layoutVersion)
+ .append("; old entries will be cleared.");
+ }
+ Journal.LOG.info(logMsg.toString());
initialize(newStartTxn);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
EditLogFileOutputStream.writeHeader(newLayoutVersion,
@@ -329,20 +345,23 @@ class JournaledEditsCache {
/**
* Initialize the cache back to a clear state.
+ *
* @param newInitialTxnId The new lowest transaction ID stored in the cache.
- * -1 if the cache is to remain empty at this time.
+ * This should be {@value INVALID_TXN_ID} if the cache
+ * is to remain empty at this time.
*/
private void initialize(long newInitialTxnId) {
dataMap.clear();
totalSize = 0;
initialTxnId = newInitialTxnId;
lowestTxnId = initialTxnId;
- highestTxnId = -1;
+ highestTxnId = INVALID_TXN_ID; // this will be set later
}
/**
* Return the underlying data buffer used to store information about the
* given transaction ID.
+ *
* @param txnId Transaction ID whose containing buffer should be fetched.
* @return The data buffer for the transaction
*/
@@ -354,7 +373,7 @@ class JournaledEditsCache {
}
private CacheMissException getCacheMissException(long requestedTxnId) {
- if (lowestTxnId < 0) {
+ if (lowestTxnId == INVALID_TXN_ID) {
return new CacheMissException(0, "Cache is empty; either it was never " +
"written to or the last write overflowed the cache capacity.");
} else if (requestedTxnId < initialTxnId) {
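Taken together, the hunks above enforce two invariants at put time: the cache only ever holds a contiguous run of transactions (any gap forces reinitialization), and batches are evicted from the low end to stay under capacity. A condensed, self-contained sketch of that logic, omitting layout versions, locking, and header handling (field names mirror the real class, but this is not the actual implementation):

    import java.util.NavigableMap;
    import java.util.TreeMap;

    class EditsCacheSketch {
      private static final long INVALID_TXN_ID = -1;
      private final int capacity; // bytes
      private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
      private long lowestTxnId = INVALID_TXN_ID;
      private long highestTxnId = INVALID_TXN_ID;
      private int totalSize = 0;

      EditsCacheSketch(int capacity) { this.capacity = capacity; }

      void storeEdits(byte[] inputData, long newStartTxn, long newEndTxn) {
        if (lowestTxnId == INVALID_TXN_ID) {
          lowestTxnId = newStartTxn; // first write initializes the cache
        } else if (highestTxnId + 1 != newStartTxn) {
          // Out of sync: only contiguous transaction ranges are cached,
          // so clear and reinitialize at the new request.
          dataMap.clear();
          totalSize = 0;
          lowestTxnId = newStartTxn;
        }
        // Evict the oldest batches until the new one fits.
        while (!dataMap.isEmpty() && totalSize + inputData.length > capacity) {
          totalSize -= dataMap.pollFirstEntry().getValue().length;
          lowestTxnId = dataMap.isEmpty() ? newStartTxn : dataMap.firstKey();
        }
        if (inputData.length > capacity) {
          // A single batch can exceed the whole cache; it is dropped and
          // the cache emptied, matching the warning logged above.
          dataMap.clear();
          totalSize = 0;
          lowestTxnId = INVALID_TXN_ID;
          highestTxnId = INVALID_TXN_ID;
          return;
        }
        dataMap.put(newStartTxn, inputData);
        totalSize += inputData.length;
        highestTxnId = newEndTxn;
      }
    }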
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
index e4363fb..76a9837 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
@@ -431,6 +431,34 @@ http://NN_HOSTNAME/isActive will return a 200 status code response if the NN is
+### In-Progress Edit Log Tailing
+
+Under the default settings, the Standby NameNode will only apply edits that are present in edit
+log segments which have been finalized. If it is desirable to have a Standby NameNode which has more
+up-to-date namespace information, it is possible to enable tailing of in-progress edit segments.
+This setting will attempt to fetch edits from an in-memory cache on the JournalNodes and can reduce
+the lag time before a transaction is applied on the Standby NameNode to the order of milliseconds.
+If an edit cannot be served from the cache, the Standby will still be able to retrieve it, but the
+lag time will be much longer. The relevant configurations are:
+
+* **dfs.ha.tail-edits.in-progress** - Whether or not to enable tailing of in-progress edit logs.
+ This will also enable the in-memory edit cache on the JournalNodes. Disabled by default.
+
+* **dfs.journalnode.edit-cache-size.bytes** - The size of the in-memory cache of edits on the
+ JournalNode. Edits take around 200 bytes each in a typical environment, so, for example, the
+ default of 1048576 (1MB) can hold around 5000 transactions. It is recommended to monitor the
+ JournalNode metrics RpcRequestCacheMissAmountNumMisses and RpcRequestCacheMissAmountAvgTxns,
+ which respectively count the number of requests that could not be served by the cache, and the
+ average number of additional transactions that would have needed to be in the cache for a
+ request to succeed. For example, if a request attempted to fetch edits starting at transaction ID 10, but
+ the oldest data in the cache was at transaction ID 20, a value of 10 would be added to the
+ average.
+
+This feature is primarily useful in conjunction with the Standby/Observer Read feature. Using this
+feature, read requests can be serviced from non-active NameNodes; thus tailing in-progress edits
+provides these nodes with the ability to serve requests with much fresher data. See the
+Apache JIRA ticket HDFS-12943 for more information on this feature.
+
Automatic Failover
------------------
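The new tests in this commit set these keys programmatically. As a minimal sketch of doing the same through the Configuration API (the 4 MB cache size is an illustrative value, not a recommendation from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    Configuration conf = new Configuration();
    // Turn on tailing of in-progress segments; this also enables the
    // in-memory edit cache on the JournalNodes (disabled by default).
    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
    // Optionally grow the cache beyond the 1 MB default. At roughly
    // 200 bytes per edit, 4 MB holds on the order of 20,000 transactions.
    conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
        4 * 1024 * 1024);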
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index 9f089c9..f3bb954 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -1045,6 +1045,10 @@ public class TestQuorumJournalManager {
qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
writeTxns(stm, 1, 10);
writeTxns(stm, 11, 1);
+ // One last sync whose transactions are not expected to be seen in the
+ // input streams because the JournalNodes have not updated their concept
+ // of the committed transaction ID yet
+ writeTxns(stm, 12, 1);
futureThrows(new IOException()).when(spies.get(0)).getJournaledEdits(1,
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
index 30ef21b..837c7d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
@@ -32,6 +32,7 @@ import java.util.List;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
@@ -79,6 +80,7 @@ public class TestQuorumJournalManagerUnit {
mockLogger(),
mockLogger());
+ conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
@Override
protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/JournalTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/JournalTestUtil.java
new file mode 100644
index 0000000..de03b2c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/JournalTestUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+/**
+ * Utilities for testing {@link Journal} instances.
+ */
+public class JournalTestUtil {
+
+ /**
+ * Corrupt the cache of a {@link Journal} to simulate some corrupt entries
+ * being present.
+ *
+ * @param txid The transaction ID whose containing buffer in the cache
+ * should be corrupted.
+ * @param journal The journal whose cache should be corrupted.
+ */
+ public static void corruptJournaledEditsCache(long txid, Journal journal) {
+ JournaledEditsCache cache = journal.getJournaledEditsCache();
+ byte[] buf = cache.getRawDataForTests(txid);
+ // Change a few arbitrary bytes in the buffer
+ for (int i = 0; i < buf.length; i += 9) {
+ buf[i] = 0;
+ }
+ for (int i = 3; i < buf.length; i += 9) {
+ buf[i] += 10;
+ }
+ for (int i = 6; i < buf.length; i += 9) {
+ buf[i] -= 10;
+ }
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
index 2bdada4..8394073 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URI;
+import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
@@ -30,9 +33,11 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.qjournal.server.JournalTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
@@ -43,6 +48,7 @@ import org.junit.Before;
import org.junit.Test;
import com.google.common.base.Joiner;
+import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
/**
@@ -64,6 +70,8 @@ public class TestStandbyInProgressTail {
// Set period of tail edits to a large value (20 mins) for test purposes
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60);
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+ conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
+ 500);
HAUtil.setAllowStandbyReads(conf, true);
qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
cluster = qjmhaCluster.getDfsCluster();
@@ -179,12 +187,7 @@ public class TestStandbyInProgressTail {
cluster.getNameNode(0).getRpcServer().mkdirs("/test",
FsPermission.createImmutable((short) 0755), true);
- nn1.getNamesystem().getEditLogTailer().doTailEdits();
-
- // After waiting for 5 seconds, StandbyNameNode should finish tailing
- // in-progress logs
- assertNotNull(getFileInfo(cluster.getNameNode(1),
- "/test", true, false, false));
+ waitForFileInfo(nn1, "/test");
// Restarting the standby should not finalize any edits files
// in the shared directory when it starts up!
@@ -227,10 +230,9 @@ public class TestStandbyInProgressTail {
cluster.getNameNode(0).getRpcServer().mkdirs("/test",
FsPermission.createImmutable((short) 0755), true);
- nn1.getNamesystem().getEditLogTailer().doTailEdits();
// StandbyNameNode should tail the in-progress edit
- assertNotNull(getFileInfo(nn1, "/test", true, false, false));
+ waitForFileInfo(nn1, "/test");
// Create a new edit and finalized it
cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
@@ -238,17 +240,14 @@ public class TestStandbyInProgressTail {
nn0.getRpcServer().rollEditLog();
// StandbyNameNode shouldn't tail the edit since we do not call the method
- assertNull(getFileInfo(nn1, "/test2", true, false, false));
+ waitForFileInfo(nn1, "/test2");
// Create a new in-progress edit and let SBNN do the tail
cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
FsPermission.createImmutable((short) 0755), true);
- nn1.getNamesystem().getEditLogTailer().doTailEdits();
// StandbyNameNode should tail the finalized edit and the new in-progress
- assertNotNull(getFileInfo(nn1, "/test", true, false, false));
- assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
- assertNotNull(getFileInfo(nn1, "/test3", true, false, false));
+ waitForFileInfo(nn1, "/test", "/test2", "/test3");
}
@Test
@@ -275,12 +274,8 @@ public class TestStandbyInProgressTail {
assertNull(getFileInfo(nn1, "/test2", true, false, false));
assertNull(getFileInfo(nn1, "/test3", true, false, false));
- nn1.getNamesystem().getEditLogTailer().doTailEdits();
-
- // StandbyNameNode shoudl tail the finalized edit and the new in-progress
- assertNotNull(getFileInfo(nn1, "/test", true, false, false));
- assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
- assertNotNull(getFileInfo(nn1, "/test3", true, false, false));
+ // StandbyNameNode should tail the finalized edit and the new in-progress
+ waitForFileInfo(nn1, "/test", "/test2", "/test3");
}
@Test
@@ -295,19 +290,14 @@ public class TestStandbyInProgressTail {
FsPermission.createImmutable((short) 0755), true);
cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
FsPermission.createImmutable((short) 0755), true);
- nn1.getNamesystem().getEditLogTailer().doTailEdits();
+ waitForFileInfo(nn1, "/test", "/test2");
nn0.getRpcServer().rollEditLog();
- assertNotNull(getFileInfo(nn1, "/test", true, false, false));
- assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
FsPermission.createImmutable((short) 0755), true);
- nn1.getNamesystem().getEditLogTailer().doTailEdits();
- // StandbyNameNode shoudl tail the finalized edit and the new in-progress
- assertNotNull(getFileInfo(nn1, "/test", true, false, false));
- assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
- assertNotNull(getFileInfo(nn1, "/test3", true, false, false));
+ // StandbyNameNode should tail the finalized edit and the new in-progress
+ waitForFileInfo(nn1, "/test", "/test2", "/test3");
}
@Test
@@ -325,8 +315,85 @@ public class TestStandbyInProgressTail {
FsPermission.createImmutable((short) 0755), true);
cluster.getNameNode(0).getRpcServer().rollEdits();
- cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
- assertNotNull(getFileInfo(nn1, "/test", true, false, false));
+ waitForFileInfo(nn1, "/test");
+ }
+
+ @Test
+ public void testEditsServedViaCache() throws Exception {
+ cluster.transitionToActive(0);
+ cluster.waitActive(0);
+
+ mkdirs(nn0, "/test", "/test2");
+ nn0.getRpcServer().rollEditLog();
+ for (int idx = 0; idx < qjmhaCluster.getJournalCluster().getNumNodes();
+ idx++) {
+ File[] startingEditFile = qjmhaCluster.getJournalCluster()
+ .getCurrentDir(idx, DFSUtil.getNamenodeNameServiceId(conf))
+ .listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.matches("edits_0+1-[0-9]+");
+ }
+ });
+ assertNotNull(startingEditFile);
+ assertEquals(1, startingEditFile.length);
+ // Delete this edit file to ensure that edits can't be served via the
+ // streaming mechanism - RPC/cache-based only
+ startingEditFile[0].delete();
+ }
+ // Ensure edits were not tailed before the edit files were deleted;
+ // quick spot check of a single dir
+ assertNull(getFileInfo(nn1, "/tmp0", false, false, false));
+
+ waitForFileInfo(nn1, "/test", "/test2");
+ }
+
+ @Test
+ public void testCorruptJournalCache() throws Exception {
+ cluster.transitionToActive(0);
+ cluster.waitActive(0);
+
+ // Shut down one JN so there is only a quorum remaining to make it easier
+ // to manage the remaining two
+ qjmhaCluster.getJournalCluster().getJournalNode(0).stopAndJoin(0);
+
+ mkdirs(nn0, "/test", "/test2");
+ JournalTestUtil.corruptJournaledEditsCache(1,
+ qjmhaCluster.getJournalCluster().getJournalNode(1)
+ .getJournal(DFSUtil.getNamenodeNameServiceId(conf)));
+
+ nn0.getRpcServer().rollEditLog();
+
+ waitForFileInfo(nn1, "/test", "/test2");
+
+ mkdirs(nn0, "/test3", "/test4");
+ JournalTestUtil.corruptJournaledEditsCache(3,
+ qjmhaCluster.getJournalCluster().getJournalNode(2)
+ .getJournal(DFSUtil.getNamenodeNameServiceId(conf)));
+
+ waitForFileInfo(nn1, "/test3", "/test4");
+ }
+
+ @Test
+ public void testTailWithoutCache() throws Exception {
+ qjmhaCluster.shutdown();
+ // Effectively disable the cache by setting its size too small to be used
+ conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, 1);
+ qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+ cluster = qjmhaCluster.getDfsCluster();
+ cluster.transitionToActive(0);
+ cluster.waitActive(0);
+ nn0 = cluster.getNameNode(0);
+ nn1 = cluster.getNameNode(1);
+
+ mkdirs(nn0, "/test", "/test2");
+ nn0.getRpcServer().rollEditLog();
+
+ mkdirs(nn0, "/test3", "/test4");
+
+ // Skip the last directory; the JournalNodes' idea of the committed
+ // txn ID may not have been updated to include it yet
+ waitForFileInfo(nn1, "/test", "/test2", "/test3");
}
/**
@@ -356,4 +423,43 @@ public class TestStandbyInProgressTail {
GenericTestUtils.assertGlobEquals(editDir, "edits_.*", files);
}
}
+
+ /**
+ * Create the given directories on the provided NameNode.
+ */
+ private static void mkdirs(NameNode nameNode, String... dirNames)
+ throws Exception {
+ for (String dirName : dirNames) {
+ nameNode.getRpcServer().mkdirs(dirName,
+ FsPermission.createImmutable((short) 0755), true);
+ }
+ }
+
+ /**
+ * Wait up to 1 second until the given NameNode is aware of the existence of
+ * all of the provided fileNames.
+ */
+ private static void waitForFileInfo(NameNode standbyNN, String... fileNames)
+ throws Exception {
+ List<String> remainingFiles = Lists.newArrayList(fileNames);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ standbyNN.getNamesystem().getEditLogTailer().doTailEdits();
+ for (Iterator<String> it = remainingFiles.iterator(); it.hasNext();) {
+ if (getFileInfo(standbyNN, it.next(), true, false, false) == null) {
+ return false;
+ } else {
+ it.remove();
+ }
+ }
+ return true;
+ } catch (IOException|InterruptedException e) {
+ throw new AssertionError("Exception while waiting: " + e);
+ }
+ }
+ }, 10, 1000);
+ }
+
}