Posted to commits@ozone.apache.org by um...@apache.org on 2022/06/17 15:41:31 UTC

[ozone] branch master updated: HDDS-6890. EC: Fix potential wrong replica read with over-replicated container. (#3523)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d6ce84390 HDDS-6890. EC: Fix potential wrong replica read with over-replicated container. (#3523)
5d6ce84390 is described below

commit 5d6ce8439015db0cc1f8c58d8e9fcd220b9e682d
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Fri Jun 17 23:41:25 2022 +0800

    HDDS-6890. EC: Fix potential wrong replica read with over-replicated container. (#3523)
    
    Co-authored-by: Uma Maheswara Rao G <um...@cloudera.com>
---
 .../hadoop/ozone/client/io/ECBlockInputStream.java | 34 +++++++++++++++++++-
 .../ozone/client/io/TestECBlockInputStream.java    | 37 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 1 deletion(-)
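
For context on the change itself: when an EC container is over-replicated,
the pipeline returned by the refresh function lists more datanodes than the
reader expects, and a stream that simply takes a node from that pipeline can
end up reading a replica with the wrong index. The patch below addresses this
by wrapping the refresh function so each internal block stream is pinned to
the datanode whose replica index matches its own, packaged as a single-node
Standalone pipeline. A minimal, dependency-free Java sketch of that selection
step (the Map stands in for Pipeline.getReplicaIndex(dn); the class and names
here are illustrative, not Ozone API):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Optional;

    public final class ReplicaIndexSelection {

      /** Pick the node registered under the wanted replica index, if any. */
      static Optional<String> nodeForIndex(
          Map<String, Integer> replicaIndexByNode, int wantedIndex) {
        return replicaIndexByNode.entrySet().stream()
            .filter(e -> e.getValue() == wantedIndex)
            .map(Map.Entry::getKey)
            .findAny();
      }

      public static void main(String[] args) {
        // A refreshed pipeline for an over-replicated rs-3-2 container:
        // node order gives no hint of which replica index a node holds.
        Map<String, Integer> replicaIndexByNode = new LinkedHashMap<>();
        replicaIndexByNode.put("dn4", 2);
        replicaIndexByNode.put("dn1", 1);
        replicaIndexByNode.put("dn5", 3); // extra copy from over-replication
        replicaIndexByNode.put("dn2", 3);
        replicaIndexByNode.put("dn3", 4);
        replicaIndexByNode.put("dn6", 5);

        // A reader that blindly took the first node would hit dn4 and read
        // index 2's data while expecting index 1; filtering by replica
        // index, as the patched refresh function does, returns dn1.
        System.out.println(nodeForIndex(replicaIndexByNode, 1).orElse("none"));
      }
    }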

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 70bd3847c2..89be7839ac 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.function.Function;
 
 /**
@@ -182,12 +183,43 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
               HddsProtos.ReplicationFactor.ONE),
           blkInfo, pipeline,
           blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
-          refreshFunction);
+          ecPipelineRefreshFunction(locationIndex + 1, refreshFunction));
       blockStreams[locationIndex] = stream;
     }
     return stream;
   }
 
+  /**
+   * Returns a function that builds a single-node Standalone pipeline for
+   * the given replicaIndex, based on the EC pipeline fetched from SCM.
+   * @param replicaIndex the 1-based EC replica index to pin reads to
+   * @param refreshFunc the function that fetches the current EC pipeline
+   * @return a refresh function yielding the pinned pipeline, or null
+   */
+  protected Function<BlockID, Pipeline> ecPipelineRefreshFunction(
+      int replicaIndex, Function<BlockID, Pipeline> refreshFunc) {
+    return (blockID) -> {
+      Pipeline ecPipeline = refreshFunc.apply(blockID);
+      if (ecPipeline == null) {
+        return null;
+      }
+      DatanodeDetails curIndexNode = ecPipeline.getNodes()
+          .stream().filter(dn ->
+              ecPipeline.getReplicaIndex(dn) == replicaIndex)
+          .findAny().orElse(null);
+      if (curIndexNode == null) {
+        return null;
+      }
+      return Pipeline.newBuilder().setReplicationConfig(
+              StandaloneReplicationConfig.getInstance(
+                  HddsProtos.ReplicationFactor.ONE))
+          .setNodes(Collections.singletonList(curIndexNode))
+          .setId(PipelineID.randomId())
+          .setState(Pipeline.PipelineState.CLOSED)
+          .build();
+    };
+  }
+
   /**
    * Returns the length of the Nth block in the block group, taking account of a
    * potentially partial last stripe. Note that the internal block index is
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index 536f618f7d..8cf8451775 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -22,8 +22,10 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
@@ -394,6 +396,41 @@ public class TestECBlockInputStream {
     }
   }
 
+  @Test
+  public void testEcPipelineRefreshFunction() {
+    repConfig = new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+        ONEMB);
+    BlockLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, 5, 5 * ONEMB);
+
+    BlockID blockID = new BlockID(1, 1);
+    Map<DatanodeDetails, Integer> dnMap = new HashMap<>();
+    for (int i = 1; i <= 5; i++) {
+      dnMap.put(MockDatanodeDetails.randomDatanodeDetails(), i);
+    }
+
+    // Create a refreshFunction that returns a hard-coded EC pipeline.
+    Function<BlockID, Pipeline> refreshFunction = blkID -> Pipeline.newBuilder()
+        .setReplicationConfig(repConfig)
+        .setNodes(new ArrayList<>(dnMap.keySet()))
+        .setReplicaIndexes(dnMap)
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .build();
+
+    try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
+        keyInfo, true, null, null, streamFactory)) {
+      Pipeline pipeline =
+          ecb.ecPipelineRefreshFunction(3, refreshFunction).apply(blockID);
+      // Check that the pipeline is built with the single datanode
+      // holding the right replicaIndex.
+      Assertions.assertEquals(HddsProtos.ReplicationType.STAND_ALONE,
+          pipeline.getReplicationConfig().getReplicationType());
+      Assertions.assertEquals(1, pipeline.getNodes().size());
+      Assertions.assertEquals(3, dnMap.get(pipeline.getNodes().get(0)));
+    }
+  }
+
   private void validateBufferContents(ByteBuffer buf, int from, int to,
       byte val) {
     for (int i = from; i < to; i++) {
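
A small detail worth noting in the first hunk above: the client's
blockStreams array is 0-based, while EC replica indexes in the pipeline are
1-based (the test registers indexes 1 through 5), hence the caller passes
locationIndex + 1. A two-line illustration of that mapping (plain Java,
illustrative only):

    int locationIndex = 2;                // third entry in blockStreams
    int replicaIndex = locationIndex + 1; // replica index 3 in the pipeline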


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org