You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/06/17 15:41:31 UTC
[ozone] branch master updated: HDDS-6890. EC: Fix potential wrong replica read with over-replicated container. (#3523)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 5d6ce84390 HDDS-6890. EC: Fix potential wrong replica read with over-replicated container. (#3523)
5d6ce84390 is described below
commit 5d6ce8439015db0cc1f8c58d8e9fcd220b9e682d
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Fri Jun 17 23:41:25 2022 +0800
HDDS-6890. EC: Fix potential wrong replica read with over-replicated container. (#3523)
Co-authored-by: Uma Maheswara Rao G <um...@cloudera.com>
---
.../hadoop/ozone/client/io/ECBlockInputStream.java | 34 +++++++++++++++++++-
.../ozone/client/io/TestECBlockInputStream.java | 37 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 70bd3847c2..89be7839ac 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.function.Function;
/**
@@ -182,12 +183,43 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
HddsProtos.ReplicationFactor.ONE),
blkInfo, pipeline,
blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
- refreshFunction);
+ ecPipelineRefreshFunction(locationIndex + 1, refreshFunction));
blockStreams[locationIndex] = stream;
}
return stream;
}
+ /**
+ * Returns a function that builds a Standalone pipeline corresponding
+ * to the replicaIndex given based on the EC pipeline fetched from SCM.
+ * @param replicaIndex replica index of the datanode the pipeline must hold
+ * @param refreshFunc function returning the EC pipeline for a block ID
+ * @return function giving a one-node pipeline; null if no pipeline/node found
+ */
+ protected Function<BlockID, Pipeline> ecPipelineRefreshFunction(
+ int replicaIndex, Function<BlockID, Pipeline> refreshFunc) {
+ return (blockID) -> {
+ Pipeline ecPipeline = refreshFunc.apply(blockID);
+ if (ecPipeline == null) {
+ return null;
+ }
+ DatanodeDetails curIndexNode = ecPipeline.getNodes()
+ .stream().filter(dn ->
+ ecPipeline.getReplicaIndex(dn) == replicaIndex)
+ .findAny().orElse(null);
+ if (curIndexNode == null) {
+ return null;
+ }
+ return Pipeline.newBuilder().setReplicationConfig(
+ StandaloneReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.ONE))
+ .setNodes(Collections.singletonList(curIndexNode))
+ .setId(PipelineID.randomId())
+ .setState(Pipeline.PipelineState.CLOSED)
+ .build();
+ };
+ }
+
/**
* Returns the length of the Nth block in the block group, taking account of a
* potentially partial last stripe. Note that the internal block index is
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index 536f618f7d..8cf8451775 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -22,8 +22,10 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
@@ -394,6 +396,41 @@ public class TestECBlockInputStream {
}
}
+ @Test
+ public void testEcPipelineRefreshFunction() {
+ repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ ONEMB);
+ BlockLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
+
+ BlockID blockID = new BlockID(1, 1);
+ Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+ for (int i = 1; i <= 5; i++) {
+ dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), i);
+ }
+
+ // Stub refreshFunction: an EC pipeline over dnMap's nodes (indexes 1..5).
+ Function<BlockID, Pipeline> refreshFunction = blkID -> Pipeline.newBuilder()
+ .setReplicationConfig(repConfig)
+ .setNodes(new ArrayList<>(dnMap.keySet()))
+ .setReplicaIndexes(dnMap)
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setId(PipelineID.randomId())
+ .build();
+
+ try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+ keyInfo, true, null, null, streamFactory)) {
+ Pipeline pipeline =
+ ecb.ecPipelineRefreshFunction(3, refreshFunction).apply(blockID);
+ // Expect a STAND_ALONE pipeline holding exactly one datanode:
+ // the one that dnMap maps to replica index 3.
+ Assertions.assertEquals(HddsProtos.ReplicationType.STAND_ALONE,
+ pipeline.getReplicationConfig().getReplicationType());
+ Assertions.assertEquals(1, pipeline.getNodes().size());
+ Assertions.assertEquals(3, dnMap.get(pipeline.getNodes().get(0)));
+ }
+ }
+
private void validateBufferContents(ByteBuffer buf, int from, int to,
byte val) {
for (int i = from; i < to; i++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org