You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/07/31 03:41:50 UTC

[3/4] hadoop git commit: HDFS-7192. DN should ignore lazyPersist hint if the writer is not local. (Contributed by Arpit Agarwal)

HDFS-7192. DN should ignore lazyPersist hint if the writer is not local. (Contributed by Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/88d8736d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/88d8736d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/88d8736d

Branch: refs/heads/HADOOP-12111
Commit: 88d8736ddeff10a03acaa99a9a0ee99dcfabe590
Parents: 91b42e7
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu Jul 30 13:16:46 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jul 30 13:16:46 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +
 .../hadoop/hdfs/server/datanode/DNConf.java     |  13 ++
 .../hdfs/server/datanode/DataXceiver.java       |  57 ++++--
 .../TestDataXceiverLazyPersistHint.java         | 178 +++++++++++++++++++
 5 files changed, 242 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e7af2cb..7f04125 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -762,6 +762,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8816. Improve visualization for the Datanode tab in the NN UI. (wheat9)
 
+    HDFS-7192. DN should ignore lazyPersist hint if the writer is
+    not local. (Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0e569f0..1e5bf0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -96,6 +96,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final Class<RamDiskReplicaLruTracker>  DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
   public static final String  DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
   public static final int     DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
+  public static final String DFS_DATANODE_NON_LOCAL_LAZY_PERSIST =
+      "dfs.datanode.non.local.lazy.persist"; // honor lazyPersist hint from non-local peers?
+  public static final boolean DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT =
+      false; // by default only local peers may request lazyPersist
 
   // This setting is for testing/internal use only.
   public static final String  DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 42b1b46..abc9390 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
@@ -99,6 +101,9 @@ public class DNConf {
 
   final long maxLockedMemory;
 
+  // Honor the lazyPersist hint from non-local peers? (otherwise it is dropped)
+  private final boolean allowNonLocalLazyPersist;
+
   public DNConf(Configuration conf) {
     this.conf = conf;
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -192,6 +197,10 @@ public class DNConf {
     this.restartReplicaExpiry = conf.getLong(
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
         DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
+
+    this.allowNonLocalLazyPersist = conf.getBoolean(
+        DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
+        DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT);
   }
 
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -265,4 +274,8 @@ public class DNConf {
   public boolean getIgnoreSecurePortsForTesting() {
     return ignoreSecurePortsForTesting;
   }
+
+  public boolean getAllowNonLocalLazyPersist() {
+    return allowNonLocalLazyPersist; // from dfs.datanode.non.local.lazy.persist
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 26d669c..089b7cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -46,11 +46,11 @@ import java.nio.channels.ClosedChannelException;
 import java.security.MessageDigest;
 import java.util.Arrays;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -620,7 +620,7 @@ class DataXceiver extends Receiver implements Runnable {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist,
+      boolean allowLazyPersist,
       final boolean pinning,
       final boolean[] targetPinnings) throws IOException {
     previousOpClientName = clientname;
@@ -629,6 +629,8 @@ class DataXceiver extends Receiver implements Runnable {
     final boolean isClient = !isDatanode;
     final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
         || stage == BlockConstructionStage.TRANSFER_FINALIZED;
+    allowLazyPersist = allowLazyPersist &&
+        (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
     long size = 0;
     // check single target for transfer-RBW/Finalized 
     if (isTransfer && targets.length > 0) {
@@ -661,10 +663,7 @@ class DataXceiver extends Receiver implements Runnable {
         + localAddress);
 
     // reply to upstream datanode or client 
-    final DataOutputStream replyOut = new DataOutputStream(
-        new BufferedOutputStream(
-            getOutputStream(),
-            smallBufferSize));
+    final DataOutputStream replyOut = getBufferedOutputStream();
     checkAccess(replyOut, isClient, block, blockToken,
         Op.WRITE_BLOCK, BlockTokenIdentifier.AccessMode.WRITE);
 
@@ -679,7 +678,7 @@ class DataXceiver extends Receiver implements Runnable {
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         // open a block receiver
-        blockReceiver = new BlockReceiver(block, storageType, in,
+        blockReceiver = getBlockReceiver(block, storageType, in,
             peer.getRemoteAddressString(),
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
@@ -726,19 +725,18 @@ class DataXceiver extends Receiver implements Runnable {
               smallBufferSize));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
-          // Do not propagate allowLazyPersist to downstream DataNodes.
           if (targetPinnings != null && targetPinnings.length > 0) {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
               latestGenerationStamp, requestedChecksum, cachingStrategy,
-              false, targetPinnings[0], targetPinnings);
+                allowLazyPersist, targetPinnings[0], targetPinnings);
           } else {
             new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
               latestGenerationStamp, requestedChecksum, cachingStrategy,
-              false, false, targetPinnings);
+                allowLazyPersist, false, targetPinnings);
           }
 
           mirrorOut.flush();
@@ -853,8 +851,8 @@ class DataXceiver extends Receiver implements Runnable {
     }
 
     //update metrics
-    datanode.metrics.addWriteBlockOp(elapsed());
-    datanode.metrics.incrWritesFromClient(peer.isLocal(), size);
+    datanode.getMetrics().addWriteBlockOp(elapsed());
+    datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
   }
 
   @Override
@@ -1161,7 +1159,7 @@ class DataXceiver extends Receiver implements Runnable {
         DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
             checksumInfo.getChecksum());
         // open a block receiver and check if the block does not exist
-        blockReceiver = new BlockReceiver(block, storageType,
+        blockReceiver = getBlockReceiver(block, storageType,
             proxyReply, proxySock.getRemoteSocketAddress().toString(),
             proxySock.getLocalSocketAddress().toString(),
             null, 0, 0, 0, "", null, datanode, remoteChecksum,
@@ -1216,6 +1214,39 @@ class DataXceiver extends Receiver implements Runnable {
     datanode.metrics.addReplaceBlockOp(elapsed());
   }
 
+
+  /**
+   * Creates a BlockReceiver from the given arguments; separated for testing.
+   */
+  @VisibleForTesting
+  BlockReceiver getBlockReceiver(
+      final ExtendedBlock block, final StorageType storageType,
+      final DataInputStream in,
+      final String inAddr, final String myAddr,
+      final BlockConstructionStage stage,
+      final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
+      final String clientname, final DatanodeInfo srcDataNode,
+      final DataNode dn, DataChecksum requestedChecksum,
+      CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist,
+      final boolean pinning) throws IOException {
+    return new BlockReceiver(block, storageType, in,
+        inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
+        clientname, srcDataNode, dn, requestedChecksum,
+        cachingStrategy, allowLazyPersist, pinning);
+  }
+
+  /**
+   * Separated for testing: wraps getOutputStream() in a buffered stream.
+   * @return a new DataOutputStream buffered with smallBufferSize bytes
+   */
+  @VisibleForTesting
+  DataOutputStream getBufferedOutputStream() {
+    return new DataOutputStream(
+        new BufferedOutputStream(getOutputStream(), smallBufferSize));
+  }
+
+
   private long elapsed() {
     return monotonicNow() - opStartTime;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88d8736d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
new file mode 100644
index 0000000..d8a7188
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.net.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
+import org.apache.hadoop.util.DataChecksum;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.ArgumentCaptor;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Mock-based unit test to verify that the DataXceiver correctly handles the
+ * LazyPersist hint from clients.
+ */
+public class TestDataXceiverLazyPersistHint {
+  @Rule
+  public Timeout timeout = new Timeout(300000);
+
+  private enum PeerLocality {  // drives the mocked Peer#isLocal()
+    LOCAL,
+    REMOTE
+  }
+
+  private enum NonLocalLazyPersist {  // drives DFS_DATANODE_NON_LOCAL_LAZY_PERSIST
+    ALLOWED,
+    NOT_ALLOWED
+  }
+
+  /**
+   * Ensure that the correct hint is passed to the block receiver when
+   * the client is local.
+   */
+  @Test
+  public void testWithLocalClient() throws IOException {
+    ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
+    DataXceiver xceiver = makeStubDataXceiver(
+        PeerLocality.LOCAL, NonLocalLazyPersist.NOT_ALLOWED, captor);
+
+    for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
+      issueWriteBlockCall(xceiver, lazyPersistSetting);
+      assertThat(captor.getValue(), is(lazyPersistSetting));
+    }
+  }
+
+  /**
+   * Ensure that the hint is always false for a remote client when not allowed.
+   */
+  @Test
+  public void testWithRemoteClient() throws IOException {
+    ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
+    DataXceiver xceiver = makeStubDataXceiver(
+        PeerLocality.REMOTE, NonLocalLazyPersist.NOT_ALLOWED, captor);
+
+    for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
+      issueWriteBlockCall(xceiver, lazyPersistSetting);
+      assertThat(captor.getValue(), is(false));
+    }
+  }
+
+  /**
+   * Ensure that the correct hint is passed to the block receiver when
+   * the client is remote AND dfs.datanode.non.local.lazy.persist
+   * is set to true.
+   */
+  @Test
+  public void testOverrideWithRemoteClient() throws IOException {
+    ArgumentCaptor<Boolean> captor = ArgumentCaptor.forClass(Boolean.class);
+    DataXceiver xceiver = makeStubDataXceiver(
+        PeerLocality.REMOTE, NonLocalLazyPersist.ALLOWED, captor);
+
+    for (Boolean lazyPersistSetting : Arrays.asList(true, false)) {
+      issueWriteBlockCall(xceiver, lazyPersistSetting);
+      assertThat(captor.getValue(), is(lazyPersistSetting));
+    }
+  }
+
+  /**
+   * Issue a write block call with dummy parameters. The only parameter useful
+   * for this test is the value of lazyPersist.
+   */
+  private void issueWriteBlockCall(DataXceiver xceiver, boolean lazyPersist)
+      throws IOException {
+    xceiver.writeBlock(
+        new ExtendedBlock("Dummy-pool", 0L),
+        StorageType.RAM_DISK,
+        null,
+        "Dummy-Client",
+        new DatanodeInfo[0],
+        new StorageType[0],
+        mock(DatanodeInfo.class),
+        BlockConstructionStage.PIPELINE_SETUP_CREATE,
+        0, 0, 0, 0,
+        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0),
+        CachingStrategy.newDefaultStrategy(),
+        lazyPersist,
+        false, null);
+  }
+
+  // Helper functions to set up the mock objects.
+
+  private static DataXceiver makeStubDataXceiver(
+      PeerLocality locality,
+      NonLocalLazyPersist nonLocalLazyPersist,
+      final ArgumentCaptor<Boolean> captor) throws IOException {
+    DataXceiver xceiverSpy = spy(DataXceiver.create(
+            getMockPeer(locality),
+            getMockDn(nonLocalLazyPersist),
+            mock(DataXceiverServer.class)));
+
+    doReturn(mock(BlockReceiver.class)).when(xceiverSpy).getBlockReceiver(
+        any(ExtendedBlock.class), any(StorageType.class),
+        any(DataInputStream.class), anyString(), anyString(),
+        any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(),
+        anyString(), any(DatanodeInfo.class), any(DataNode.class),
+        any(DataChecksum.class), any(CachingStrategy.class),
+        captor.capture(), anyBoolean());  // captor sits on allowLazyPersist
+    doReturn(mock(DataOutputStream.class)).when(xceiverSpy)
+        .getBufferedOutputStream();  // replies go to a mock stream
+    return xceiverSpy;
+  }
+
+  private static Peer getMockPeer(PeerLocality locality) {
+    Peer peer = mock(Peer.class);
+    when(peer.isLocal()).thenReturn(locality == PeerLocality.LOCAL);
+    when(peer.getRemoteAddressString()).thenReturn("1.1.1.1:1000");
+    when(peer.getLocalAddressString()).thenReturn("2.2.2.2:2000");
+    return peer;
+  }
+
+  private static DataNode getMockDn(NonLocalLazyPersist nonLocalLazyPersist) {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        DFS_DATANODE_NON_LOCAL_LAZY_PERSIST,
+        nonLocalLazyPersist == NonLocalLazyPersist.ALLOWED);
+    DNConf dnConf = new DNConf(conf);
+    DataNodeMetrics mockMetrics = mock(DataNodeMetrics.class);
+    DataNode mockDn = mock(DataNode.class);
+    when(mockDn.getDnConf()).thenReturn(dnConf);
+    when(mockDn.getConf()).thenReturn(conf);
+    when(mockDn.getMetrics()).thenReturn(mockMetrics);
+    return mockDn;
+  }
+}