You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:11 UTC

[hadoop] 08/50: HDFS-13608. [SBN read] Edit Tail Fast Path Part 2: Add ability for JournalNode to serve edits via RPC. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b13110a6f018974d241e67cafc03ebca48e41d39
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Wed May 23 12:42:13 2018 -0700

    HDFS-13608. [SBN read] Edit Tail Fast Path Part 2: Add ability for JournalNode to serve edits via RPC. Contributed by Erik Krogen.
---
 .../hadoop-common/src/site/markdown/Metrics.md     |  5 ++
 .../hdfs/qjournal/protocol/QJournalProtocol.java   | 24 ++++++++-
 .../QJournalProtocolServerSideTranslatorPB.java    | 14 +++++
 .../protocolPB/QJournalProtocolTranslatorPB.java   | 20 ++++++++
 .../hadoop/hdfs/qjournal/server/Journal.java       | 59 ++++++++++++++++++++++
 .../hdfs/qjournal/server/JournalMetrics.java       | 20 +++++++-
 .../hdfs/qjournal/server/JournalNodeRpcServer.java |  8 +++
 .../src/main/proto/QJournalProtocol.proto          | 18 +++++++
 .../hadoop/hdfs/qjournal/server/TestJournal.java   | 47 +++++++++++++++++
 9 files changed, 213 insertions(+), 2 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 3888e65..2351d97 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -325,6 +325,11 @@ The server-side metrics for a journal from the JournalNode's perspective. Each m
 | `LastWrittenTxId` | The highest transaction id stored on this JournalNode |
 | `LastPromisedEpoch` | The last epoch number which this node has promised not to accept any lower epoch, or 0 if no promises have been made |
 | `LastJournalTimestamp` | The timestamp of last successfully written transaction |
+| `TxnsServedViaRpc` | Number of transactions served via the RPC mechanism |
+| `BytesServedViaRpc` | Number of bytes served via the RPC mechanism |
+| `RpcRequestCacheMissAmountNumMisses` | Number of RPC requests which could not be served due to lack of data in the cache |
+| `RpcRequestCacheMissAmountAvgTxns` | The average number of transactions by which a request missed the cache; for example, if transaction ID 10 is requested and the cache's oldest transaction is ID 15, a value of 5 will be added to this average |
+| `RpcEmptyResponses` | Number of RPC requests with zero edits returned |
 
 datanode
 --------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
index 8dad261..5db055a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -139,7 +140,28 @@ public interface QJournalProtocol {
                                                      long sinceTxId,
                                                      boolean inProgressOk)
       throws IOException;
