You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:11 UTC
[hadoop] 08/50: HDFS-13608. [SBN read] Edit Tail Fast Path Part 2:
Add ability for JournalNode to serve edits via RPC. Contributed by Erik
Krogen.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit b13110a6f018974d241e67cafc03ebca48e41d39
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Wed May 23 12:42:13 2018 -0700
HDFS-13608. [SBN read] Edit Tail Fast Path Part 2: Add ability for JournalNode to serve edits via RPC. Contributed by Erik Krogen.
---
.../hadoop-common/src/site/markdown/Metrics.md | 5 ++
.../hdfs/qjournal/protocol/QJournalProtocol.java | 24 ++++++++-
.../QJournalProtocolServerSideTranslatorPB.java | 14 +++++
.../protocolPB/QJournalProtocolTranslatorPB.java | 20 ++++++++
.../hadoop/hdfs/qjournal/server/Journal.java | 59 ++++++++++++++++++++++
.../hdfs/qjournal/server/JournalMetrics.java | 20 +++++++-
.../hdfs/qjournal/server/JournalNodeRpcServer.java | 8 +++
.../src/main/proto/QJournalProtocol.proto | 18 +++++++
.../hadoop/hdfs/qjournal/server/TestJournal.java | 47 +++++++++++++++++
9 files changed, 213 insertions(+), 2 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 3888e65..2351d97 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -325,6 +325,11 @@ The server-side metrics for a journal from the JournalNode's perspective. Each m
| `LastWrittenTxId` | The highest transaction id stored on this JournalNode |
| `LastPromisedEpoch` | The last epoch number which this node has promised not to accept any lower epoch, or 0 if no promises have been made |
| `LastJournalTimestamp` | The timestamp of last successfully written transaction |
+| `TxnsServedViaRpc` | Number of transactions served via the RPC mechanism |
+| `BytesServedViaRpc` | Number of bytes served via the RPC mechanism |
+| `RpcRequestCacheMissAmountNumMisses` | Number of RPC requests which could not be served due to lack of data in the cache |
+| `RpcRequestCacheMissAmountAvgTxns` | The average number of transactions by which a request missed the cache; for example if transaction ID 10 is requested and the cache's oldest transaction is ID 15, value 5 will be added to this average |
+| `RpcEmptyResponses` | Number of RPC requests with zero edits returned |
datanode
--------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
index 8dad261..5db055a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -139,7 +140,28 @@ public interface QJournalProtocol {
long sinceTxId,
boolean inProgressOk)
throws IOException;
-
+
+ /**
+ * Fetch edit logs present in the Journal's in-memory cache of edits
+ * ({@link org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache}).
+ * To enable this cache, in-progress edit log tailing must be enabled via the
+ * {@value DFSConfigKeys#DFS_HA_TAILEDITS_INPROGRESS_KEY} configuration key.
+ *
+ * @param jid The ID of the journal from which to fetch edits.
+ * @param nameServiceId The ID of the namespace for which to fetch edits.
+ * @param sinceTxId Fetch edits starting at this transaction ID
+ * @param maxTxns Request at most this many transactions to be returned
+ * @throws IOException If there was an issue encountered while fetching edits
+ * from the cache, including a cache miss (cache does not contain the
+ * requested edits). The caller should then attempt to fetch the edits via
+ * the streaming mechanism (starting with
+ * {@link #getEditLogManifest(String, String, long, boolean)}).
+ * @return Response containing serialized edits to be loaded
+ * @see org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache
+ */
+ GetJournaledEditsResponseProto getJournaledEdits(String jid,
+ String nameServiceId, long sinceTxId, int maxTxns) throws IOException;
+
/**
* Begin the recovery process for a given segment. See the HDFS-3077
* design document for details.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
index 2ad19da..e0324fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatReq
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -235,6 +237,18 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
}
}
+ @Override
+ public GetJournaledEditsResponseProto getJournaledEdits(
+ RpcController controller, GetJournaledEditsRequestProto request)
+ throws ServiceException {
+ try {
+ return impl.getJournaledEdits(request.getJid().getIdentifier(),
+ request.hasNameServiceId() ? request.getNameServiceId() : null,
+ request.getSinceTxId(), request.getMaxTxns());
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
@Override
public PrepareRecoveryResponseProto prepareRecovery(RpcController controller,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
index 42d35f5..4126b72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeL
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -284,6 +286,24 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
}
@Override
+ public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+ String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+ try {
+ GetJournaledEditsRequestProto.Builder req =
+ GetJournaledEditsRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setSinceTxId(sinceTxId)
+ .setMaxTxns(maxTxns);
+ if (nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
+ }
+ return rpcProxy.getJournaledEdits(NULL_CONTROLLER, req.build());
+ } catch (ServiceException se) {
+ throw ProtobufHelper.getRemoteException(se);
+ }
+ }
+
+ @Override
public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
long segmentTxId) throws IOException {
try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index cb72741..f95da45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.qjournal.server;
+import com.google.protobuf.ByteString;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
@@ -24,9 +25,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.net.URL;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -36,10 +39,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -85,6 +90,7 @@ public class Journal implements Closeable {
// Current writing state
private EditLogOutputStream curSegment;
private long curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+ private int curSegmentLayoutVersion = 0;
private long nextTxId = HdfsServerConstants.INVALID_TXID;
private long highestWrittenTxId = 0;
@@ -133,6 +139,8 @@ public class Journal implements Closeable {
private final FileJournalManager fjm;
+ private final JournaledEditsCache cache;
+
private final JournalMetrics metrics;
private long lastJournalTimestamp = 0;
@@ -156,6 +164,13 @@ public class Journal implements Closeable {
refreshCachedData();
this.fjm = storage.getJournalManager();
+
+ if (conf.getBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
+ DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT)) {
+ this.cache = new JournaledEditsCache(conf);
+ } else {
+ this.cache = null;
+ }
this.metrics = JournalMetrics.create(this);
@@ -361,6 +376,7 @@ public class Journal implements Closeable {
curSegment.abort();
curSegment = null;
curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+ curSegmentLayoutVersion = 0;
}
/**
@@ -406,6 +422,9 @@ public class Journal implements Closeable {
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId +
" ; journal id: " + journalId);
}
+ if (cache != null) {
+ cache.storeEdits(records, firstTxnId, lastTxnId, curSegmentLayoutVersion);
+ }
// If the edit has already been marked as committed, we know
// it has been fsynced on a quorum of other nodes, and we are
@@ -593,6 +612,7 @@ public class Journal implements Closeable {
curSegment = fjm.startLogSegment(txid, layoutVersion);
curSegmentTxId = txid;
+ curSegmentLayoutVersion = layoutVersion;
nextTxId = txid;
}
@@ -612,6 +632,7 @@ public class Journal implements Closeable {
curSegment.close();
curSegment = null;
curSegmentTxId = HdfsServerConstants.INVALID_TXID;
+ curSegmentLayoutVersion = 0;
}
checkSync(nextTxId == endTxId + 1,
@@ -713,6 +734,44 @@ public class Journal implements Closeable {
}
/**
+ * @see QJournalProtocol#getJournaledEdits(String, String, long, int)
+ */
+ public GetJournaledEditsResponseProto getJournaledEdits(long sinceTxId,
+ int maxTxns) throws IOException {
+ if (cache == null) {
+ throw new IOException("The journal edits cache is not enabled, which " +
+ "is a requirement to fetch journaled edits via RPC. Please enable " +
+ "it via " + DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY);
+ }
+ if (sinceTxId > getHighestWrittenTxId()) {
+ // Requested edits that don't exist yet; short-circuit the cache here
+ metrics.rpcEmptyResponses.incr();
+ return GetJournaledEditsResponseProto.newBuilder().setTxnCount(0).build();
+ }
+ try {
+ List<ByteBuffer> buffers = new ArrayList<>();
+ int txnCount = cache.retrieveEdits(sinceTxId, maxTxns, buffers);
+ int totalSize = 0;
+ for (ByteBuffer buf : buffers) {
+ totalSize += buf.remaining();
+ }
+ metrics.txnsServedViaRpc.incr(txnCount);
+ metrics.bytesServedViaRpc.incr(totalSize);
+ ByteString.Output output = ByteString.newOutput(totalSize);
+ for (ByteBuffer buf : buffers) {
+ output.write(buf.array(), buf.position(), buf.remaining());
+ }
+ return GetJournaledEditsResponseProto.newBuilder()
+ .setTxnCount(txnCount)
+ .setEditLog(output.toByteString())
+ .build();
+ } catch (JournaledEditsCache.CacheMissException cme) {
+ metrics.rpcRequestCacheMissAmount.add(cme.getCacheMissAmount());
+ throw cme;
+ }
+ }
+
+ /**
* @return the current state of the given segment, or null if the
* segment does not exist.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
index fcfd901..7d271f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableStat;
+
/**
* The server-side metrics for a journal from the JournalNode's
@@ -42,7 +44,23 @@ class JournalMetrics {
@Metric("Number of bytes written since startup")
MutableCounterLong bytesWritten;
-
+
+ @Metric("Number of txns served via RPC")
+ MutableCounterLong txnsServedViaRpc;
+
+ @Metric("Number of bytes served via RPC")
+ MutableCounterLong bytesServedViaRpc;
+
+ @Metric
+ MutableStat rpcRequestCacheMissAmount = new MutableStat(
+ "RpcRequestCacheMissAmount", "Number of RPC requests unable to be " +
+ "served due to lack of availability in cache, and how many " +
+ "transactions away the request was from being in the cache.",
+ "Misses", "Txns");
+
+ @Metric("Number of RPC requests with zero edits returned")
+ MutableCounterLong rpcEmptyResponses;
+
@Metric("Number of batches written where this node was lagging")
MutableCounterLong batchesWrittenWhileLagging;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index bfa9a22..880b8c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.InterQJournalProtocolService;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
@@ -235,6 +236,13 @@ public class JournalNodeRpcServer implements QJournalProtocol,
}
@Override
+ public GetJournaledEditsResponseProto getJournaledEdits(String jid,
+ String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
+ return jn.getOrCreateJournal(jid, nameServiceId)
+ .getJournaledEdits(sinceTxId, maxTxns);
+ }
+
+ @Override
public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
long segmentTxId) throws IOException {
return jn.getOrCreateJournal(reqInfo.getJournalId(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
index 625966f..b4d2b31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
@@ -286,6 +286,21 @@ message GetEditLogManifestResponseProto {
}
/**
+ * getJournaledEdits()
+ */
+message GetJournaledEditsRequestProto {
+ required JournalIdProto jid = 1;
+ required uint64 sinceTxId = 2;
+ required uint32 maxTxns = 3;
+ optional string nameServiceId = 4;
+}
+
+message GetJournaledEditsResponseProto {
+ required uint32 txnCount = 1;
+ optional bytes editLog = 2;
+}
+
+/**
* prepareRecovery()
*/
message PrepareRecoveryRequestProto {
@@ -364,6 +379,9 @@ service QJournalProtocolService {
rpc getEditLogManifest(GetEditLogManifestRequestProto)
returns (GetEditLogManifestResponseProto);
+ rpc getJournaledEdits(GetJournaledEditsRequestProto)
+ returns (GetJournaledEditsResponseProto);
+
rpc prepareRecovery(PrepareRecoveryRequestProto)
returns (PrepareRecoveryResponseProto);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
index b8d2652..2f51275 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
@@ -17,19 +17,25 @@
*/
package org.apache.hadoop.hdfs.qjournal.server;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import com.google.common.primitives.Bytes;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -38,6 +44,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
@@ -71,6 +78,8 @@ public class TestJournal {
public void setup() throws Exception {
FileUtil.fullyDelete(TEST_LOG_DIR);
conf = new Configuration();
+ // Enable fetching edits via RPC
+ conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
journal = new Journal(conf, TEST_LOG_DIR, JID, StartupOption.REGULAR,
mockErrorReporter);
journal.format(FAKE_NSINFO, false);
@@ -435,6 +444,44 @@ public class TestJournal {
}
@Test
+ public void testReadFromCache() throws Exception {
+ journal.newEpoch(FAKE_NSINFO, 1);
+ journal.startLogSegment(makeRI(1), 1,
+ NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+ journal.journal(makeRI(2), 1, 1, 5, QJMTestUtil.createTxnData(1, 5));
+ journal.journal(makeRI(3), 1, 6, 5, QJMTestUtil.createTxnData(6, 5));
+ journal.journal(makeRI(4), 1, 11, 5, QJMTestUtil.createTxnData(11, 5));
+ assertJournaledEditsTxnCountAndContents(1, 7, 7,
+ NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+ assertJournaledEditsTxnCountAndContents(1, 30, 15,
+ NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+
+ journal.finalizeLogSegment(makeRI(5), 1, 15);
+ int newLayoutVersion = NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION - 1;
+ journal.startLogSegment(makeRI(6), 16, newLayoutVersion);
+ journal.journal(makeRI(7), 16, 16, 5, QJMTestUtil.createTxnData(16, 5));
+
+ assertJournaledEditsTxnCountAndContents(16, 10, 20, newLayoutVersion);
+ }
+
+ private void assertJournaledEditsTxnCountAndContents(int startTxn,
+ int requestedMaxTxns, int expectedEndTxn, int layoutVersion)
+ throws Exception {
+ GetJournaledEditsResponseProto result =
+ journal.getJournaledEdits(startTxn, requestedMaxTxns);
+ int expectedTxnCount = expectedEndTxn - startTxn + 1;
+ ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
+ EditLogFileOutputStream.writeHeader(layoutVersion,
+ new DataOutputStream(headerBytes));
+ assertEquals(expectedTxnCount, result.getTxnCount());
+ assertArrayEquals(
+ Bytes.concat(
+ headerBytes.toByteArray(),
+ QJMTestUtil.createTxnData(startTxn, expectedTxnCount)),
+ result.getEditLog().toByteArray());
+ }
+
+ @Test
public void testFormatNonEmptyStorageDirectoriesWhenforceOptionIsTrue()
throws Exception {
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org