You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:16 UTC

[hadoop] 13/50: HDFS-13610. [SBN read] Edit Tail Fast Path Part 4: Cleanup. Integration test, documentation, remove unnecessary dummy sync, minors fixups. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ea402854aa63e10a086460f812afd2c3c9fbf8e8
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Fri May 11 13:23:38 2018 -0700

    HDFS-13610. [SBN read] Edit Tail Fast Path Part 4: Cleanup. Integration test, documentation, remove unnecessary dummy sync, minors fixups. Contributed by Erik Krogen.
---
 .../hdfs/qjournal/client/QuorumJournalManager.java |  15 +-
 .../hdfs/qjournal/client/QuorumOutputStream.java   |  13 +-
 .../hadoop/hdfs/qjournal/server/Journal.java       |   6 +
 .../hdfs/qjournal/server/JournaledEditsCache.java  |  63 +++++---
 .../site/markdown/HDFSHighAvailabilityWithQJM.md   |  28 ++++
 .../qjournal/client/TestQuorumJournalManager.java  |   4 +
 .../client/TestQuorumJournalManagerUnit.java       |   2 +
 .../hdfs/qjournal/server/JournalTestUtil.java      |  48 ++++++
 .../namenode/ha/TestStandbyInProgressTail.java     | 164 +++++++++++++++++----
 9 files changed, 275 insertions(+), 68 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index 91d4995..5525e92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -77,6 +77,8 @@ public class QuorumJournalManager implements JournalManager {
   // Maximum number of transactions to fetch at a time when using the
   // RPC edit fetch mechanism
   private final int maxTxnsPerRpc;
+  // Whether or not in-progress tailing is enabled in the configuration
+  private final boolean inProgressTailingEnabled;
   // Timeouts for which the QJM will wait for each of the following actions.
   private final int startSegmentTimeoutMs;
   private final int prepareRecoveryTimeoutMs;
@@ -139,6 +141,9 @@ public class QuorumJournalManager implements JournalManager {
         conf.getInt(QJM_RPC_MAX_TXNS_KEY, QJM_RPC_MAX_TXNS_DEFAULT);
     Preconditions.checkArgument(maxTxnsPerRpc > 0,
         "Must specify %s greater than 0!", QJM_RPC_MAX_TXNS_KEY);
+    this.inProgressTailingEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
@@ -420,11 +425,8 @@ public class QuorumJournalManager implements JournalManager {
         layoutVersion);
     loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
         "startLogSegment(" + txId + ")");
-    boolean updateCommittedTxId = conf.getBoolean(
-        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
-        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
     return new QuorumOutputStream(loggers, txId, outputBufferCapacity,
-        writeTxnsTimeoutMs, updateCommittedTxId);
+        writeTxnsTimeoutMs);
   }
 
   @Override