-  
+
+  /**
+   * Fetch edit logs present in the Journal's in-memory cache of edits
+   * ({@link org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache}).
+   * To enable this cache, in-progress edit log tailing must be enabled via the
+   * {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} configuration key.
+   *
+   * @param jid The ID of the journal from which to fetch edits.
+   * @param nameServiceId The ID of the nameservice for which to fetch edits.
+   * @param sinceTxId Fetch edits starting at this transaction ID.
+   * @param maxTxns Request at most this many transactions to be returned.
+   * @throws IOException If there was an issue encountered while fetching edits
+   *     from the cache, including a cache miss (cache does not contain the
+   *     requested edits). The caller should then attempt to fetch the edits via
+   *     the streaming mechanism (starting with
+   *     {@link #getEditLogManifest(String, String, long, boolean)}).
+   * @return Response containing serialized edits to be loaded
+   * @see org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache
+   */
+  GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException;
+
   /**
    * Begin the recovery process for a given segment. See the HDFS-3077
    * design document for details.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
index 2ad19da..e0324fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatReq
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -235,6 +237,18 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
     }
   }
 
+  @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(
+      RpcController controller, GetJournaledEditsRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.getJournaledEdits(request.getJid().getIdentifier(),
+          request.hasNameServiceId() ? request.getNameServiceId() : null,
+          request.getSinceTxId(), request.getMaxTxns());
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
 
   @Override
   public PrepareRecoveryResponseProto prepareRecovery(RpcController controller,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
index 42d35f5..4126b72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeL
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -284,6 +286,24 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
   }
 
   @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+    try {
+      GetJournaledEditsRequestProto.Builder req =
+          GetJournaledEditsRequestProto.newBuilder()
+              .setJid(convertJournalId(jid))
+              .setSinceTxId(sinceTxId)
+              .setMaxTxns(maxTxns);
+      if (nameServiceId != null) {
+        req.setNameServiceId(nameServiceId);
+      }
+      return rpcProxy.getJournaledEdits(NULL_CONTROLLER, req.build());
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  @Override
   public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
       long segmentTxId) throws IOException {
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index cb72741..f95da45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import com.google.protobuf.ByteString;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
@@ -24,9 +25,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -36,10 +39,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -85,6 +90,7 @@ public class Journal implements Closeable {
   // Current writing state
   private EditLogOutputStream curSegment;
   private long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+  private int curSegmentLayoutVersion = 0;
   private long nextTxId = HdfsServerConstants.INVALID_TXID;
   private long highestWrittenTxId = 0;
   
@@ -133,6 +139,8 @@ public class Journal implements Closeable {
   
   private final FileJournalManager fjm;
 
+  private final JournaledEditsCache cache;
+
   private final JournalMetrics metrics;
 
   private long lastJournalTimestamp = 0;
@@ -156,6 +164,13 @@ public class Journal implements Closeable {
     refreshCachedData();
     
     this.fjm = storage.getJournalManager();
+
+    if (conf.getBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT)) {
+      this.cache = new JournaledEditsCache(conf);
+    } else {
+      this.cache = null;
+    }
     
     this.metrics = JournalMetrics.create(this);
     
@@ -361,6 +376,7 @@ public class Journal implements Closeable {
     curSegment.abort();
     curSegment = null;
     curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+    curSegmentLayoutVersion = 0;
   }
 
   /**
@@ -406,6 +422,9 @@ public class Journal implements Closeable {
       LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId +
           " ; journal id: " + journalId);
     }
+    if (cache != null) {
+      cache.storeEdits(records, firstTxnId, lastTxnId, curSegmentLayoutVersion);
+    }
 
     // If the edit has already been marked as committed, we know
     // it has been fsynced on a quorum of other nodes, and we are
@@ -593,6 +612,7 @@ public class Journal implements Closeable {
     
     curSegment = fjm.startLogSegment(txid, layoutVersion);
     curSegmentTxId = txid;
+    curSegmentLayoutVersion = layoutVersion;
     nextTxId = txid;
   }
   
@@ -612,6 +632,7 @@ public class Journal implements Closeable {
         curSegment.close();
         curSegment = null;
         curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+        curSegmentLayoutVersion = 0;
       }
       
       checkSync(nextTxId == endTxId + 1,
@@ -713,6 +734,44 @@ public class Journal implements Closeable {
   }
 
   /**
+   * @see QJournalProtocol#getJournaledEdits(String, String, long, int)
+   */
+  public GetJournaledEditsResponseProto getJournaledEdits(long sinceTxId,
+      int maxTxns) throws IOException {
+    if (cache == null) {
+      throw new IOException("The journal edits cache is not enabled, which " +
+          "is a requirement to fetch journaled edits via RPC. Please enable " +
+          "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
+    }
+    if (sinceTxId > getHighestWrittenTxId()) {
+      // Requested edits that don't exist yet; short-circuit the cache here
+      metrics.rpcEmptyResponses.incr();
+      return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+    }
+    try {
+      List<ByteBuffer> buffers = new ArrayList<>();
+      int txnCount = cache.retrieveEdits(sinceTxId, maxTxns, buffers);
+      int totalSize = 0;
+      for (ByteBuffer buf : buffers) {
+        totalSize += buf.remaining();
+      }
+      metrics.txnsServedViaRpc.incr(txnCount);
+      metrics.bytesServedViaRpc.incr(totalSize);
+      ByteString.Output output = ByteString.newOutput(totalSize);
+      for (ByteBuffer buf : buffers) {
+        output.write(buf.array(), buf.position(), buf.remaining());
+      }
+      return GetJournaledEditsResponseProto.newBuilder()
+          .setTxnCount(txnCount)
+          .setEditLog(output.toByteString())
+          .build();
+    } catch (JournaledEditsCache.CacheMissException cme) {
+      metrics.rpcRequestCacheMissAmount.add(cme.getCacheMissAmount());
+      throw cme;
+    }
+  }
+
+  /**
    * @return the current state of the given segment, or null if the
    * segment does not exist.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
index fcfd901..7d271f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+
 
 /**
  * The server-side metrics for a journal from the JournalNode's
@@ -42,7 +44,23 @@ class JournalMetrics {
   
   @Metric("Number of bytes written since startup")
   MutableCounterLong bytesWritten;
-  
+
+  @Metric("Number of txns served via RPC")
+  MutableCounterLong txnsServedViaRpc;
+
+  @Metric("Number of bytes served via RPC")
+  MutableCounterLong bytesServedViaRpc;
+
+  @Metric
+  MutableStat rpcRequestCacheMissAmount = new MutableStat(
+      "RpcRequestCacheMissAmount", "Number of RPC requests unable to be " +
+      "served due to lack of availability in cache, and how many " +
+      "transactions away the request was from being in the cache.",
+      "Misses", "Txns");
+
+  @Metric("Number of RPC requests with zero edits returned")
+  MutableCounterLong rpcEmptyResponses;
+
   @Metric("Number of batches written where this node was lagging")
   MutableCounterLong batchesWrittenWhileLagging;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index bfa9a22..880b8c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -235,6 +236,13 @@ public class JournalNodeRpcServer implements QJournalProtocol,
   }
 
   @Override
+  public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+      String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+    return jn.getOrCreateJournal(jid, nameServiceId)
+        .getJournaledEdits(sinceTxId, maxTxns);
+  }
+
+  @Override
   public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
       long segmentTxId) throws IOException {
     return jn.getOrCreateJournal(reqInfo.getJournalId(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
index 625966f..b4d2b31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
@@ -286,6 +286,21 @@ message GetEditLogManifestResponseProto {
 }
 
 /**
+ * getJournaledEdits()
+ */
+message GetJournaledEditsRequestProto {
+  required JournalIdProto jid = 1;
+  required uint64 sinceTxId = 2;
+  required uint32 maxTxns = 3;
+  optional string nameServiceId = 4;
+}
+
+message GetJournaledEditsResponseProto {
+  required uint32 txnCount = 1;
+  optional bytes editLog = 2;
+}
+
+/**
  * prepareRecovery()
  */
 message PrepareRecoveryRequestProto {
@@ -364,6 +379,9 @@ service QJournalProtocolService {
   rpc getEditLogManifest(GetEditLogManifestRequestProto)
       returns (GetEditLogManifestResponseProto);
 
+  rpc getJournaledEdits(GetJournaledEditsRequestProto)
+  returns (GetJournaledEditsResponseProto);
+
   rpc prepareRecovery(PrepareRecoveryRequestProto)
       returns (PrepareRecoveryResponseProto);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
index b8d2652..2f51275 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
@@ -17,19 +17,25 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.primitives.Bytes;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -38,6 +44,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
@@ -71,6 +78,8 @@ public class TestJournal {
   public void setup() throws Exception {
     FileUtil.fullyDelete(TEST_LOG_DIR);
     conf = new Configuration();
+    // Enable fetching edits via RPC
+    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
     journal = new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
       mockErrorReporter);
     journal.format(FAKE_NSINFO, false);
@@ -435,6 +444,44 @@ public class TestJournal {
   }
 
   @Test
+  public void testReadFromCache() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(makeRI(1), 1,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    journal.journal(makeRI(2), 1, 1, 5, QJMTestUtil.createTxnData(1, 5));
+    journal.journal(makeRI(3), 1, 6, 5, QJMTestUtil.createTxnData(6, 5));
+    journal.journal(makeRI(4), 1, 11, 5, QJMTestUtil.createTxnData(11, 5));
+    assertJournaledEditsTxnCountAndContents(1, 7, 7,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    assertJournaledEditsTxnCountAndContents(1, 30, 15,
+        NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+
+    journal.finalizeLogSegment(makeRI(5), 1, 15);
+    int newLayoutVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
+    journal.startLogSegment(makeRI(6), 16, newLayoutVersion);
+    journal.journal(makeRI(7), 16, 16, 5, QJMTestUtil.createTxnData(16, 5));
+
+    assertJournaledEditsTxnCountAndContents(16, 10, 20, newLayoutVersion);
+  }
+
+  private void assertJournaledEditsTxnCountAndContents(int startTxn,
+      int requestedMaxTxns, int expectedEndTxn, int layoutVersion)
+      throws Exception {
+    GetJournaledEditsResponseProto result =
+        journal.getJournaledEdits(startTxn, requestedMaxTxns);
+    int expectedTxnCount = expectedEndTxn - startTxn + 1;
+    ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
+    EditLogFileOutputStream.writeHeader(layoutVersion,
+        new DataOutputStream(headerBytes));
+    assertEquals(expectedTxnCount, result.getTxnCount());
+    assertArrayEquals(
+        Bytes.concat(
+            headerBytes.toByteArray(),
+            QJMTestUtil.createTxnData(startTxn, expectedTxnCount)),
+        result.getEditLog().toByteArray());
+  }
+
+  @Test
   public void testFormatNonEmptyStorageDirectoriesWhenforceOptionIsTrue()
       throws Exception {
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org