You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/07/31 03:41:50 UTC
[3/4] hadoop git commit: HDFS-7192. DN should ignore lazyPersist hint
if the writer is not local. (Contributed by Arpit Agarwal)
HDFS-7192. DN should ignore lazyPersist hint if the writer is not local. (Contributed by Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/88d8736d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/88d8736d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/88d8736d
Branch: refs/heads/HADOOP-12111
Commit: 88d8736ddeff10a03acaa99a9a0ee99dcfabe590
Parents: 91b42e7
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu Jul 30 13:16:46 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jul 30 13:16:46 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
.../hadoop/hdfs/server/datanode/DNConf.java | 13 ++
.../hdfs/server/datanode/DataXceiver.java | 57 ++++--
.../TestDataXceiverLazyPersistHint.java | 178 +++++++++++++++++++
5 files changed, 242 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e7af2cb..7f04125 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -762,6 +762,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8816. Improve visualization for the Datanode tab in the NN UI. (wheat9)
+ HDFS-7192. DN should ignore lazyPersist hint if the writer is
+ not local. (Arpit Agarwal)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0e569f0..1e5bf0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -96,6 +96,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+ public static final String DFS_DATANODE_NON_LOCAL_LAZY_PERSIST =
+ "dfs.datanode.non.local.lazy.persist";
+ public static final boolean DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT =
+ false;
// This setting is for testing/internal use only.
public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 42b1b46..abc9390 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@@ -99,6 +101,9 @@ public class DNConf {
final long maxLockedMemory;
+ // Allow LAZY_PERSIST writes from non-local clients?
+ private final boolean allowNonLocalLazyPersist;
+
public DNConf(Configuration conf) {
this.conf = conf;
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -192,6 +197,10 @@ public class DNConf {
this.restartReplicaExpiry = conf.getLong(
DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
+
+ this.allowNonLocalLazyPersist = conf.getBoolean(
+ DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
+ DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -265,4 +274,8 @@ public class DNConf {
public boolean getIgnoreSecurePortsForTesting() {
return ignoreSecurePortsForTesting;
}
+
+ public boolean getAllowNonLocalLazyPersist() {
+ return allowNonLocalLazyPersist;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 26d669c..089b7cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -46,11 +46,11 @@ import java.nio.channels.ClosedChannelException;
import java.security.MessageDigest;
import java.util.Arrays;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -620,7 +620,7 @@ class DataXceiver extends Receiver implements Runnable {
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
- final boolean allowLazyPersist,
+ boolean allowLazyPersist,
final boolean pinning,
final boolean[] targetPinnings) throws IOException {
previousOpClientName = clientname;
@@ -629,6 +629,8 @@ class DataXceiver extends Receiver implements Runnable {
final boolean isClient = !isDatanode;
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
+ allowLazyPersist = allowLazyPersist &&
+ (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
long size = 0;
// check single target for transfer-RBW/Finalized
if (isTransfer && targets.length > 0) {
@@ -661,10 +663,7 @@ class DataXceiver extends Receiver implements Runnable {
+ localAddress);
// reply to upstream datanode or client
- final DataOutputStream replyOut = new DataOutputStream(
- new BufferedOutputStream(
- getOutputStream(),
- smallBufferSize));
+ final DataOutputStream replyOut = getBufferedOutputStream();
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
@@ -679,7 +678,7 @@ class DataXceiver extends Receiver implements Runnable {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
- blockReceiver = new BlockReceiver(block, storageType, in,
+ blockReceiver = getBlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
@@ -726,19 +725,18 @@ class DataXceiver extends Receiver implements Runnable {
smallBufferSize));
mirrorIn = new DataInputStream(unbufMirrorIn);
- // Do not propagate allowLazyPersist to downstream DataNodes.
if (targetPinnings != null && targetPinnings.length > 0) {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
- false, targetPinnings[0], targetPinnings);
+ allowLazyPersist, targetPinnings[0], targetPinnings);
} else {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
- false, false, targetPinnings);
+ allowLazyPersist, false, targetPinnings);
}
mirrorOut.flush();
@@ -853,8 +851,8 @@ class DataXceiver extends Receiver implements Runnable {
}
//update metrics
- datanode.metrics.addWriteBlockOp(elapsed());
- datanode.metrics.incrWritesFromClient(peer.isLocal(), size);
+ datanode.getMetrics().addWriteBlockOp(elapsed());
+ datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
}
@Override
@@ -1161,7 +1159,7 @@ class DataXceiver extends Receiver implements Runnable {
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(block, storageType,
+ blockReceiver = getBlockReceiver(block, storageType,
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
@@ -1216,6 +1214,39 @@ class DataXceiver extends Receiver implements Runnable {
datanode.metrics.addReplaceBlockOp(elapsed());
}
+
+ /**
+ * Separated for testing.
+ */
+ @VisibleForTesting
+ BlockReceiver getBlockReceiver(
+ final ExtendedBlock block, final StorageType storageType,
+ final DataInputStream in,
+ final String inAddr, final String myAddr,
+ final BlockConstructionStage stage,
+ final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
+ final String clientname, final DatanodeInfo srcDataNode,
+ final DataNode dn, DataChecksum requestedChecksum,
+ CachingStrategy cachingStrategy,
+ final boolean allowLazyPersist,
+ final boolean pinning) throws IOException {
+ return new BlockReceiver(block, storageType, in,
+ inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
+ clientname, srcDataNode, dn, requestedChecksum,
+ cachingStrategy, allowLazyPersist, pinning);
+ }
+
+ /**
+ * Separated for testing.
+ * @return a DataOutputStream over a BufferedOutputStream (smallBufferSize) wrapping getOutputStream()
+ */
+ @VisibleForTesting
+ DataOutputStream getBufferedOutputStream() {
+ return new DataOutputStream(
+ new BufferedOutputStream(getOutputStream(), smallBufferSize));
+ }
+
+
private long elapsed() {
return monotonicNow() - opStartTime;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
new file mode 100644
index 0000000..d8a7188
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.net.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.util.DataChecksum;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.ArgumentCaptor;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Mock-based unit test to verify that the DataXceiver correctly handles the
+ * LazyPersist hint from clients.
+ */
+public class TestDataXceiverLazyPersistHint {
+ @Rule
+ public Timeout timeout = new Timeout(300000);
+
+ private enum PeerLocality {
+ LOCAL,
+ REMOTE
+ }
+
+ private enum NonLocalLazyPersist {
+ ALLOWED,
+ NOT_ALLOWED
+ }
+
+ /**
+ * Ensure that the correct hint is passed to the block receiver when
+ * the client is local.
+ */
+ @Test
+ public void testWithLocalClient() throws IOException {
+ ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
+ DataXceiver xceiver = makeStubDataXceiver(
+ PeerLocality.LOCAL, NonLocalLazyPersist.NOT_ALLOWED, captor);
+
+ for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
+ issueWriteBlockCall(xceiver, lazyPersistSetting);
+ assertThat(captor.getValue(), is(lazyPersistSetting));
+ }
+ }
+
+ /**
+ * Ensure that hint is always false when the client is remote and non-local lazy persist is not allowed.
+ */
+ @Test
+ public void testWithRemoteClient() throws IOException {
+ ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
+ DataXceiver xceiver = makeStubDataXceiver(
+ PeerLocality.REMOTE, NonLocalLazyPersist.NOT_ALLOWED, captor);
+
+ for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
+ issueWriteBlockCall(xceiver, lazyPersistSetting);
+ assertThat(captor.getValue(), is(false));
+ }
+ }
+
+ /**
+ * Ensure that the correct hint is passed to the block receiver when
+ * the client is remote AND dfs.datanode.non.local.lazy.persist
+ * is set to true.
+ */
+ @Test
+ public void testOverrideWithRemoteClient() throws IOException {
+ ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
+ DataXceiver xceiver = makeStubDataXceiver(
+ PeerLocality.REMOTE, NonLocalLazyPersist.ALLOWED, captor);
+
+ for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
+ issueWriteBlockCall(xceiver, lazyPersistSetting);
+ assertThat(captor.getValue(), is(lazyPersistSetting));
+ }
+ }
+
+ /**
+ * Issue a write block call with dummy parameters. The only parameter useful
+ * for this test is the value of lazyPersist.
+ */
+ private void issueWriteBlockCall(DataXceiver xceiver, boolean lazyPersist)
+ throws IOException {
+ xceiver.writeBlock(
+ new ExtendedBlock("Dummy-pool", 0L),
+ StorageType.RAM_DISK,
+ null,
+ "Dummy-Client",
+ new DatanodeInfo[0],
+ new StorageType[0],
+ mock(DatanodeInfo.class),
+ BlockConstructionStage.PIPELINE_SETUP_CREATE,
+ 0, 0, 0, 0,
+ DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0),
+ CachingStrategy.newDefaultStrategy(),
+ lazyPersist,
+ false, null);
+ }
+
+ // Helper functions to setup the mock objects.
+
+ private static DataXceiver makeStubDataXceiver(
+ PeerLocality locality,
+ NonLocalLazyPersist nonLocalLazyPersist,
+ final ArgumentCaptor<Boolean> captor) throws IOException {
+ DataXceiver xceiverSpy = spy(DataXceiver.create(
+ getMockPeer(locality),
+ getMockDn(nonLocalLazyPersist),
+ mock(DataXceiverServer.class)));
+
+ doReturn(mock(BlockReceiver.class)).when(xceiverSpy).getBlockReceiver(
+ any(ExtendedBlock.class), any(StorageType.class),
+ any(DataInputStream.class), anyString(), anyString(),
+ any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
+ anyString(), any(DatanodeInfo.class), any(DataNode.class),
+ any(DataChecksum.class), any(CachingStrategy.class),
+ captor.capture(), anyBoolean());
+ doReturn(mock(DataOutputStream.class)).when(xceiverSpy)
+ .getBufferedOutputStream();
+ return xceiverSpy;
+ }
+
+ private static Peer getMockPeer(PeerLocality locality) {
+ Peer peer = mock(Peer.class);
+ when(peer.isLocal()).thenReturn(locality == PeerLocality.LOCAL);
+ when(peer.getRemoteAddressString()).thenReturn("1.1.1.1:1000");
+ when(peer.getLocalAddressString()).thenReturn("2.2.2.2:2000");
+ return peer;
+ }
+
+ private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) {
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean(
+ DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
+ nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED);
+ DNConf dnConf = new DNConf(conf);
+ DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
+ DataNode mockDn = mock(DataNode.class);
+ when(mockDn.getDnConf()).thenReturn(dnConf);
+ when(mockDn.getConf()).thenReturn(conf);
+ when(mockDn.getMetrics()).thenReturn(mockMetrics);
+ return mockDn;
+ }
+}