@@ -493,7 +495,10 @@ public class QuorumJournalManager implements JournalManager {
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxnId, boolean inProgressOk,
       boolean onlyDurableTxns) throws IOException {
-    if (inProgressOk) {
+    // Some calls will use inProgressOK to get in-progress edits even if
+    // the cache used for RPC calls is not enabled; fall back to using the
+    // streaming mechanism to serve such requests
+    if (inProgressOk && inProgressTailingEnabled) {
       LOG.info("Tailing edits starting from txn ID " + fromTxnId +
           " via RPC mechanism");
       try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
index 3ffcd3e..e094b21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
@@ -33,17 +33,15 @@ class QuorumOutputStream extends EditLogOutputStream {
   private EditsDoubleBuffer buf;
   private final long segmentTxId;
   private final int writeTimeoutMs;
-  private final boolean updateCommittedTxId;
 
   public QuorumOutputStream(AsyncLoggerSet loggers,
       long txId, int outputBufferCapacity,
-      int writeTimeoutMs, boolean updateCommittedTxId) throws IOException {
+      int writeTimeoutMs) throws IOException {
     super();
     this.buf = new EditsDoubleBuffer(outputBufferCapacity);
     this.loggers = loggers;
     this.segmentTxId = txId;
     this.writeTimeoutMs = writeTimeoutMs;
-    this.updateCommittedTxId = updateCommittedTxId;
   }
 
   @Override
@@ -112,15 +110,6 @@ class QuorumOutputStream extends EditLogOutputStream {
       // RPCs will thus let the loggers know of the most recent transaction, even
       // if a logger has fallen behind.
       loggers.setCommittedTxId(firstTxToFlush + numReadyTxns - 1);
-
-      // If we don't have this dummy send, committed TxId might be one-batch
-      // stale on the Journal Nodes
-      if (updateCommittedTxId) {
-        QuorumCall<AsyncLogger, Void> fakeCall = loggers.sendEdits(
-            segmentTxId, firstTxToFlush,
-            0, new byte[0]);
-        loggers.waitForWriteQuorum(fakeCall, writeTimeoutMs, "sendEdits");
-      }
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index f95da45..ffbb682 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -1209,4 +1209,10 @@ public class Journal implements Closeable {
   public Long getJournalCTime() throws IOException {
     return storage.getJournalManager().getJournalCTime();
   }
+
+  @VisibleForTesting
+  JournaledEditsCache getJournaledEditsCache() {
+    return cache;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java
index 1151069..387caa1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
 import org.apache.hadoop.util.AutoCloseableLock;
 
-
 /**
  * An in-memory cache of edits in their serialized form. This is used to serve
  * the {@link Journal#getJournaledEdits(long, int)} call, used by the
@@ -70,6 +69,9 @@ import org.apache.hadoop.util.AutoCloseableLock;
  */
 class JournaledEditsCache {
 
+  private static final int INVALID_LAYOUT_VERSION = 0;
+  private static final long INVALID_TXN_ID = -1;
+
   /** The capacity, in bytes, of this cache. */
   private final int capacity;
 
@@ -91,13 +93,13 @@ class JournaledEditsCache {
    */
   private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
   /** Stores the layout version currently present in the cache. */
-  private int layoutVersion = Integer.MAX_VALUE;
+  private int layoutVersion = INVALID_LAYOUT_VERSION;
   /** Stores the serialized version of the header for the current version. */
   private ByteBuffer layoutHeader;
 
   /**
-   * The lowest/highest transaction IDs present in the cache. -1 if there are no
-   * transactions in the cache.
+   * The lowest/highest transaction IDs present in the cache.
+   * {@value INVALID_TXN_ID} if there are no transactions in the cache.
    */
   private long lowestTxnId;
   private long highestTxnId;
@@ -127,7 +129,7 @@ class JournaledEditsCache {
     ReadWriteLock lock = new ReentrantReadWriteLock(true);
     readLock = new AutoCloseableLock(lock.readLock());
     writeLock = new AutoCloseableLock(lock.writeLock());
-    initialize(-1);
+    initialize(INVALID_TXN_ID);
   }
 
   /**
@@ -144,6 +146,7 @@ class JournaledEditsCache {
    * transaction count of 0 will be returned. If {@code requestedStartTxn} is
    * lower than the lowest transaction currently contained in this cache, or no
    * transactions have yet been added to the cache, an exception will be thrown.
+   *
    * @param requestedStartTxn The ID of the first transaction to return. If any
    *                          transactions are returned, it is guaranteed that
    *                          the first one will have this ID.
@@ -160,7 +163,7 @@ class JournaledEditsCache {
     int txnCount = 0;
 
     try (AutoCloseableLock l = readLock.acquire()) {
-      if (lowestTxnId < 0 || requestedStartTxn < lowestTxnId) {
+      if (lowestTxnId == INVALID_TXN_ID || requestedStartTxn < lowestTxnId) {
         throw getCacheMissException(requestedStartTxn);
       } else if (requestedStartTxn > highestTxnId) {
         return 0;
@@ -222,6 +225,7 @@ class JournaledEditsCache {
    * This attempts to always handle malformed inputs gracefully rather than
    * throwing an exception, to allow the rest of the Journal's operations
    * to proceed normally.
+   *
    * @param inputData A buffer containing edits in serialized form
    * @param newStartTxn The txn ID of the first edit in {@code inputData}
    * @param newEndTxn The txn ID of the last edit in {@code inputData}
@@ -246,15 +250,16 @@ class JournaledEditsCache {
               newStartTxn, newEndTxn, newLayoutVersion), ioe);
           return;
         }
-      }
-      if (lowestTxnId < 0 || (highestTxnId + 1) != newStartTxn) {
-        // Cache initialization step
-        if (lowestTxnId >= 0) {
-          // Cache is out of sync; clear to avoid storing noncontiguous regions
-          Journal.LOG.error(String.format("Edits cache is out of sync; " +
-                  "looked for next txn id at %d but got start txn id for " +
-                  "cache put request at %d", highestTxnId + 1, newStartTxn));
-        }
+      } else if (lowestTxnId == INVALID_TXN_ID) {
+        Journal.LOG.info("Initializing edits cache starting from txn ID " +
+            newStartTxn);
+        initialize(newStartTxn);
+      } else if (highestTxnId + 1 != newStartTxn) {
+        // Cache is out of sync; clear to avoid storing noncontiguous regions
+        Journal.LOG.error(String.format("Edits cache is out of sync; " +
+            "looked for next txn id at %d but got start txn id for " +
+            "cache put request at %d. Reinitializing at new request.",
+            highestTxnId + 1, newStartTxn));
         initialize(newStartTxn);
       }
 
@@ -264,11 +269,12 @@ class JournaledEditsCache {
         totalSize -= lowest.getValue().length;
       }
       if (inputData.length > capacity) {
-        initialize(-1);
+        initialize(INVALID_TXN_ID);
         Journal.LOG.warn(String.format("A single batch of edits was too " +
                 "large to fit into the cache: startTxn = %d, endTxn = %d, " +
                 "input length = %d. The capacity of the cache (%s) must be " +
-                "increased for it to work properly (current capacity %d)",
+                "increased for it to work properly (current capacity %d)." +
+                "Cache is now empty.",
             newStartTxn, newEndTxn, inputData.length,
             DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity));
         return;
@@ -289,6 +295,7 @@ class JournaledEditsCache {
    * Skip through a given stream of edits until the given transaction ID is
    * found. Return the number of bytes that appear prior to the given
    * transaction.
+   *
    * @param buf A buffer containing a stream of serialized edits
    * @param txnId The transaction ID to search for
    * @return The number of bytes appearing in {@code buf} <i>before</i>
@@ -312,13 +319,22 @@ class JournaledEditsCache {
   /**
    * Update the layout version of the cache. This clears out all existing
    * entries, and populates the new layout version and header for that version.
+   *
    * @param newLayoutVersion The new layout version to be stored in the cache
    * @param newStartTxn The new lowest transaction in the cache
    */
   private void updateLayoutVersion(int newLayoutVersion, long newStartTxn)
       throws IOException {
-    Journal.LOG.info("Updating edits cache to use layout version " +
-        newLayoutVersion + "; previously was " + layoutVersion);
+    StringBuilder logMsg = new StringBuilder()
+        .append("Updating edits cache to use layout version ")
+        .append(newLayoutVersion)
+        .append(" starting from txn ID ")
+        .append(newStartTxn);
+    if (layoutVersion != INVALID_LAYOUT_VERSION) {
+      logMsg.append("; previous version was ").append(layoutVersion)
+          .append("; old entries will be cleared.");
+    }
+    Journal.LOG.info(logMsg.toString());
     initialize(newStartTxn);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     EditLogFileOutputStream.writeHeader(newLayoutVersion,
@@ -329,20 +345,23 @@ class JournaledEditsCache {
 
   /**
    * Initialize the cache back to a clear state.
+   *
    * @param newInitialTxnId The new lowest transaction ID stored in the cache.
-   *                        -1 if the cache is to remain empty at this time.
+   *                        This should be {@value INVALID_TXN_ID} if the cache
+   *                        is to remain empty at this time.
    */
   private void initialize(long newInitialTxnId) {
     dataMap.clear();
     totalSize = 0;
     initialTxnId = newInitialTxnId;
     lowestTxnId = initialTxnId;
-    highestTxnId = -1;
+    highestTxnId = INVALID_TXN_ID; // this will be set later
   }
 
   /**
    * Return the underlying data buffer used to store information about the
    * given transaction ID.
+   *
    * @param txnId Transaction ID whose containing buffer should be fetched.
    * @return The data buffer for the transaction
    */
@@ -354,7 +373,7 @@ class JournaledEditsCache {
   }
 
   private CacheMissException getCacheMissException(long requestedTxnId) {
-    if (lowestTxnId < 0) {
+    if (lowestTxnId == INVALID_TXN_ID) {
       return new CacheMissException(0, "Cache is empty; either it was never " +
           "written to or the last write overflowed the cache capacity.");
     } else if (requestedTxnId < initialTxnId) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
index e4363fb..76a9837 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithQJM.md
@@ -431,6 +431,34 @@ http://NN_HOSTNAME/isActive will return a 200 status code response if the NN is
 
 
 
+### In-Progress Edit Log Tailing
+
+Under the default settings, the Standby NameNode will only apply edits that are present in edit
+log segments which have been finalized. If it is desirable to have a Standby NameNode which has more
+up-to-date namespace information, it is possible to enable tailing of in-progress edit segments.
+This setting will attempt to fetch edits from an in-memory cache on the JournalNodes and can reduce
+the lag time before a transaction is applied on the Standby NameNode to the order of milliseconds.
+If an edit cannot be served from the cache, the Standby will still be able to retrieve it, but the
+lag time will be much longer. The relevant configurations are:
+
+*   **dfs.ha.tail-edits.in-progress** - Whether or not to enable tailing on in-progress edit logs.
+    This will also enable the in-memory edit cache on the JournalNodes. Disabled by default.
+
+*   **dfs.journalnode.edit-cache-size.bytes** - The size of the in-memory cache of edits on the
+    JournalNode. Edits take around 200 bytes each in a typical environment, so, for example, the
+    default of 1048576 (1MB) can hold around 5000 transactions. It is recommended to monitor the
+    JournalNode metrics RpcRequestCacheMissAmountNumMisses and RpcRequestCacheMissAmountAvgTxns,
+    which respectively count the number of requests unable to be served by the cache, and the extra
+    number of transactions which would have needed to have been in the cache for the request to
+    succeed. For example, if a request attempted to fetch edits starting at transaction ID 10, but
+    the oldest data in the cache was at transaction ID 20, a value of 10 would be added to the
+    average.
+
+This feature is primarily useful in conjunction with the Standby/Observer Read feature. Using this
+feature, read requests can be serviced from non-active NameNodes; thus tailing in-progress edits
+provides these nodes with the ability to serve requests with much fresher data. See the
+Apache JIRA ticket HDFS-12943 for more information on this feature.
+
 Automatic Failover
 ------------------
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index 9f089c9..f3bb954 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -1045,6 +1045,10 @@ public class TestQuorumJournalManager {
         qjm.startLogSegment(1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
     writeTxns(stm, 1, 10);
     writeTxns(stm, 11, 1);
+    // One last sync whose transactions are not expected to be seen in the
+    // input streams because the JournalNodes have not updated their concept
+    // of the committed transaction ID yet
+    writeTxns(stm, 12, 1);
 
     futureThrows(new IOException()).when(spies.get(0)).getJournaledEdits(1,
         QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
index 30ef21b..837c7d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
@@ -32,6 +32,7 @@ import java.util.List;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
@@ -79,6 +80,7 @@ public class TestQuorumJournalManagerUnit {
         mockLogger(),
         mockLogger());
 
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
       @Override
       protected List<AsyncLogger> createLoggers(AsyncLogger.Factory factory) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/JournalTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/JournalTestUtil.java
new file mode 100644
index 0000000..de03b2c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/JournalTestUtil.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+/**
+ * Utilities for testing {@link Journal} instances.
+ */
+public class JournalTestUtil {
+
+  /**
+   * Corrupt the cache of a {@link Journal} to simulate some corrupt entries
+   * being present.
+   *
+   * @param txid The transaction ID whose containing buffer in the cache
+   *             should be corrupted.
+   * @param journal The journal whose cache should be corrupted.
+   */
+  public static void corruptJournaledEditsCache(long txid, Journal journal) {
+    JournaledEditsCache cache = journal.getJournaledEditsCache();
+    byte[] buf = cache.getRawDataForTests(txid);
+    // Change a few arbitrary bytes in the buffer
+    for (int i = 0; i < buf.length; i += 9) {
+      buf[i] = 0;
+    }
+    for (int i = 3; i < buf.length; i += 9) {
+      buf[i] += 10;
+    }
+    for (int i = 6; i < buf.length; i += 9) {
+      buf[i] -= 10;
+    }
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
index 2bdada4..8394073 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyInProgressTail.java
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Iterator;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -30,9 +33,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.qjournal.server.JournalTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -43,6 +48,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
 /**
@@ -64,6 +70,8 @@ public class TestStandbyInProgressTail {
     // Set period of tail edits to a large value (20 mins) for test purposes
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 20 * 60);
     conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
+        500);
     HAUtil.setAllowStandbyReads(conf, true);
     qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
     cluster = qjmhaCluster.getDfsCluster();
@@ -179,12 +187,7 @@ public class TestStandbyInProgressTail {
     cluster.getNameNode(0).getRpcServer().mkdirs("/test",
             FsPermission.createImmutable((short) 0755), true);
 
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();
-
-    // After waiting for 5 seconds, StandbyNameNode should finish tailing
-    // in-progress logs
-    assertNotNull(getFileInfo(cluster.getNameNode(1),
-            "/test", true, false, false));
+    waitForFileInfo(nn1, "/test");
 
     // Restarting the standby should not finalize any edits files
     // in the shared directory when it starts up!
@@ -227,10 +230,9 @@ public class TestStandbyInProgressTail {
 
     cluster.getNameNode(0).getRpcServer().mkdirs("/test",
             FsPermission.createImmutable((short) 0755), true);
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();
 
     // StandbyNameNode should tail the in-progress edit
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
+    waitForFileInfo(nn1, "/test");
 
     // Create a new edit and finalized it
     cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
@@ -238,17 +240,14 @@ public class TestStandbyInProgressTail {
     nn0.getRpcServer().rollEditLog();
 
     // StandbyNameNode shouldn't tail the edit since we do not call the method
-    assertNull(getFileInfo(nn1, "/test2", true, false, false));
+    waitForFileInfo(nn1, "/test2");
 
     // Create a new in-progress edit and let SBNN do the tail
     cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
             FsPermission.createImmutable((short) 0755), true);
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();
 
     // StandbyNameNode should tail the finalized edit and the new in-progress
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test3", true, false, false));
+    waitForFileInfo(nn1, "/test", "/test2", "/test3");
   }
 
   @Test
@@ -275,12 +274,8 @@ public class TestStandbyInProgressTail {
     assertNull(getFileInfo(nn1, "/test2", true, false, false));
     assertNull(getFileInfo(nn1, "/test3", true, false, false));
 
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();
-
-    // StandbyNameNode shoudl tail the finalized edit and the new in-progress
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test3", true, false, false));
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    waitForFileInfo(nn1, "/test", "/test2", "/test3");
   }
 
   @Test
@@ -295,19 +290,14 @@ public class TestStandbyInProgressTail {
             FsPermission.createImmutable((short) 0755), true);
     cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
             FsPermission.createImmutable((short) 0755), true);
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();
+    waitForFileInfo(nn1, "/test", "/test2");
     nn0.getRpcServer().rollEditLog();
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
 
     cluster.getNameNode(0).getRpcServer().mkdirs("/test3",
             FsPermission.createImmutable((short) 0755), true);
-    nn1.getNamesystem().getEditLogTailer().doTailEdits();
 
-    // StandbyNameNode shoudl tail the finalized edit and the new in-progress
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test2", true, false, false));
-    assertNotNull(getFileInfo(nn1, "/test3", true, false, false));
+    // StandbyNameNode should tail the finalized edit and the new in-progress
+    waitForFileInfo(nn1, "/test", "/test2", "/test3");
   }
 
   @Test
@@ -325,8 +315,85 @@ public class TestStandbyInProgressTail {
         FsPermission.createImmutable((short) 0755), true);
     cluster.getNameNode(0).getRpcServer().rollEdits();
 
-    cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
-    assertNotNull(getFileInfo(nn1, "/test", true, false, false));
+    waitForFileInfo(nn1, "/test");
+  }
+
+  @Test
+  public void testEditsServedViaCache() throws Exception {
+    cluster.transitionToActive(0);
+    cluster.waitActive(0);
+
+    mkdirs(nn0, "/test", "/test2");
+    nn0.getRpcServer().rollEditLog();
+    for (int idx = 0; idx < qjmhaCluster.getJournalCluster().getNumNodes();
+        idx++) {
+      File[] startingEditFile = qjmhaCluster.getJournalCluster()
+          .getCurrentDir(idx, DFSUtil.getNamenodeNameServiceId(conf))
+          .listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+              return name.matches("edits_0+1-[0-9]+");
+            }
+          });
+      assertNotNull(startingEditFile);
+      assertEquals(1, startingEditFile.length);
+      // Delete this edit file to ensure that edits can't be served via the
+      // streaming mechanism - RPC/cache-based only
+      startingEditFile[0].delete();
+    }
+    // Ensure edits were not tailed before the edit files were deleted;
+    // quick spot check of a single dir
+    assertNull(getFileInfo(nn1, "/tmp0", false, false, false));
+
+    waitForFileInfo(nn1, "/test", "/test2");
+  }
+
+  @Test
+  public void testCorruptJournalCache() throws Exception {
+    cluster.transitionToActive(0);
+    cluster.waitActive(0);
+
+    // Shut down one JN so there is only a quorum remaining to make it easier
+    // to manage the remaining two
+    qjmhaCluster.getJournalCluster().getJournalNode(0).stopAndJoin(0);
+
+    mkdirs(nn0, "/test", "/test2");
+    JournalTestUtil.corruptJournaledEditsCache(1,
+        qjmhaCluster.getJournalCluster().getJournalNode(1)
+            .getJournal(DFSUtil.getNamenodeNameServiceId(conf)));
+
+    nn0.getRpcServer().rollEditLog();
+
+    waitForFileInfo(nn1, "/test", "/test2");
+
+    mkdirs(nn0, "/test3", "/test4");
+    JournalTestUtil.corruptJournaledEditsCache(3,
+        qjmhaCluster.getJournalCluster().getJournalNode(2)
+            .getJournal(DFSUtil.getNamenodeNameServiceId(conf)));
+
+    waitForFileInfo(nn1, "/test3", "/test4");
+  }
+
+  @Test
+  public void testTailWithoutCache() throws Exception {
+    qjmhaCluster.shutdown();
+    // Effectively disable the cache by setting its size too small to be used
+    conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, 1);
+    qjmhaCluster = new MiniQJMHACluster.Builder(conf).build();
+    cluster = qjmhaCluster.getDfsCluster();
+    cluster.transitionToActive(0);
+    cluster.waitActive(0);
+    nn0 = cluster.getNameNode(0);
+    nn1 = cluster.getNameNode(1);
+
+    mkdirs(nn0, "/test", "/test2");
+    nn0.getRpcServer().rollEditLog();
+
+    mkdirs(nn0, "/test3", "/test4");
+
+    // Skip the last directory; the JournalNodes' idea of the committed
+    // txn ID may not have been updated to include it yet
+    waitForFileInfo(nn1, "/test", "/test2", "/test3");
   }
 
   /**
@@ -356,4 +423,43 @@ public class TestStandbyInProgressTail {
       GenericTestUtils.assertGlobEquals(editDir, "edits_.*", files);
     }
   }
+
+  /**
+   * Create the given directories on the provided NameNode.
+   */
+  private static void mkdirs(NameNode nameNode, String... dirNames)
+      throws Exception {
+    for (String dirName : dirNames) {
+      nameNode.getRpcServer().mkdirs(dirName,
+          FsPermission.createImmutable((short) 0755), true);
+    }
+  }
+
+  /**
+   * Wait up to 1 second until the given NameNode is aware of the existence of
+   * all of the provided fileNames.
+   */
+  private static void waitForFileInfo(NameNode standbyNN, String... fileNames)
+      throws Exception {
+    List<String> remainingFiles = Lists.newArrayList(fileNames);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          standbyNN.getNamesystem().getEditLogTailer().doTailEdits();
+          for (Iterator<String> it = remainingFiles.iterator(); it.hasNext();) {
+            if (getFileInfo(standbyNN, it.next(), true, false, false) == null) {
+              return false;
+            } else {
+              it.remove();
+            }
+          }
+          return true;
+        } catch (IOException|InterruptedException e) {
+          throw new AssertionError("Exception while waiting: " + e);
+        }
+      }
+    }, 10, 1000);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org