You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by xy...@apache.org on 2020/06/19 20:46:43 UTC
[hadoop-ozone] branch master updated: HDDS-3794. Topology Aware
read does not work correctly in XceiverClientGrpc (#1078)
This is an automated email from the ASF dual-hosted git repository.
xyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8b2f9f4 HDDS-3794. Topology Aware read does not work correctly in XceiverClientGrpc (#1078)
8b2f9f4 is described below
commit 8b2f9f41eeec9ed29ebc45adfe11a713f29bdc28
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri Jun 19 21:46:31 2020 +0100
HDDS-3794. Topology Aware read does not work correctly in XceiverClientGrpc (#1078)
---
.../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 21 ++-
.../hadoop/ozone/scm/TestXceiverClientGrpc.java | 198 +++++++++++++++++++++
2 files changed, 211 insertions(+), 8 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 28d4491..4adfa85 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -298,7 +298,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
List<DatanodeDetails> datanodeList = null;
DatanodeBlockID blockID = null;
- if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
+ if (request.getCmdType() == ContainerProtos.Type.GetBlock) {
+ blockID = request.getGetBlock().getBlockID();
+ } else if (request.getCmdType() == ContainerProtos.Type.ReadChunk) {
blockID = request.getReadChunk().getBlockID();
} else if (request.getCmdType() == ContainerProtos.Type.GetSmallFile) {
blockID = request.getGetSmallFile().getBlock().getBlockID();
@@ -314,15 +316,17 @@ public class XceiverClientGrpc extends XceiverClientSpi {
// Pull the Cached DN to the top of the DN list
Collections.swap(datanodeList, 0, getBlockDNCacheIndex);
}
- } else if (topologyAwareRead) {
- datanodeList = pipeline.getNodesInOrder();
}
}
if (datanodeList == null) {
- datanodeList = pipeline.getNodes();
- // Shuffle datanode list so that clients do not read in the same order
- // every time.
- Collections.shuffle(datanodeList);
+ if (topologyAwareRead) {
+ datanodeList = pipeline.getNodesInOrder();
+ } else {
+ datanodeList = pipeline.getNodes();
+ // Shuffle datanode list so that clients do not read in the same order
+ // every time.
+ Collections.shuffle(datanodeList);
+ }
}
for (DatanodeDetails dn : datanodeList) {
@@ -421,7 +425,8 @@ public class XceiverClientGrpc extends XceiverClientSpi {
}
}
- private XceiverClientReply sendCommandAsync(
+ @VisibleForTesting
+ public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request, DatanodeDetails dn)
throws IOException, InterruptedException {
checkOpen(dn, request.getEncodedToken());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
new file mode 100644
index 0000000..32a0b1d
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientGrpc.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Tests for XceiverClientGrpc, to ensure topology aware reads select the
+ * closest node, and connections are re-used after a getBlock call.
+ */
+public class TestXceiverClientGrpc {
+
+ private Pipeline pipeline;
+ private List<DatanodeDetails> dns;
+ private List<DatanodeDetails> dnsInOrder;
+ private OzoneConfiguration conf = new OzoneConfiguration();
+
+ @Before
+ public void setup() {
+ dns = new ArrayList<>();
+ dns.add(MockDatanodeDetails.randomDatanodeDetails());
+ dns.add(MockDatanodeDetails.randomDatanodeDetails());
+ dns.add(MockDatanodeDetails.randomDatanodeDetails());
+
+ dnsInOrder = new ArrayList<>();
+ for (int i=2; i>=0; i--) {
+ dnsInOrder.add(dns.get(i));
+ }
+
+ pipeline = Pipeline.newBuilder()
+ .setId(PipelineID.randomId())
+ .setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(HddsProtos.ReplicationFactor.THREE)
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setNodes(dns)
+ .build();
+ pipeline.setNodesInOrder(dnsInOrder);
+ }
+
+ @Test
+ public void testCorrectDnsReturnedFromPipeline() throws IOException {
+ Assert.assertEquals(dnsInOrder.get(0), pipeline.getClosestNode());
+ Assert.assertEquals(dns.get(0), pipeline.getFirstNode());
+ Assert.assertNotEquals(dns.get(0), dnsInOrder.get(0));
+ }
+
+ @Test(timeout=5000)
+ public void testRandomFirstNodeIsCommandTarget() throws IOException {
+ final ArrayList<DatanodeDetails> allDNs = new ArrayList<>(dns);
+ // Using a new Xceiver Client, call it repeatedly until all DNs in the
+ // pipeline have been the target of the command, indicating it is shuffling
+ // the DNs on each call with a new client. This test will time out if
+ // that shuffling is not happening.
+ while(allDNs.size() > 0) {
+ XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+ @Override
+ public XceiverClientReply sendCommandAsync(
+ ContainerProtos.ContainerCommandRequestProto request,
+ DatanodeDetails dn) {
+ allDNs.remove(dn);
+ return buildValidResponse();
+ }
+ };
+ invokeXceiverClientGetBlock(client);
+ }
+ }
+
+ @Test
+ public void testFirstNodeIsCorrectWithTopologyForCommandTarget()
+ throws IOException {
+ final Set<DatanodeDetails> seenDNs = new HashSet<>();
+ conf.setBoolean(
+ OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+ // With a new Client, make 100 calls and ensure the first sortedDN is used
+ // each time. The logic should always use the sorted node, so we can check
+ // only a single DN is ever seen after 100 calls.
+ for (int i=0; i<100; i++) {
+ XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+ @Override
+ public XceiverClientReply sendCommandAsync(
+ ContainerProtos.ContainerCommandRequestProto request,
+ DatanodeDetails dn) {
+ seenDNs.add(dn);
+ return buildValidResponse();
+ }
+ };
+ invokeXceiverClientGetBlock(client);
+ }
+ Assert.assertEquals(1, seenDNs.size());
+ }
+
+ @Test
+ public void testConnectionReusedAfterGetBlock() throws IOException {
+ // With a new Client, make 100 calls. On each call, ensure that only one
+ // DN is seen, indicating the same DN connection is reused.
+ for (int i=0; i<100; i++) {
+ final Set<DatanodeDetails> seenDNs = new HashSet<>();
+ XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf) {
+ @Override
+ public XceiverClientReply sendCommandAsync(
+ ContainerProtos.ContainerCommandRequestProto request,
+ DatanodeDetails dn) {
+ seenDNs.add(dn);
+ return buildValidResponse();
+ }
+ };
+ invokeXceiverClientGetBlock(client);
+ invokeXceiverClientGetBlock(client);
+ invokeXceiverClientReadChunk(client);
+ invokeXceiverClientReadSmallFile(client);
+ Assert.assertEquals(1, seenDNs.size());
+ }
+ }
+
+ private void invokeXceiverClientGetBlock(XceiverClientSpi client)
+ throws IOException {
+ ContainerProtocolCalls.getBlock(client,
+ ContainerProtos.DatanodeBlockID.newBuilder()
+ .setContainerID(1)
+ .setLocalID(1)
+ .setBlockCommitSequenceId(1)
+ .build());
+ }
+
+ private void invokeXceiverClientReadChunk(XceiverClientSpi client)
+ throws IOException {
+ BlockID bid = new BlockID(1, 1);
+ bid.setBlockCommitSequenceId(1);
+ ContainerProtocolCalls.readChunk(client,
+ ContainerProtos.ChunkInfo.newBuilder()
+ .setChunkName("Anything")
+ .setChecksumData(ContainerProtos.ChecksumData.newBuilder()
+ .setBytesPerChecksum(512)
+ .setType(ContainerProtos.ChecksumType.CRC32)
+ .build())
+ .setLen(100)
+ .setOffset(100)
+ .build(),
+ bid,
+ null);
+ }
+
+ private void invokeXceiverClientReadSmallFile(XceiverClientSpi client)
+ throws IOException {
+ BlockID bid = new BlockID(1, 1);
+ bid.setBlockCommitSequenceId(1);
+ ContainerProtocolCalls.readSmallFile(client, bid);
+ }
+
+ private XceiverClientReply buildValidResponse() {
+ ContainerProtos.ContainerCommandResponseProto resp =
+ ContainerProtos.ContainerCommandResponseProto.newBuilder()
+ .setCmdType(ContainerProtos.Type.GetBlock)
+ .setResult(ContainerProtos.Result.SUCCESS).build();
+ final CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ replyFuture = new CompletableFuture<>();
+ replyFuture.complete(resp);
+ return new XceiverClientReply(replyFuture);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org