You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by xy...@apache.org on 2020/06/19 20:46:43 UTC

[hadoop-ozone] branch master updated: HDDS-3794. Topology Aware read does not work correctly in XceiverClientGrpc (#1078)

This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b2f9f4  HDDS-3794. Topology Aware read does not work correctly in XceiverClientGrpc (#1078)
8b2f9f4 is described below

commit 8b2f9f41eeec9ed29ebc45adfe11a713f29bdc28
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri Jun 19 21:46:31 2020 +0100

    HDDS-3794. Topology Aware read does not work correctly in XceiverClientGrpc (#1078)
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  21 ++-
 .../hadoop/ozone/scm/TestXceiverClientGrpc.java    | 198 +++++++++++++++++++++
 2 files changed, 211 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 28d4491..4adfa85 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -298,7 +298,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     List<DatanodeDetails> datanodeList = null;
 
     DatanodeBlockID blockID = null;
-    if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
+    if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
+      blockID = request.getGetBlock().getBlockID();
+    } else if  (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
       blockID = request.getReadChunk().getBlockID();
     } else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
       blockID = request.getGetSmallFile().getBlock().getBlockID();
@@ -314,15 +316,17 @@ public class XceiverClientGrpc extends XceiverClientSpi {
           // Pull the Cached DN to the top of the DN list
           Collections.swap(datanodeList, 0, getBlockDNCacheIndex);
         }
-      } else if (topologyAwareRead) {
-        datanodeList = pipeline.getNodesInOrder();
       }
     }
     if (datanodeList == null) {
-      datanodeList = pipeline.getNodes();
-      // Shuffle datanode list so that clients do not read in the same order
-      // every time.
-      Collections.shuffle(datanodeList);
+      if (topologyAwareRead) {
+        datanodeList = pipeline.getNodesInOrder();
+      } else {
+        datanodeList = pipeline.getNodes();
+        // Shuffle datanode list so that clients do not read in the same order
+        // every time.
+        Collections.shuffle(datanodeList);
+      }
     }
 
     for (DatanodeDetails dn : datanodeList) {
@@ -421,7 +425,8 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     }
   }
 
