You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:10 UTC

[hadoop] 07/50: HDFS-13607. [SBN read] Edit Tail Fast Path Part 1: Enhance JournalNode with an in-memory cache of recent edit transactions. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 174c41c68f80fc91c999df123af8765f1bec0def
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Wed May 9 15:40:07 2018 -0700

    HDFS-13607. [SBN read] Edit Tail Fast Path Part 1: Enhance JournalNode with an in-memory cache of recent edit transactions. Contributed by Erik Krogen.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   3 +
 .../hdfs/qjournal/server/JournaledEditsCache.java  | 393 +++++++++++++++++++++
 .../src/main/resources/hdfs-default.xml            |  12 +
 .../qjournal/server/TestJournaledEditsCache.java   | 257 ++++++++++++++
 4 files changed, 665 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8898817..785aee2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1083,6 +1083,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
       "dfs.journalnode.sync.interval";
   public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
+  public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
+      "dfs.journalnode.edit-cache-size.bytes";
+  public static final int DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT = 1024 * 1024;
 
   // Journal-node related configs for the client side.
   public static final String  DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java
new file mode 100644
index 0000000..1151069
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournaledEditsCache.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.util.AutoCloseableLock;
+
+
+/**
+ * An in-memory cache of edits in their serialized form. This is used to serve
+ * the {@link Journal#getJournaledEdits(long, int)} call, used by the
+ * QJM when {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} is
+ * enabled.
+ *
+ * <p>When a batch of edits is received by the JournalNode, it is put into this
+ * cache via {@link #storeEdits(byte[], long, long, int)}. Edits must be
+ * stored contiguously; if a batch of edits is stored that does not align with
+ * the previously stored edits, the cache will be cleared before storing new
+ * edits to avoid gaps. This decision is made because gaps are only handled
+ * when in recovery mode, which the cache is not intended to be used for.
+ *
+ * <p>Batches of edits are stored in a {@link TreeMap} mapping the starting
+ * transaction ID of the batch to the data buffer. Upon retrieval, the
+ * relevant data buffers are concatenated together and a header is added
+ * to construct a fully-formed edit data stream.
+ *
+ * <p>The cache has a limited capacity, in bytes, determined by
+ * {@value DFSConfigKeys#DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY}. If adding a new
+ * batch of edits would exceed the capacity, batches of edits are removed
+ * until the new batch fits within the capacity, starting from the ones
+ * containing the oldest transactions. Transactions range in size, but a
+ * decent rule of thumb is that 200 bytes are needed per transaction. Monitoring
+ * the {@link JournalMetrics#rpcRequestCacheMissAmount} metric is recommended
+ * to determine if the cache is too small; it will indicate both how many
+ * cache misses occurred, and how many more transactions would have been
+ * needed in the cache to serve the request.
+ */
+class JournaledEditsCache {
+
+  /** The capacity, in bytes, of this cache. */
+  private final int capacity;
+
+  /**
+   * Read/write lock pair wrapped in AutoCloseable; these refer to the same
+   * underlying lock.
+   */
+  private final AutoCloseableLock readLock;
+  private final AutoCloseableLock writeLock;
+
+  // ** Start lock-protected fields **
+
+  /**
+   * Stores the actual data as a mapping of the StartTxnId of a batch of edits
+   * to the serialized batch of edits. Stores only contiguous ranges; that is,
+   * the last transaction ID in one batch is always one less than the first
+   * transaction ID in the next batch. Though the map is protected by the lock,
+   * individual data buffers are immutable and can be accessed without locking.
+   */
+  private final NavigableMap<Long, byte[]> dataMap = new TreeMap<>();
+  /** Stores the layout version currently present in the cache. */
+  private int layoutVersion = Integer.MAX_VALUE;
+  /** Stores the serialized version of the header for the current version. */
+  private ByteBuffer layoutHeader;
+
+  /**
+   * The lowest/highest transaction IDs present in the cache. -1 if there are no
+   * transactions in the cache.
+   */
+  private long lowestTxnId;
+  private long highestTxnId;
+  /**
+   * The lowest transaction ID that was ever present in the cache since last
+   * being reset (i.e. since initialization or since reset due to being out of
+   * sync with the Journal). Until the cache size goes above capacity, this is
+   * equal to lowestTxnId.
+   */
+  private long initialTxnId;
+  /** The current total size of all buffers in this cache. */
+  private int totalSize;
+
+  // ** End lock-protected fields **
+
+  JournaledEditsCache(Configuration conf) {
+    capacity = conf.getInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT);
+    if (capacity > 0.9 * Runtime.getRuntime().maxMemory()) {
+      Journal.LOG.warn(String.format("Cache capacity is set at %d bytes but " +
+          "maximum JVM memory is only %d bytes. It is recommended that you " +
+          "decrease the cache size or increase the heap size.",
+          capacity, Runtime.getRuntime().maxMemory()));
+    }
+    Journal.LOG.info("Enabling the journaled edits cache with a capacity " +
+        "of bytes: " + capacity);
+    ReadWriteLock lock = new ReentrantReadWriteLock(true);
+    readLock = new AutoCloseableLock(lock.readLock());
+    writeLock = new AutoCloseableLock(lock.writeLock());
+    initialize(-1);
+  }
+
+  /**
+   * Fetch the data for edits starting at the specific transaction ID, fetching
+   * up to {@code maxTxns} transactions. Populates a list of output buffers
+   * which contains a serialized version of the edits, and returns the count of
+   * edits contained within the serialized buffers. The serialized edits are
+   * prefixed with a standard edit log header containing information about the
+   * layout version. The transactions returned are guaranteed to have contiguous
+   * transaction IDs.
+   *
+   * <p>If {@code requestedStartTxn} is lower than the lowest transaction
+   * currently contained in this cache, or no transactions have yet been added
+   * to the cache, an exception will be thrown. Otherwise, if
+   * {@code requestedStartTxn} is higher than the highest transaction which
+   * has been added to this cache, or if {@code maxTxns} is not positive,
+   * {@code outputBuffers} is left untouched and a transaction count of 0 is
+   * returned.
+   *
+   * <p>The header buffer added at index 0 is a duplicate of the cached header:
+   * it shares the header bytes but has its own position and limit, so a caller
+   * consuming it cannot affect the buffers handed to other callers.
+   * @param requestedStartTxn The ID of the first transaction to return. If any
+   *                          transactions are returned, it is guaranteed that
+   *                          the first one will have this ID.
+   * @param maxTxns The maximum number of transactions to return.
+   * @param outputBuffers A list to populate with output buffers. When
+   *                      concatenated, these form a full response.
+   * @return The number of transactions contained within the set of output
+   *         buffers.
+   * @throws IOException If transactions are requested which cannot be served
+   *                     by this cache.
+   */
+  int retrieveEdits(long requestedStartTxn, int maxTxns,
+      List<ByteBuffer> outputBuffers) throws IOException {
+    int txnCount = 0;
+
+    try (AutoCloseableLock l = readLock.acquire()) {
+      if (lowestTxnId < 0 || requestedStartTxn < lowestTxnId) {
+        throw getCacheMissException(requestedStartTxn);
+      } else if (requestedStartTxn > highestTxnId || maxTxns <= 0) {
+        // Nothing to serve. Returning before the header is added also keeps
+        // the trimming code below from indexing a data buffer that was never
+        // appended (the loop would not run for a non-positive maxTxns).
+        return 0;
+      }
+      // Hand out an independent view so no caller can move the position or
+      // limit of the shared header buffer
+      outputBuffers.add(layoutHeader.duplicate());
+      Iterator<Map.Entry<Long, byte[]>> incrBuffIter =
+          dataMap.tailMap(dataMap.floorKey(requestedStartTxn), true)
+              .entrySet().iterator();
+      long prevTxn = requestedStartTxn;
+      byte[] prevBuf = null;
+      // Stop when maximum transactions reached...
+      while ((txnCount < maxTxns) &&
+          // ... or there are no more entries ...
+          (incrBuffIter.hasNext() || prevBuf != null)) {
+        long currTxn;
+        byte[] currBuf;
+        if (incrBuffIter.hasNext()) {
+          Map.Entry<Long, byte[]> ent = incrBuffIter.next();
+          currTxn = ent.getKey();
+          currBuf = ent.getValue();
+        } else {
+          // This accounts for the trailing entry
+          currTxn = highestTxnId + 1;
+          currBuf = null;
+        }
+        if (prevBuf != null) { // True except for the first loop iteration
+          outputBuffers.add(ByteBuffer.wrap(prevBuf));
+          // if prevTxn < requestedStartTxn, the extra transactions will get
+          // removed after the loop, so don't include them in the txn count
+          txnCount += currTxn - Math.max(requestedStartTxn, prevTxn);
+        }
+        prevTxn = currTxn;
+        prevBuf = currBuf;
+      }
+      // Release the lock before doing operations on the buffers (deserializing
+      // to find transaction boundaries, and copying into an output buffer)
+    }
+    // Remove extra leading transactions in the first buffer
+    ByteBuffer firstBuf = outputBuffers.get(1); // 0th is the header
+    firstBuf.position(
+        findTransactionPosition(firstBuf.array(), requestedStartTxn));
+    // Remove trailing transactions in the last buffer if necessary
+    if (txnCount > maxTxns) {
+      ByteBuffer lastBuf = outputBuffers.get(outputBuffers.size() - 1);
+      int limit =
+          findTransactionPosition(lastBuf.array(), requestedStartTxn + maxTxns);
+      lastBuf.limit(limit);
+      txnCount = maxTxns;
+    }
+
+    return txnCount;
+  }
+
+  /**
+   * Store a batch of serialized edits into this cache. Removes old batches
+   * as necessary to keep the total size of the cache at or below the capacity.
+   * See the class Javadoc for more info.
+   *
+   * <p>The cache is reset to start at {@code newStartTxn} when the layout
+   * version changes, when the cache is empty, or when the new batch does not
+   * directly follow the highest transaction currently cached. Only the last
+   * case is logged as an error. A batch larger than the whole capacity is not
+   * stored and leaves the cache empty.
+   *
+   * <p>This attempts to always handle malformed inputs gracefully rather than
+   * throwing an exception, to allow the rest of the Journal's operations
+   * to proceed normally.
+   * @param inputData A buffer containing edits in serialized form
+   * @param newStartTxn The txn ID of the first edit in {@code inputData}
+   * @param newEndTxn The txn ID of the last edit in {@code inputData}
+   * @param newLayoutVersion The version of the layout used to serialize
+   *                         the edits
+   */
+  void storeEdits(byte[] inputData, long newStartTxn, long newEndTxn,
+      int newLayoutVersion) {
+    if (inputData == null) {
+      // Reject rather than fail with a NullPointerException further down
+      Journal.LOG.error(String.format("Attempted to cache null data " +
+          "with newStartTxn %d and newEndTxn %d", newStartTxn, newEndTxn));
+      return;
+    }
+    if (newStartTxn < 0 || newEndTxn < newStartTxn) {
+      Journal.LOG.error(String.format("Attempted to cache data of length %d " +
+          "with newStartTxn %d and newEndTxn %d",
+          inputData.length, newStartTxn, newEndTxn));
+      return;
+    }
+    try (AutoCloseableLock l = writeLock.acquire()) {
+      if (newLayoutVersion != layoutVersion) {
+        try {
+          // This already clears the cache and restarts it at newStartTxn, so
+          // the contiguity check below must not run (it would compare against
+          // the freshly reset highestTxnId and report a bogus desync)
+          updateLayoutVersion(newLayoutVersion, newStartTxn);
+        } catch (IOException ioe) {
+          Journal.LOG.error(String.format("Unable to save new edits [%d, %d] " +
+              "due to exception when updating to new layout version %d",
+              newStartTxn, newEndTxn, newLayoutVersion), ioe);
+          return;
+        }
+      } else if (lowestTxnId < 0) {
+        // Cache initialization step
+        initialize(newStartTxn);
+      } else if ((highestTxnId + 1) != newStartTxn) {
+        // Cache is out of sync; clear to avoid storing noncontiguous regions
+        Journal.LOG.error(String.format("Edits cache is out of sync; " +
+                "looked for next txn id at %d but got start txn id for " +
+                "cache put request at %d", highestTxnId + 1, newStartTxn));
+        initialize(newStartTxn);
+      }
+
+      if (inputData.length > capacity) {
+        // The batch can never fit; leave the cache empty
+        initialize(-1);
+        Journal.LOG.warn(String.format("A single batch of edits was too " +
+                "large to fit into the cache: startTxn = %d, endTxn = %d, " +
+                "input length = %d. The capacity of the cache (%s) must be " +
+                "increased for it to work properly (current capacity %d)",
+            newStartTxn, newEndTxn, inputData.length,
+            DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity));
+        return;
+      }
+      // Evict the oldest batches until the new one fits. The sum is computed
+      // as a long so that it cannot wrap around for very large capacities.
+      while (((long) totalSize + inputData.length) > capacity
+          && !dataMap.isEmpty()) {
+        Map.Entry<Long, byte[]> lowest = dataMap.pollFirstEntry();
+        totalSize -= lowest.getValue().length;
+      }
+      if (dataMap.isEmpty()) {
+        lowestTxnId = newStartTxn;
+      } else {
+        lowestTxnId = dataMap.firstKey();
+      }
+
+      dataMap.put(newStartTxn, inputData);
+      highestTxnId = newEndTxn;
+      totalSize += inputData.length;
+    }
+  }
+
+  /**
+   * Scan the serialized edits in {@code buf} one op at a time, using the
+   * cache's current layout version, until an op is reached whose scanned
+   * transaction ID is not less than {@code txnId}. The offset recorded after
+   * the last op that was skipped is returned.
+   *
+   * <p>NOTE(review): {@code retrieveEdits} calls this after it has released
+   * the read lock, so {@code layoutVersion} is read here without the lock
+   * held; confirm that this is acceptable.
+   * @param buf A buffer containing a stream of serialized edits
+   * @param txnId The transaction ID to search for
+   * @return The number of bytes appearing in {@code buf} <i>before</i>
+   *         the start of the transaction with ID {@code txnId}.
+   * @throws IOException If reading an op from {@code buf} fails.
+   */
+  private int findTransactionPosition(byte[] buf, long txnId)
+      throws IOException {
+    FSEditLogLoader.PositionTrackingInputStream posTracker =
+        new FSEditLogLoader.PositionTrackingInputStream(
+            new ByteArrayInputStream(buf));
+    FSEditLogOp.Reader opReader = FSEditLogOp.Reader.create(
+        new DataInputStream(posTracker), posTracker, layoutVersion);
+    long bytesBeforeTxn = 0;
+    for (long scannedTxn = opReader.scanOp(); scannedTxn < txnId;
+        scannedTxn = opReader.scanOp()) {
+      // The op just scanned precedes txnId; remember where it ended
+      bytesBeforeTxn = posTracker.getPos();
+    }
+    // posTracker is backed by a byte[]; position cannot go above an integer
+    return (int) bytesBeforeTxn;
+  }
+
+  /**
+   * Switch the cache over to a new layout version. All existing entries are
+   * dropped first, then the header for the new version is serialized and
+   * stored together with the version itself. If writing the header fails, the
+   * cache has already been cleared but the previous layout version and header
+   * are still in place.
+   * @param newLayoutVersion The new layout version to be stored in the cache
+   * @param newStartTxn The new lowest transaction in the cache
+   * @throws IOException If the header for the new version cannot be written.
+   */
+  private void updateLayoutVersion(int newLayoutVersion, long newStartTxn)
+      throws IOException {
+    Journal.LOG.info("Updating edits cache to use layout version " +
+        newLayoutVersion + "; previously was " + layoutVersion);
+    initialize(newStartTxn);
+    ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
+    DataOutputStream headerOut = new DataOutputStream(headerBytes);
+    EditLogFileOutputStream.writeHeader(newLayoutVersion, headerOut);
+    // Only reached when the header was written successfully
+    layoutVersion = newLayoutVersion;
+    layoutHeader = ByteBuffer.wrap(headerBytes.toByteArray());
+  }
+
+  /**
+   * Reset the cache to an empty state: no buffers, a total size of zero, no
+   * highest transaction, and both the initial and lowest transaction IDs set
+   * to the given value.
+   * @param newInitialTxnId The new lowest transaction ID stored in the cache.
+   *                        -1 if the cache is to remain empty at this time.
+   */
+  private void initialize(long newInitialTxnId) {
+    initialTxnId = newInitialTxnId;
+    lowestTxnId = newInitialTxnId;
+    highestTxnId = -1;
+    totalSize = 0;
+    dataMap.clear();
+  }
+
+  /**
+   * Look up, under the read lock, the stored batch with the greatest starting
+   * transaction ID that is less than or equal to {@code txnId}, and return
+   * its backing array without copying it.
+   * @param txnId Transaction ID whose containing buffer should be fetched.
+   * @return The data buffer for the transaction
+   */
+  @VisibleForTesting
+  byte[] getRawDataForTests(long txnId) {
+    try (AutoCloseableLock l = readLock.acquire()) {
+      Map.Entry<Long, byte[]> containingBatch = dataMap.floorEntry(txnId);
+      return containingBatch.getValue();
+    }
+  }
+
+  /**
+   * Build the exception describing why the given transaction cannot be
+   * served. Three cases are distinguished: the cache is empty, the request
+   * precedes the first transaction stored since the last reset, or the
+   * request precedes the oldest transaction still present.
+   * @param requestedTxnId The transaction ID that was requested.
+   * @return The exception to throw; it is not thrown here.
+   */
+  private CacheMissException getCacheMissException(long requestedTxnId) {
+    if (lowestTxnId < 0) {
+      return new CacheMissException(0, "Cache is empty; either it was never " +
+          "written to or the last write overflowed the cache capacity.");
+    }
+    if (requestedTxnId < initialTxnId) {
+      long missedTxns = initialTxnId - requestedTxnId;
+      return new CacheMissException(missedTxns,
+          "Cache started at txn ID %d but requested txns starting at %d.",
+          initialTxnId, requestedTxnId);
+    }
+    long missedTxns = lowestTxnId - requestedTxnId;
+    long cachedTxns = highestTxnId - lowestTxnId + 1;
+    return new CacheMissException(missedTxns,
+        "Oldest txn ID available in the cache is %d, but requested txns " +
+            "starting at %d. The cache size (%s) may need to be increased " +
+            "to hold more transactions (currently %d bytes containing %d " +
+            "transactions)", lowestTxnId, requestedTxnId,
+        DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity,
+        cachedTxns);
+  }
+
+  /**
+   * Thrown when a request cannot be served from the cache. Carries the
+   * cache miss amount computed by the code that created it.
+   */
+  static class CacheMissException extends IOException {
+
+    private static final long serialVersionUID = 0L;
+
+    /** The miss amount supplied at construction time. */
+    private final long cacheMissAmount;
+
+    /**
+     * @param cacheMissAmount The miss amount to report.
+     * @param msgFormat A {@link String#format(String, Object...)} pattern.
+     * @param msgArgs The arguments for {@code msgFormat}.
+     */
+    CacheMissException(long cacheMissAmount, String msgFormat,
+        Object... msgArgs) {
+      super(String.format(msgFormat, msgArgs));
+      this.cacheMissAmount = cacheMissAmount;
+    }
+
+    /** @return The miss amount supplied at construction time. */
+    long getCacheMissAmount() {
+      return this.cacheMissAmount;
+    }
+
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 5048bc8..5eff480 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4122,6 +4122,18 @@
 </property>
 
 <property>
+  <name>dfs.journalnode.edit-cache-size.bytes</name>
+  <value>1048576</value>
+  <description>
+    The size, in bytes, of the in-memory cache of edits to keep on the
+    JournalNode. This cache is used to serve edits for tailing via the RPC-based
+    mechanism, and is only enabled when dfs.ha.tail-edits.in-progress is true.
+    Transactions range in size but are around 200 bytes on average, so the
+    default of 1MB can store around 5000 transactions.
+  </description>
+</property>
+
+<property>
   <name>dfs.journalnode.kerberos.internal.spnego.principal</name>
   <value></value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournaledEditsCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournaledEditsCache.java
new file mode 100644
index 0000000..9e15d60
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournaledEditsCache.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import com.google.common.primitives.Bytes;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createGabageTxns;
+import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Test the {@link JournaledEditsCache} used for caching edits in-memory on the
+ * {@link Journal}.
+ */
+public class TestJournaledEditsCache {
+
+  private static final int EDITS_CAPACITY = 100;
+
+  private static final File TEST_DIR =
+      PathUtils.getTestDir(TestJournaledEditsCache.class, false);
+  private JournaledEditsCache cache;
+
+  @Before
+  public void setup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
+        createTxnData(1, 1).length * EDITS_CAPACITY);
+    cache = new JournaledEditsCache(conf);
+    TEST_DIR.mkdirs();
+  }
+
+  /** Remove the test directory, ignoring any failure to do so. */
+  @After
+  public void cleanup() throws Exception {
+    FileUtils.deleteQuietly(TEST_DIR);
+  }
+
+  @Test
+  public void testCacheSingleSegment() throws Exception {
+    storeEdits(1, 20);
+    // Leading part of the segment
+    assertTxnCountAndContents(1, 5, 5);
+    // All of the segment
+    assertTxnCountAndContents(1, 20, 20);
+    // Past the segment
+    assertTxnCountAndContents(1, 40, 20);
+    // Trailing part of the segment
+    assertTxnCountAndContents(10, 11, 20);
+    // Trailing part of the segment, past the end
+    assertTxnCountAndContents(10, 20, 20);
+  }
+
+  @Test
+  public void testCacheBelowCapacityRequestOnBoundary() throws Exception {
+    storeEdits(1, 5);
+    storeEdits(6, 20);
+    storeEdits(21, 30);
+
+    // First segment only
+    assertTxnCountAndContents(1, 3, 3);
+    // Second segment only
+    assertTxnCountAndContents(6, 10, 15);
+    // First and second segment
+    assertTxnCountAndContents(1, 7, 7);
+    // All three segments
+    assertTxnCountAndContents(1, 25, 25);
+    // Second and third segment
+    assertTxnCountAndContents(6, 20, 25);
+    // Second and third segment; request past the end
+    assertTxnCountAndContents(6, 50, 30);
+    // Third segment only; request past the end
+    assertTxnCountAndContents(21, 20, 30);
+  }
+
+  @Test
+  public void testCacheBelowCapacityRequestOffBoundary() throws Exception {
+    storeEdits(1, 5);
+    storeEdits(6, 20);
+    storeEdits(21, 30);
+
+    // First segment only
+    assertTxnCountAndContents(3, 1, 3);
+    // First and second segment
+    assertTxnCountAndContents(3, 6, 8);
+    // Second and third segment
+    assertTxnCountAndContents(15, 10, 24);
+    // Second and third segment; request past the end
+    assertTxnCountAndContents(15, 50, 30);
+    // Start read past the end
+    List<ByteBuffer> buffers = new ArrayList<>();
+    assertEquals(0, cache.retrieveEdits(31, 10, buffers));
+    assertTrue(buffers.isEmpty());
+  }
+
+  /**
+   * Store more data than the cache was configured to hold, then check that a
+   * read starting at transaction 1 is reported as a cache miss while the most
+   * recent transactions can still be read.
+   */
+  @Test
+  public void testCacheAboveCapacity() throws Exception {
+    int thirdCapacity = EDITS_CAPACITY / 3;
+    storeEdits(1, thirdCapacity);
+    storeEdits(thirdCapacity + 1, thirdCapacity * 2);
+    storeEdits(thirdCapacity * 2 + 1, EDITS_CAPACITY);
+    storeEdits(EDITS_CAPACITY + 1, thirdCapacity * 4);
+    storeEdits(thirdCapacity * 4 + 1, thirdCapacity * 5);
+
+    try {
+      cache.retrieveEdits(1, 10, new ArrayList<>());
+      fail("Expected a cache miss");
+    } catch (JournaledEditsCache.CacheMissException cme) {
+      // expected; any other IOException is a genuine failure and propagates
+    }
+    assertTxnCountAndContents(EDITS_CAPACITY + 1, EDITS_CAPACITY,
+        thirdCapacity * 5);
+  }
+
+  /**
+   * Store a single batch which is larger than the whole cache. A warning must
+   * be logged, a subsequent read must miss, and a later batch that does fit
+   * must be stored and readable as usual.
+   */
+  @Test
+  public void testCacheSingleAdditionAboveCapacity() throws Exception {
+    LogCapturer logs = LogCapturer.captureLogs(Journal.LOG);
+    try {
+      storeEdits(1, EDITS_CAPACITY * 2);
+    } finally {
+      // Always stop capturing, even if the store unexpectedly throws
+      logs.stopCapturing();
+    }
+    assertTrue(logs.getOutput().contains("batch of edits was too large"));
+    try {
+      cache.retrieveEdits(1, 1, new ArrayList<>());
+      fail("Expected a cache miss");
+    } catch (JournaledEditsCache.CacheMissException cme) {
+      // expected; any other IOException is a genuine failure and propagates
+    }
+    storeEdits(EDITS_CAPACITY * 2 + 1, EDITS_CAPACITY * 2 + 5);
+    assertTxnCountAndContents(EDITS_CAPACITY * 2 + 1, 5,
+        EDITS_CAPACITY * 2 + 5);
+  }
+
+  @Test
+  public void testCacheWithFutureLayoutVersion() throws Exception {
+    byte[] firstHalf = createGabageTxns(1, 5);
+    byte[] secondHalf = createGabageTxns(6, 5);
+    int futureVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
+    cache.storeEdits(Bytes.concat(firstHalf, secondHalf), 1, 10,
+        futureVersion);
+    List<ByteBuffer> buffers = new ArrayList<>();
+    assertEquals(5, cache.retrieveEdits(6, 5, buffers));
+    assertArrayEquals(getHeaderForLayoutVersion(futureVersion),
+        buffers.get(0).array());
+    byte[] retBytes = new byte[buffers.get(1).remaining()];
+    System.arraycopy(buffers.get(1).array(), buffers.get(1).position(),
+        retBytes, 0, buffers.get(1).remaining());
+    assertArrayEquals(secondHalf, retBytes);
+  }
+
+  /**
+   * Store one batch under a different layout version followed by one under
+   * the current version. Only the second batch may remain readable.
+   */
+  @Test
+  public void testCacheWithMultipleLayoutVersions() throws Exception {
+    int oldLayout = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION + 1;
+    byte[] oldLayoutEdits = createTxnData(1, 5);
+    cache.storeEdits(oldLayoutEdits, 1, 5, oldLayout);
+    storeEdits(6, 10);
+    // The cache must never serve edits from more than one layout version,
+    // so the batch stored under the other version has to be gone
+    try {
+      cache.retrieveEdits(1, 50, new ArrayList<>());
+      fail("Expected a cache miss");
+    } catch (JournaledEditsCache.CacheMissException cme) {
+      // expected
+    }
+    assertTxnCountAndContents(6, 50, 10);
+  }
+
+  @Test
+  public void testCacheEditsWithGaps() throws Exception {
+    storeEdits(1, 5);
+    storeEdits(10, 15);
+
+    try {
+      cache.retrieveEdits(1, 20, new ArrayList<>());
+      fail();
+    } catch (JournaledEditsCache.CacheMissException cme) {
+      assertEquals(9, cme.getCacheMissAmount());
+    }
+    assertTxnCountAndContents(10, 10, 15);
+  }
+
+  /** Reading from a cache which has never been written to must miss. */
+  @Test(expected = JournaledEditsCache.CacheMissException.class)
+  public void testReadUninitializedCache() throws Exception {
+    List<ByteBuffer> buffers = new ArrayList<>();
+    cache.retrieveEdits(1, 10, buffers);
+  }
+
+  /** Requesting a negative transaction ID must be reported as a miss. */
+  @Test(expected = JournaledEditsCache.CacheMissException.class)
+  public void testCacheMalformedInput() throws Exception {
+    storeEdits(1, 1);
+    List<ByteBuffer> buffers = new ArrayList<>();
+    cache.retrieveEdits(-1, 10, buffers);
+  }
+
+  /**
+   * Store the transactions {@code [startTxn, endTxn]}, generated by
+   * {@code createTxnData}, into the cache under the current layout version.
+   */
+  private void storeEdits(int startTxn, int endTxn) throws Exception {
+    int numTxns = endTxn - startTxn + 1;
+    byte[] serializedEdits = createTxnData(startTxn, numTxns);
+    cache.storeEdits(serializedEdits, startTxn, endTxn,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+  }
+
+  /**
+   * Read up to {@code requestedMaxTxns} transactions starting at
+   * {@code startTxn} and check both the returned count and the returned
+   * bytes. The expected bytes are the header for the current layout version
+   * followed by freshly generated data for the expected transaction range.
+   * @param startTxn The first transaction to request.
+   * @param requestedMaxTxns The maximum number of transactions to request.
+   * @param expectedEndTxn The last transaction expected in the response.
+   */
+  private void assertTxnCountAndContents(int startTxn, int requestedMaxTxns,
+      int expectedEndTxn) throws Exception {
+    int expectedTxnCount = expectedEndTxn - startTxn + 1;
+    List<ByteBuffer> buffers = new ArrayList<>();
+    int actualTxnCount =
+        cache.retrieveEdits(startTxn, requestedMaxTxns, buffers);
+    assertEquals(expectedTxnCount, actualTxnCount);
+
+    byte[] expectedBytes = Bytes.concat(
+        getHeaderForLayoutVersion(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION),
+        createTxnData(startTxn, expectedTxnCount));
+    // Concatenate the readable window of every returned buffer, in order
+    ByteArrayOutputStream joined = new ByteArrayOutputStream();
+    for (ByteBuffer buf : buffers) {
+      joined.write(buf.array(), buf.position(), buf.remaining());
+    }
+    assertArrayEquals(expectedBytes, joined.toByteArray());
+  }
+
+  /**
+   * Serialize the edit log header which
+   * {@link EditLogFileOutputStream#writeHeader} produces for a layout version.
+   * @param version The layout version to write a header for.
+   * @return The bytes of the header.
+   */
+  private static byte[] getHeaderForLayoutVersion(int version)
+      throws IOException {
+    ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
+    DataOutputStream headerOut = new DataOutputStream(headerBytes);
+    EditLogFileOutputStream.writeHeader(version, headerOut);
+    return headerBytes.toByteArray();
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org