You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/08/23 18:48:38 UTC
[hadoop] branch branch-3.2 updated: HDFS-13977. Override
shouldForceSync in QuorumOutputStream to allow for proper auto-sync
behavior. Contributed by Erik Krogen.
This is an automated email from the ASF dual-hosted git repository.
xkrogen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 2408c24 HDFS-13977. Override shouldForceSync in QuorumOutputStream to allow for proper auto-sync behavior. Contributed by Erik Krogen.
2408c24 is described below
commit 2408c2491f4507f92e4051b72eb0800e5e11f069
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Fri Aug 16 11:11:46 2019 -0700
HDFS-13977. Override shouldForceSync in QuorumOutputStream to allow for proper auto-sync behavior. Contributed by Erik Krogen.
(cherry picked from d699022fce756d25956d33e022100111aa0dd22e)
---
.../hdfs/qjournal/client/QuorumJournalManager.java | 14 ++++-
.../hdfs/qjournal/client/QuorumOutputStream.java | 5 ++
.../hadoop/hdfs/server/namenode/FSEditLog.java | 3 +-
.../client/TestQuorumJournalManagerUnit.java | 61 +++++++++++++++++++++-
.../hdfs/server/namenode/FSImageTestUtil.java | 19 ++++++-
5 files changed, 97 insertions(+), 5 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index ce12b4f..674ca70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
@@ -104,7 +105,8 @@ public class QuorumJournalManager implements JournalManager {
private final AsyncLoggerSet loggers;
- private int outputBufferCapacity = 512 * 1024;
+ private static final int OUTPUT_BUFFER_CAPACITY_DEFAULT = 512 * 1024;
+ private int outputBufferCapacity;
private final URLConnectionFactory connectionFactory;
/** Limit logging about input stream selection to every 5 seconds max. */
@@ -189,6 +191,7 @@ public class QuorumJournalManager implements JournalManager {
DFSConfigKeys.DFS_QJOURNAL_HTTP_READ_TIMEOUT_DEFAULT);
this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(connectTimeoutMs, readTimeoutMs, conf);
+ setOutputBufferCapacity(OUTPUT_BUFFER_CAPACITY_DEFAULT);
}
protected List<AsyncLogger> createLoggers(
@@ -447,6 +450,15 @@ public class QuorumJournalManager implements JournalManager {
@Override
public void setOutputBufferCapacity(int size) {
+ int ipcMaxDataLength = conf.getInt(
+ CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+ CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+ if (size >= ipcMaxDataLength) {
+ throw new IllegalArgumentException("Attempted to use QJM output buffer "
+ + "capacity (" + size + ") greater than the IPC max data length ("
+ + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH + " = "
+ + ipcMaxDataLength + "). This will cause journals to reject edits.");
+ }
outputBufferCapacity = size;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
index 26f978a..a02c0f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
@@ -81,6 +81,11 @@ class QuorumOutputStream extends EditLogOutputStream {
}
@Override
+ public boolean shouldForceSync() {
+ return buf.shouldForceSync();
+ }
+
+ @Override
protected void flushAndSync(boolean durable) throws IOException {
int numReadyBytes = buf.countReadyBytes();
if (numReadyBytes > 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 0db1712..7b3f6a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -1802,7 +1802,8 @@ public class FSEditLog implements LogsPurgeable {
* @return The constructed journal manager
* @throws IllegalArgumentException if no class is configured for uri
*/
- private JournalManager createJournal(URI uri) {
+ @VisibleForTesting
+ JournalManager createJournal(URI uri) {
Class<? extends JournalManager> clazz
= getJournalClass(conf, uri.getScheme());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
index 837c7d9..28f07f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.qjournal.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
@@ -29,9 +31,12 @@ import java.io.IOException;
import java.net.URI;
import java.util.List;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournaledEditsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
@@ -39,8 +44,12 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.test.GenericTestUtils;
import org.slf4j.event.Level;
import org.junit.Before;
@@ -58,6 +67,8 @@ import com.google.protobuf.ByteString;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeOp;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.createTxnData;
import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
/**
* True unit tests for QuorumJournalManager
@@ -107,7 +118,7 @@ public class TestQuorumJournalManagerUnit {
}
private AsyncLogger mockLogger() {
- return Mockito.mock(AsyncLogger.class);
+ return mock(AsyncLogger.class);
}
static <V> Stubber futureReturns(V value) {
@@ -204,7 +215,53 @@ public class TestQuorumJournalManagerUnit {
anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
stm.flush();
}
-
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSetOutputBufferCapacityTooLarge() throws Exception {
+ qjm.setOutputBufferCapacity(
+ CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT + 1);
+ }
+
+ // Regression test for HDFS-13977
+ @Test
+ public void testFSEditLogAutoSyncToQuorumStream() throws Exception {
+ // Set the buffer capacity low to make it easy to fill it
+ qjm.setOutputBufferCapacity(512);
+
+ // Set up mocks
+ NNStorage mockStorage = mock(NNStorage.class);
+ createLogSegment(); // sets up to the mocks for startLogSegment
+ for (int logIdx = 0; logIdx < 3; logIdx++) {
+ futureReturns(null).when(spyLoggers.get(logIdx))
+ .sendEdits(anyLong(), anyLong(), anyInt(), any());
+ }
+ PermissionStatus permStat = PermissionStatus
+ .createImmutable("user", "group", FsPermission.getDefault());
+ INode fakeInode = FSImageTestUtil.createEmptyInodeFile(1, "foo",
+ permStat, 1, 1, (short) 1, 1);
+
+ // Create a fake FSEditLog using this QJM
+ String mockQjmEdits = "qjournal://mock/";
+ conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, mockQjmEdits);
+ conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, mockQjmEdits);
+ FSEditLog editLog = FSImageTestUtil.createEditLogWithJournalManager(
+ conf, mockStorage, URI.create(mockQjmEdits), qjm);
+
+ editLog.initJournalsForWrite();
+ editLog.startLogSegment(1, false,
+ NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+ // Write enough edit ops that the output buffer capacity should fill and
+ // an auto-sync should be triggered
+ for (int i = 0; i < 12; i++) {
+ editLog.logMkDir("/fake/path", fakeInode);
+ }
+
+ for (int i = 0; i < 3; i++) {
+ Mockito.verify(spyLoggers.get(i), times(1))
+ .sendEdits(eq(1L), eq(1L), anyInt(), any());
+ }
+ }
+
@Test
public void testWriteEditsOneSlow() throws Exception {
EditLogOutputStream stm = createLogSegment();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
index 883e43c..5d0bd54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java
@@ -29,6 +29,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -208,7 +209,23 @@ public abstract class FSImageTestUtil {
editLog.initJournalsForWrite();
return editLog;
}
-
+
+ public static INodeFile createEmptyInodeFile(long id, String name,
+ PermissionStatus permissions, long mtime, long atime, short replication,
+ long preferredBlockSize) {
+ return new INodeFile(id, name.getBytes(StandardCharsets.UTF_8),
+ permissions, mtime, atime, null, replication, preferredBlockSize);
+ }
+
+ public static FSEditLog createEditLogWithJournalManager(Configuration conf,
+ NNStorage storage, URI editsUri, final JournalManager manager) {
+ return new FSEditLog(conf, storage, ImmutableList.of(editsUri)) {
+ @Override
+ protected JournalManager createJournal(URI uri) {
+ return manager;
+ }
+ };
+ }
/**
* Create an aborted in-progress log in the given directory, containing
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org