-  private XceiverClientReply sendCommandAsync(
+  @VisibleForTesting
+  public XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request, DatanodeDetails dn)
       throws IOException, InterruptedException {
     checkOpen(dn, request.getEncodedToken());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
new file mode 100644
index 0000000..32a0b1d
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Tests for XceiverClientGrpc, to ensure topology aware reads
+ * select the closest node, and connections are re-used after a getBlock call.
+ */
+public class TestXceiverClientGrpc {
+
+  private Pipeline pipeline;
+  private List<DatanodeDetails> dns;
+  private List<DatanodeDetails> dnsInOrder;
+  private OzoneConfiguration conf = new OzoneConfiguration();
+
+  @Before
+  public void setup() {
+    dns = new ArrayList<>();
+    dns.add(MockDatanodeDetails.randomDatanodeDetails());
+    dns.add(MockDatanodeDetails.randomDatanodeDetails());
+    dns.add(MockDatanodeDetails.randomDatanodeDetails());
+
+    dnsInOrder = new ArrayList<>();
+    for (int i=2; i>=0; i--) {
+      dnsInOrder.add(dns.get(i));
+    }
+
+    pipeline = Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE)
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setNodes(dns)
+        .build();
+    pipeline.setNodesInOrder(dnsInOrder);
+  }
+
+  @Test
+  public void testCorrectDnsReturnedFromPipeline() throws IOException {
+    Assert.assertEquals(dnsInOrder.get(0), pipeline.getClosestNode());
+    Assert.assertEquals(dns.get(0), pipeline.getFirstNode());
+    Assert.assertNotEquals(dns.get(0), dnsInOrder.get(0));
+  }
+
+  @Test(timeout=5000)
+  public void testRandomFirstNodeIsCommandTarget() throws IOException {
+    final ArrayList<DatanodeDetails> allDNs = new ArrayList<>(dns);
+    // Using a new Xceiver Client, call it repeatedly until all DNs in the
+    // pipeline have been the target of the command, indicating it is shuffling
+    // the DNs on each call with a new client. This test will time out if this
+    // is not happening.
+    while(allDNs.size() > 0) {
+      XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+        @Override
+        public XceiverClientReply sendCommandAsync(
+            ContainerProtos.ContainerCommandRequestProto request,
+            DatanodeDetails dn) {
+          allDNs.remove(dn);
+          return buildValidResponse();
+        }
+      };
+      invokeXceiverClientGetBlock(client);
+    }
+  }
+
+  @Test
+  public void testFirstNodeIsCorrectWithTopologyForCommandTarget()
+      throws IOException {
+    final Set<DatanodeDetails> seenDNs = new HashSet<>();
+    conf.setBoolean(
+        OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+    // With a new Client, make 100 calls and ensure the first sortedDN is used
+    // each time. The logic should always use the sorted node, so we can check
+    // only a single DN is ever seen after 100 calls.
+    for (int i=0; i<100; i++) {
+      XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+        @Override
+        public XceiverClientReply sendCommandAsync(
+            ContainerProtos.ContainerCommandRequestProto request,
+            DatanodeDetails dn) {
+          seenDNs.add(dn);
+          return buildValidResponse();
+        }
+      };
+      invokeXceiverClientGetBlock(client);
+    }
+    Assert.assertEquals(1, seenDNs.size());
+  }
+
+  @Test
+  public void testConnectionReusedAfterGetBlock() throws IOException {
+    // With a new Client, make 100 calls. On each call, ensure that only one
+    // DN is seen, indicating the same DN connection is reused.
+    for (int i=0; i<100; i++) {
+      final Set<DatanodeDetails> seenDNs = new HashSet<>();
+      XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+        @Override
+        public XceiverClientReply sendCommandAsync(
+            ContainerProtos.ContainerCommandRequestProto request,
+            DatanodeDetails dn) {
+          seenDNs.add(dn);
+          return buildValidResponse();
+        }
+      };
+      invokeXceiverClientGetBlock(client);
+      invokeXceiverClientGetBlock(client);
+      invokeXceiverClientReadChunk(client);
+      invokeXceiverClientReadSmallFile(client);
+      Assert.assertEquals(1, seenDNs.size());
+    }
+  }
+
+  private void invokeXceiverClientGetBlock(XceiverClientSpi client)
+      throws IOException {
+    ContainerProtocolCalls.getBlock(client,
+        ContainerProtos.DatanodeBlockID.newBuilder()
+            .setContainerID(1)
+            .setLocalID(1)
+            .setBlockCommitSequenceId(1)
+            .build());
+  }
+
+  private void invokeXceiverClientReadChunk(XceiverClientSpi client)
+      throws IOException {
+    BlockID bid = new BlockID(1, 1);
+    bid.setBlockCommitSequenceId(1);
+    ContainerProtocolCalls.readChunk(client,
+        ContainerProtos.ChunkInfo.newBuilder()
+            .setChunkName("Anything")
+            .setChecksumData(ContainerProtos.ChecksumData.newBuilder()
+                .setBytesPerChecksum(512)
+                .setType(ContainerProtos.ChecksumType.CRC32)
+                .build())
+            .setLen(100)
+            .setOffset(100)
+            .build(),
+        bid,
+        null);
+  }
+
+  private void invokeXceiverClientReadSmallFile(XceiverClientSpi client)
+      throws IOException {
+    BlockID bid = new BlockID(1, 1);
+    bid.setBlockCommitSequenceId(1);
+    ContainerProtocolCalls.readSmallFile(client, bid);
+  }
+
+  private XceiverClientReply buildValidResponse() {
+    ContainerProtos.ContainerCommandResponseProto resp =
+        ContainerProtos.ContainerCommandResponseProto.newBuilder()
+            .setCmdType(ContainerProtos.Type.GetBlock)
+            .setResult(ContainerProtos.Result.SUCCESS).build();
+    final CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+        replyFuture = new CompletableFuture<>();
+    replyFuture.complete(resp);
+    return new XceiverClientReply(replyFuture);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org