Posted to commits@ozone.apache.org by ad...@apache.org on 2022/06/16 16:15:43 UTC

[ozone] branch master updated: HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef1df7b47 HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)
6ef1df7b47 is described below

commit 6ef1df7b4790a99986b4ee83c43286055701d10f
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Thu Jun 16 09:15:39 2022 -0700

    HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)
---
 .../hadoop/hdds/scm/XceiverClientManager.java      |  33 ++
 .../hdds/scm/storage/ECBlockOutputStream.java      |   5 +
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  19 +-
 .../common/statemachine/DatanodeStateMachine.java  |  10 +-
 .../ReconstructECContainersCommandHandler.java     |   5 +-
 .../reconstruction/ECContainerOperationClient.java | 148 ++++++++
 .../ECReconstructionCoordinator.java               | 410 +++++++++++++++++++++
 .../ECReconstructionCoordinatorTask.java           |  39 ++
 .../reconstruction/ECReconstructionSupervisor.java |  27 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |   1 +
 .../TestECReconstructionSupervisor.java            |   4 +-
 .../hdds/scm/storage/TestContainerCommandsEC.java  | 288 +++++++++++++--
 12 files changed, 938 insertions(+), 51 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index ee4edecb3c..0724d614cc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -66,6 +66,7 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
       LoggerFactory.getLogger(XceiverClientManager.class);
   //TODO : change this to SCM configuration class
   private final ConfigurationSource conf;
+  private final ScmClientConfig clientConfig;
   private final Cache<String, XceiverClientSpi> clientCache;
   private List<X509Certificate> caCerts;
 
@@ -88,6 +89,7 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
       List<X509Certificate> caCerts) throws IOException {
     Preconditions.checkNotNull(clientConf);
     Preconditions.checkNotNull(conf);
+    this.clientConfig = clientConf;
     long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
     this.conf = conf;
     this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
@@ -347,6 +349,37 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
       this.maxSize = maxSize;
     }
 
+    public void setStaleThreshold(long threshold) {
+      this.staleThreshold = threshold;
+    }
+
+  }
+
+  /**
+   * Builder of ScmClientConfig.
+   */
+  public static class XceiverClientManagerConfigBuilder {
+
+    private int maxCacheSize;
+    private long staleThresholdMs;
+
+    public XceiverClientManagerConfigBuilder setMaxCacheSize(int maxCacheSize) {
+      this.maxCacheSize = maxCacheSize;
+      return this;
+    }
+
+    public XceiverClientManagerConfigBuilder setStaleThresholdMs(
+        long staleThresholdMs) {
+      this.staleThresholdMs = staleThresholdMs;
+      return this;
+    }
+
+    public ScmClientConfig build() {
+      ScmClientConfig clientConfig = new ScmClientConfig();
+      clientConfig.setMaxSize(this.maxCacheSize);
+      clientConfig.setStaleThreshold(this.staleThresholdMs);
+      return clientConfig;
+    }
   }
 
 }
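
A minimal usage sketch of the new builder, assuming a ConfigurationSource
"conf" and an optional CA certificate list "caCerts" are already in scope
(the values mirror what ECContainerOperationClient below passes):

    XceiverClientManager.ScmClientConfig clientConfig =
        new XceiverClientManager.XceiverClientManagerConfigBuilder()
            .setMaxCacheSize(256)            // max cached clients
            .setStaleThresholdMs(10 * 1000)  // evict after 10s idle
            .build();
    XceiverClientManager manager =
        new XceiverClientManager(conf, clientConfig, caCerts);
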
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 3f3fc12f2a..174e507829 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -80,6 +80,11 @@ public class ECBlockOutputStream extends BlockOutputStream {
         writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
   }
 
+  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> write(
+      ByteBuffer buff) throws IOException {
+    return writeChunkToContainer(ChunkBuffer.wrap(buff));
+  }
+
   public CompletableFuture<ContainerProtos.
       ContainerCommandResponseProto> executePutBlock(boolean close,
       boolean force, long blockGroupLength) throws IOException {
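
The new write(ByteBuffer) overload returns the chunk-write response future
instead of blocking, so the reconstruction coordinator can check each
target's response explicitly. A sketch of that pattern, assuming an
ECBlockOutputStream "stream" and a filled ByteBuffer "buf" are in scope:

    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
        stream.write(buf);
    // Join on the response; a null or exceptional result means the chunk
    // write to that target datanode failed (see checkFailures below).
    ContainerProtos.ContainerCommandResponseProto response = future.get();
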
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index b5365820e3..16bef69712 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -425,13 +425,15 @@ public final class ContainerProtocolCalls  {
    * @param client  - client
    * @param containerID - ID of container
    * @param encodedToken - encodedToken if security is enabled
+   * @param replicaIndex - index position of the container replica
    * @throws IOException
    */
   @InterfaceStability.Evolving
   public static void createRecoveringContainer(XceiverClientSpi client,
-      long containerID, String encodedToken) throws IOException {
+      long containerID, String encodedToken, int replicaIndex)
+      throws IOException {
     createContainerInternal(client, containerID, encodedToken,
-        ContainerProtos.ContainerDataProto.State.RECOVERING);
+        ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex);
   }
 
   /**
@@ -443,7 +445,7 @@ public final class ContainerProtocolCalls  {
    */
   public static void createContainer(XceiverClientSpi client, long containerID,
       String encodedToken) throws IOException {
-    createContainerInternal(client, containerID, encodedToken, null);
+    createContainerInternal(client, containerID, encodedToken, null, 0);
   }
   /**
    * createContainer call that creates a container on the datanode.
@@ -451,18 +453,23 @@ public final class ContainerProtocolCalls  {
    * @param containerID - ID of container
    * @param encodedToken - encodedToken if security is enabled
    * @param state - state of the container
+   * @param replicaIndex - index position of the container replica
    * @throws IOException
    */
   private static void createContainerInternal(XceiverClientSpi client,
       long containerID, String encodedToken,
-      ContainerProtos.ContainerDataProto.State state) throws IOException {
+      ContainerProtos.ContainerDataProto.State state, int replicaIndex)
+      throws IOException {
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
         ContainerProtos.CreateContainerRequestProto.newBuilder();
-    createRequest.setContainerType(ContainerProtos.ContainerType
-        .KeyValueContainer);
+    createRequest
+        .setContainerType(ContainerProtos.ContainerType.KeyValueContainer);
     if (state != null) {
       createRequest.setState(state);
     }
+    if (replicaIndex > 0) {
+      createRequest.setReplicaIndex(replicaIndex);
+    }
 
     String id = client.getPipeline().getFirstNode().getUuidString();
     ContainerCommandRequestProto.Builder request =
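
Note that a replicaIndex of 0 is treated as "unset": the request field is
only populated when replicaIndex > 0, so existing createContainer callers
are unchanged. A sketch of the recovering-container call, assuming
"dnClient" and "containerID" are in scope (mirrors the integration test
below):

    // Create a RECOVERING container holding replica index 4 of the group.
    ContainerProtocolCalls.createRecoveringContainer(
        dnClient, containerID, null /* encodedToken */, 4 /* replicaIndex */);
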
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ba21e5795e..cc05511b58 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Reco
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionSupervisor;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -181,9 +182,12 @@ public class DatanodeStateMachine implements Closeable {
     replicationSupervisorMetrics =
         ReplicationSupervisorMetrics.create(supervisor);
 
+    ECReconstructionCoordinator ecReconstructionCoordinator =
+        new ECReconstructionCoordinator(conf, certClient);
     ecReconstructionSupervisor =
         new ECReconstructionSupervisor(container.getContainerSet(), context,
-            replicationConfig.getReplicationMaxStreams());
+            replicationConfig.getReplicationMaxStreams(),
+            ecReconstructionCoordinator);
 
 
     // When we add new handlers just adding a new handler here should do the
@@ -389,6 +393,10 @@ public class DatanodeStateMachine implements Closeable {
       Thread.currentThread().interrupt();
     }
 
+    if (ecReconstructionSupervisor != null) {
+      ecReconstructionSupervisor.close();
+    }
+
     if (connectionManager != null) {
       connectionManager.close();
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
index f4ec45f600..009d47d7af 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
@@ -53,8 +53,9 @@ public class ReconstructECContainersCommandHandler implements CommandHandler {
             ecContainersCommand.getMissingContainerIndexes(),
             ecContainersCommand.getSources(),
             ecContainersCommand.getTargetDatanodes());
-    this.supervisor.addTask(
-        new ECReconstructionCoordinatorTask(reconstructionCommandInfo));
+    this.supervisor.addTask(new ECReconstructionCoordinatorTask(
+        this.supervisor.getReconstructionCoordinator(),
+        reconstructionCommandInfo));
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
new file mode 100644
index 0000000000..adebe11776
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ec.reconstruction;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * This class wraps the container-level RPC calls needed
+ * during EC offline reconstruction:
+ *   - ListBlock
+ *   - CloseContainer
+ *   - CreateRecoveringContainer
+ */
+public class ECContainerOperationClient implements Closeable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECContainerOperationClient.class);
+  private final XceiverClientManager xceiverClientManager;
+
+  public ECContainerOperationClient(XceiverClientManager clientManager) {
+    this.xceiverClientManager = clientManager;
+  }
+
+  public ECContainerOperationClient(ConfigurationSource conf,
+      CertificateClient certificateClient) throws IOException {
+    this(createClientManager(conf, certificateClient));
+  }
+
+  @NotNull
+  private static XceiverClientManager createClientManager(
+      ConfigurationSource conf, CertificateClient certificateClient)
+      throws IOException {
+    return new XceiverClientManager(conf,
+        new XceiverClientManager.XceiverClientManagerConfigBuilder()
+            .setMaxCacheSize(256).setStaleThresholdMs(10 * 1000).build(),
+        certificateClient != null ?
+            HAUtils.buildCAX509List(certificateClient, conf) :
+            null);
+  }
+
+  public BlockData[] listBlock(long containerId, DatanodeDetails dn,
+      ECReplicationConfig repConfig, Token<? extends TokenIdentifier> token)
+      throws IOException {
+    XceiverClientSpi xceiverClient = this.xceiverClientManager
+        .acquireClient(singleNodePipeline(dn, repConfig));
+    try {
+      List<ContainerProtos.BlockData> blockDataList = ContainerProtocolCalls
+          .listBlock(xceiverClient, containerId, null, Integer.MAX_VALUE, token)
+          .getBlockDataList();
+      return blockDataList.stream().map(i -> {
+        try {
+          return BlockData.getFromProtoBuf(i);
+        } catch (IOException e) {
+          LOG.debug("Failed while converting to protobuf BlockData. Returning"
+                  + " null for listBlock from DN: " + dn,
+              e);
+          // TODO: revisit here.
+          return null;
+        }
+      }).collect(Collectors.toList())
+          .toArray(new BlockData[blockDataList.size()]);
+    } finally {
+      this.xceiverClientManager.releaseClient(xceiverClient, false);
+    }
+  }
+
+  public void closeContainer(long containerID, DatanodeDetails dn,
+      ECReplicationConfig repConfig, String encodedToken) throws IOException {
+    XceiverClientSpi xceiverClient = this.xceiverClientManager
+        .acquireClient(singleNodePipeline(dn, repConfig));
+    try {
+      ContainerProtocolCalls
+          .closeContainer(xceiverClient, containerID, encodedToken);
+    } finally {
+      this.xceiverClientManager.releaseClient(xceiverClient, false);
+    }
+  }
+
+  public void createRecoveringContainer(long containerID, DatanodeDetails dn,
+      ECReplicationConfig repConfig, String encodedToken, int replicaIndex)
+      throws IOException {
+    XceiverClientSpi xceiverClient = this.xceiverClientManager.acquireClient(
+        singleNodePipeline(dn, repConfig));
+    try {
+      ContainerProtocolCalls
+          .createRecoveringContainer(xceiverClient, containerID, encodedToken,
+              replicaIndex);
+    } finally {
+      this.xceiverClientManager.releaseClient(xceiverClient, false);
+    }
+  }
+
+  Pipeline singleNodePipeline(DatanodeDetails dn,
+      ECReplicationConfig repConfig) {
+    // To get the same client from the cache, we use the DN UUID as the
+    // pipeline ID for uniqueness. Note that a pipeline has no significance
+    // after it is closed, so any ID is fine here.
+    return Pipeline.newBuilder().setId(PipelineID.valueOf(dn.getUuid()))
+        .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
+        .setState(Pipeline.PipelineState.CLOSED).build();
+  }
+
+  public XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (xceiverClientManager != null) {
+      xceiverClientManager.close();
+    }
+  }
+}
\ No newline at end of file
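
A short sketch of this client in use, assuming "conf", "containerID", a
source DatanodeDetails "dn" and an ECReplicationConfig "repConfig" are in
scope (the integration test below does the same to capture block metadata
before deleting a replica). Because singleNodePipeline keys the pipeline ID
on the DN UUID, repeated calls against the same datanode reuse one cached
connection:

    try (ECContainerOperationClient client =
             new ECContainerOperationClient(conf, null /* certClient */)) {
      BlockData[] blocks = client.listBlock(containerID, dn, repConfig, null);
      // Inspect the block metadata, then close the replica when done.
      client.closeContainer(containerID, dn, repConfig, null);
    }
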
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
new file mode 100644
index 0000000000..780c520d72
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ec.reconstruction;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
+import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * The Coordinator implements the main flow of reconstructing
+ * missing container replicas.
+ * <p>
+ * For a container reconstruction task, the main flow is:
+ * - ListBlock from all healthy replicas
+ * - calculate the effective block group length for all blocks
+ * - create RECOVERING containers on the target DNs
+ * - for each block:
+ *   - build an ECBlockReconstructedStripeInputStream to read healthy chunks
+ *   - build an ECBlockOutputStream per target to write out decoded chunks
+ *   - for each stripe:
+ *     - use ECBlockReconstructedStripeInputStream.recoverChunks to decode
+ *     - use ECBlockOutputStream.write to send decoded chunks to target DNs
+ *   - PutBlock
+ * - close the RECOVERING containers on the target DNs
+ */
+public class ECReconstructionCoordinator implements Closeable {
+
+  static final Logger LOG =
+      LoggerFactory.getLogger(ECReconstructionCoordinator.class);
+
+  private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
+
+  private final ECContainerOperationClient containerOperationClient;
+
+  private final ConfigurationSource config;
+
+  private final ByteBufferPool byteBufferPool;
+
+  private ExecutorService ecReconstructExecutor;
+
+  private BlockInputStreamFactory blockInputStreamFactory;
+
+  public ECReconstructionCoordinator(ECContainerOperationClient containerClient,
+      ConfigurationSource conf, ByteBufferPool byteBufferPool,
+      ExecutorService reconstructExecutor,
+      BlockInputStreamFactory streamFactory) {
+    this.containerOperationClient = containerClient;
+    this.config = conf;
+    this.byteBufferPool = byteBufferPool;
+    this.blockInputStreamFactory = streamFactory;
+    this.ecReconstructExecutor = reconstructExecutor;
+  }
+
+  public ECReconstructionCoordinator(ConfigurationSource conf,
+      CertificateClient certificateClient) throws IOException {
+    this(new ECContainerOperationClient(conf, certificateClient), conf,
+        new ElasticByteBufferPool(), null, null);
+    this.ecReconstructExecutor =
+        new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
+            config.getObject(OzoneClientConfig.class)
+                .getEcReconstructStripeReadPoolLimit(), 60, TimeUnit.SECONDS,
+            new SynchronousQueue<>(), new ThreadFactoryBuilder()
+            .setNameFormat("ec-reconstruct-reader-TID-%d").build(),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.blockInputStreamFactory = BlockInputStreamFactoryImpl
+        .getInstance(byteBufferPool, () -> ecReconstructExecutor);
+  }
+
+  public void reconstructECContainerGroup(long containerID,
+      ECReplicationConfig repConfig,
+      SortedMap<Integer, DatanodeDetails> sourceNodeMap,
+      SortedMap<Integer, DatanodeDetails> targetNodeMap) throws IOException {
+
+    Pipeline pipeline = rebuildInputPipeline(repConfig, sourceNodeMap);
+
+    SortedMap<Long, BlockData[]> blockDataMap =
+        getBlockDataMap(containerID, repConfig, sourceNodeMap);
+
+    SortedMap<Long, BlockLocationInfo> blockLocationInfoMap =
+        calcBlockLocationInfoMap(containerID, blockDataMap, pipeline);
+
+    // 1. create target recovering containers.
+    for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
+        .entrySet()) {
+      this.containerOperationClient
+          .createRecoveringContainer(containerID, indexDnPair.getValue(),
+              repConfig, null, indexDnPair.getKey());
+    }
+
+    // 2. Reconstruct and transfer to targets
+    for (BlockLocationInfo blockLocationInfo : blockLocationInfoMap.values()) {
+      reconstructECBlockGroup(blockLocationInfo, repConfig, targetNodeMap);
+    }
+
+    // 3. Close containers
+    for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
+        .entrySet()) {
+      DatanodeDetails dn = indexDnPair.getValue();
+      this.containerOperationClient
+          .closeContainer(containerID, dn, repConfig, null);
+    }
+
+  }
+
+  void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
+      ECReplicationConfig repConfig,
+      SortedMap<Integer, DatanodeDetails> targetMap)
+      throws IOException {
+    long safeBlockGroupLength = blockLocationInfo.getLength();
+    List<Integer> missingContainerIndexes = new ArrayList<>(targetMap.keySet());
+
+    // calculate the real missing block indexes
+    int dataLocs = ECBlockInputStreamProxy
+        .expectedDataLocations(repConfig, safeBlockGroupLength);
+    List<Integer> toReconstructIndexes = new ArrayList<>();
+    for (Integer index : missingContainerIndexes) {
+      if (index <= dataLocs || index > repConfig.getData()) {
+        toReconstructIndexes.add(index);
+      }
+      // else padded indexes.
+    }
+
+    // Nothing needs to be reconstructed in this block group: the missing
+    // locations contain only padding blocks.
+    if (toReconstructIndexes.size() == 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping the reconstruction for the block: "
+            + blockLocationInfo.getBlockID() + ". In the missing locations: "
+            + missingContainerIndexes
+            + ", this block group has only padded blocks.");
+      }
+      return;
+    }
+
+    try (ECBlockReconstructedStripeInputStream sis
+        = new ECBlockReconstructedStripeInputStream(
+        repConfig, blockLocationInfo, true,
+        this.containerOperationClient.getXceiverClientManager(), null,
+        this.blockInputStreamFactory, byteBufferPool,
+        this.ecReconstructExecutor)) {
+
+      ECBlockOutputStream[] targetBlockStreams =
+          new ECBlockOutputStream[toReconstructIndexes.size()];
+      ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
+      OzoneClientConfig configuration = new OzoneClientConfig();
+      // TODO: avoid unnecessary bufferPool creation. This pool is not
+      //  actually used in EC flows, but ECBlockOutputStream still depends
+      //  on a buffer pool.
+      BufferPool bufferPool =
+          new BufferPool(configuration.getStreamBufferSize(),
+              (int) (configuration.getStreamBufferMaxSize() / configuration
+                  .getStreamBufferSize()),
+              ByteStringConversion.createByteBufferConversion(false));
+      for (int i = 0; i < toReconstructIndexes.size(); i++) {
+        DatanodeDetails datanodeDetails =
+            targetMap.get(toReconstructIndexes.get(i));
+        targetBlockStreams[i] =
+            new ECBlockOutputStream(blockLocationInfo.getBlockID(),
+                this.containerOperationClient.getXceiverClientManager(),
+                this.containerOperationClient
+                    .singleNodePipeline(datanodeDetails, repConfig), bufferPool,
+                configuration, blockLocationInfo.getToken());
+        bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
+        // Make sure the buffer is clean: one returned from the pool may
+        // still hold stale data.
+        bufs[i].clear();
+      }
+
+      sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
+          .collect(Collectors.toSet()));
+      long length = safeBlockGroupLength;
+      while (length > 0) {
+        int readLen = sis.recoverChunks(bufs);
+        // TODO: can be submitted in parallel
+        for (int i = 0; i < bufs.length; i++) {
+          CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+              future = targetBlockStreams[i].write(bufs[i]);
+          checkFailures(targetBlockStreams[i], future);
+          bufs[i].clear();
+        }
+        length -= readLen;
+      }
+
+      try {
+        for (ECBlockOutputStream targetStream : targetBlockStreams) {
+          targetStream
+              .executePutBlock(true, true, blockLocationInfo.getLength());
+          checkFailures(targetStream,
+              targetStream.getCurrentPutBlkResponseFuture());
+        }
+      } finally {
+        for (ByteBuffer buf : bufs) {
+          byteBufferPool.putBuffer(buf);
+        }
+        IOUtils.cleanupWithLogger(LOG, targetBlockStreams);
+      }
+    }
+  }
+
+  private void checkFailures(ECBlockOutputStream targetBlockStream,
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+          currentPutBlkResponseFuture)
+      throws IOException {
+    if (isFailed(targetBlockStream, currentPutBlkResponseFuture)) {
+      // If a chunk write response failed, we should ideally retry, and
+      // only declare the reconstruction failed once retries are exhausted.
+      // For now, simply throw.
+      throw new IOException(
+          "Chunk write failed at the new target node: " + targetBlockStream
+              .getDatanodeDetails() + ". Aborting the reconstruction process.");
+    }
+  }
+
+  private boolean isFailed(ECBlockOutputStream outputStream,
+      CompletableFuture<ContainerProtos.
+          ContainerCommandResponseProto> chunkWriteResponseFuture) {
+    if (chunkWriteResponseFuture == null) {
+      return true;
+    }
+
+    ContainerProtos.ContainerCommandResponseProto
+        containerCommandResponseProto = null;
+    try {
+      containerCommandResponseProto = chunkWriteResponseFuture.get();
+    } catch (InterruptedException e) {
+      outputStream.setIoException(e);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      outputStream.setIoException(e);
+    }
+
+    if (outputStream.getIoException() != null) {
+      return true;
+    }
+
+    if (containerCommandResponseProto == null) {
+      return true;
+    }
+
+    return false;
+  }
+
+  SortedMap<Long, BlockLocationInfo> calcBlockLocationInfoMap(long containerID,
+      SortedMap<Long, BlockData[]> blockDataMap, Pipeline pipeline) {
+
+    SortedMap<Long, BlockLocationInfo> blockInfoMap = new TreeMap<>();
+
+    for (Map.Entry<Long, BlockData[]> entry : blockDataMap.entrySet()) {
+      Long localID = entry.getKey();
+      BlockData[] blockGroup = entry.getValue();
+
+      long blockGroupLen = calcEffectiveBlockGroupLen(blockGroup,
+          pipeline.getReplicationConfig().getRequiredNodes());
+      if (blockGroupLen > 0) {
+        BlockLocationInfo blockLocationInfo = new BlockLocationInfo.Builder()
+            .setBlockID(new BlockID(containerID, localID))
+            .setLength(blockGroupLen).setPipeline(pipeline).build();
+        blockInfoMap.put(localID, blockLocationInfo);
+      }
+    }
+    return blockInfoMap;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (containerOperationClient != null) {
+      containerOperationClient.close();
+    }
+  }
+
+  private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
+      SortedMap<Integer, DatanodeDetails> sourceNodeMap) {
+
+    List<DatanodeDetails> nodes = new ArrayList<>(sourceNodeMap.values());
+    Map<DatanodeDetails, Integer> dnVsIndex = new HashMap<>();
+
+    Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+        sourceNodeMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+      Integer key = next.getKey();
+      DatanodeDetails value = next.getValue();
+      dnVsIndex.put(value, key);
+    }
+
+    return Pipeline.newBuilder().setId(PipelineID.randomId())
+        .setReplicationConfig(repConfig).setNodes(nodes)
+        .setReplicaIndexes(dnVsIndex).setState(Pipeline.PipelineState.CLOSED)
+        .build();
+  }
+
+  private SortedMap<Long, BlockData[]> getBlockDataMap(long containerID,
+      ECReplicationConfig repConfig,
+      Map<Integer, DatanodeDetails> sourceNodeMap) throws IOException {
+
+    SortedMap<Long, BlockData[]> resultMap = new TreeMap<>();
+
+    Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+        sourceNodeMap.entrySet().iterator();
+
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+      Integer index = next.getKey();
+      DatanodeDetails dn = next.getValue();
+
+      BlockData[] blockDataArr =
+          containerOperationClient.listBlock(containerID, dn, repConfig, null);
+
+      for (BlockData blockData : blockDataArr) {
+        BlockID blockID = blockData.getBlockID();
+        BlockData[] blkDataArr = resultMap.getOrDefault(blockData.getLocalID(),
+            new BlockData[repConfig.getRequiredNodes()]);
+        blkDataArr[index - 1] = blockData;
+        resultMap.put(blockID.getLocalID(), blkDataArr);
+      }
+    }
+    return resultMap;
+  }
+
+  /**
+   * Get the effective length of a block group.
+   * This cannot be absolutely accurate when the block contains a failed
+   * stripe, since the failed cells may be missing and the healthy cells
+   * alone cannot tell whether the last stripe failed. In that case we
+   * recover at most one extra stripe for the block, which does not
+   * confuse the client's view of the data.
+   *
+   * @param blockGroup block data of each replica, indexed by replica index
+   * @param replicaCount required number of nodes in the replication config
+   * @return the effective block group length, or 0 if unknown
+   */
+  private long calcEffectiveBlockGroupLen(BlockData[] blockGroup,
+      int replicaCount) {
+    Preconditions.checkState(blockGroup.length == replicaCount);
+
+    long blockGroupLen = Long.MAX_VALUE;
+
+    for (int i = 0; i < replicaCount; i++) {
+      if (blockGroup[i] == null) {
+        continue;
+      }
+
+      String putBlockLenStr = blockGroup[i].getMetadata()
+          .get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
+      long putBlockLen = (putBlockLenStr == null) ?
+          Long.MAX_VALUE :
+          Long.parseLong(putBlockLenStr);
+      // Use the min to be conservative
+      blockGroupLen = Math.min(putBlockLen, blockGroupLen);
+    }
+    return blockGroupLen == Long.MAX_VALUE ? 0 : blockGroupLen;
+  }
+}
\ No newline at end of file
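
One subtle point above is the padded-index filter in reconstructECBlockGroup:
a missing index is skipped when it lies beyond the data actually written to
the group but within the data count, because such replicas hold only padding.
A worked example, assuming expectedDataLocations returns the number of data
replicas that hold real bytes for the given group length:

    // RS(3, 2) with 1024-byte chunks and a block group of length 1024.
    int dataLocs = ECBlockInputStreamProxy
        .expectedDataLocations(repConfig, 1024); // == 1: only index 1 has data
    List<Integer> toReconstruct = new ArrayList<>();
    for (int index : Arrays.asList(2, 4)) {      // the missing indexes
      if (index <= dataLocs || index > repConfig.getData()) {
        toReconstruct.add(index);                // 4: parity, reconstructed
      }                                          // 2: padding only, skipped
    }
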
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
index 24168e5f69..af4a87e272 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
@@ -17,14 +17,30 @@
  */
 package org.apache.hadoop.ozone.container.ec.reconstruction;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 /**
  * This is the actual EC reconstruction coordination task.
  */
 public class ECReconstructionCoordinatorTask implements Runnable {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class);
+  private ECReconstructionCoordinator reconstructionCoordinator;
   private ECReconstructionCommandInfo reconstructionCommandInfo;
 
   public ECReconstructionCoordinatorTask(
+      ECReconstructionCoordinator coordinator,
       ECReconstructionCommandInfo reconstructionCommandInfo) {
+    this.reconstructionCoordinator = coordinator;
     this.reconstructionCommandInfo = reconstructionCommandInfo;
   }
 
@@ -42,6 +58,29 @@ public class ECReconstructionCoordinatorTask implements Runnable {
     // 4. Write the recovered chunks to given targets/write locally to
     // respective container. HDDS-6582
     // 5. Close/finalize the recovered containers.
+
+    SortedMap<Integer, DatanodeDetails> sourceNodeMap =
+        reconstructionCommandInfo.getSources().stream().collect(Collectors
+            .toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex,
+                DatanodeDetailsAndReplicaIndex::getDnDetails, (v1, v2) -> v1,
+                TreeMap::new));
+    SortedMap<Integer, DatanodeDetails> targetNodeMap = IntStream
+        .range(0, reconstructionCommandInfo.getTargetDatanodes().size()).boxed()
+        .collect(Collectors.toMap(i -> (int) reconstructionCommandInfo
+                .getMissingContainerIndexes()[i],
+            i -> reconstructionCommandInfo.getTargetDatanodes().get(i),
+            (v1, v2) -> v1, TreeMap::new));
+
+    try {
+      reconstructionCoordinator.reconstructECContainerGroup(
+          reconstructionCommandInfo.getContainerID(),
+          reconstructionCommandInfo.getEcReplicationConfig(), sourceNodeMap,
+          targetNodeMap);
+    } catch (IOException e) {
+      LOG.warn(
+          "Failed to complete the reconstruction task for the container: "
+              + reconstructionCommandInfo.getContainerID(), e);
+    }
   }
 
   @Override
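
The two stream pipelines above just zip replica indexes to target datanodes.
An equivalent loop form, using a hypothetical helper (zipTargets is not part
of this patch): for missing indexes [2, 4] and targets [dnA, dnB] it yields
{2=dnA, 4=dnB}.

    static SortedMap<Integer, DatanodeDetails> zipTargets(
        byte[] missingIndexes, List<DatanodeDetails> targets) {
      SortedMap<Integer, DatanodeDetails> map = new TreeMap<>();
      for (int i = 0; i < targets.size(); i++) {
        map.put((int) missingIndexes[i], targets.get(i));
      }
      return map;  // e.g. {2=dnA, 4=dnB}
    }
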
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
index e2c930a8c2..2af04f6500 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
@@ -21,6 +21,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -30,28 +32,33 @@ import java.util.concurrent.TimeUnit;
  * This class is to handle all the EC reconstruction tasks to be scheduled as
  * they arrive.
  */
-public class ECReconstructionSupervisor {
+public class ECReconstructionSupervisor implements Closeable {
 
   private final ContainerSet containerSet;
   private final StateContext context;
   private final ExecutorService executor;
+  private final ECReconstructionCoordinator reconstructionCoordinator;
 
   public ECReconstructionSupervisor(ContainerSet containerSet,
-      StateContext context, ExecutorService executor) {
+      StateContext context, ExecutorService executor,
+      ECReconstructionCoordinator coordinator) {
     this.containerSet = containerSet;
     this.context = context;
     this.executor = executor;
+    this.reconstructionCoordinator = coordinator;
   }
 
   public ECReconstructionSupervisor(ContainerSet containerSet,
-      StateContext context, int poolSize) {
+      StateContext context, int poolSize,
+      ECReconstructionCoordinator coordinator) {
     // TODO: ReplicationSupervisor and this class can be refactored to have a
     //  common interface.
     this(containerSet, context,
         new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS,
             new LinkedBlockingQueue<>(),
             new ThreadFactoryBuilder().setDaemon(true)
-                .setNameFormat("ECContainerReconstructionThread-%d").build()));
+                .setNameFormat("ECContainerReconstructionThread-%d").build()),
+        coordinator);
   }
 
   public void stop() {
@@ -69,4 +76,16 @@ public class ECReconstructionSupervisor {
   public void addTask(ECReconstructionCoordinatorTask task) {
     executor.execute(task);
   }
+
+  @Override
+  public void close() throws IOException {
+    if (reconstructionCoordinator != null) {
+      reconstructionCoordinator.close();
+    }
+    stop();
+  }
+
+  public ECReconstructionCoordinator getReconstructionCoordinator() {
+    return reconstructionCoordinator;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 7fcbdb3e7f..9590711650 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -283,6 +283,7 @@ public class KeyValueHandler extends Handler {
     }
     newContainerData.setReplicaIndex(request.getCreateContainer()
         .getReplicaIndex());
+
     // TODO: Add support to add metadataList to ContainerData. Add metadata
     // to container during creation.
     KeyValueContainer newContainer = new KeyValueContainer(
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
index e86be82a09..2403364f4c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeoutException;
 public class TestECReconstructionSupervisor {
 
   private final ECReconstructionSupervisor supervisor =
-      new ECReconstructionSupervisor(null, null, 5);
+      new ECReconstructionSupervisor(null, null, 5, null);
 
   @Test
   public void testAddTaskShouldExecuteTheGivenTask()
@@ -42,7 +42,7 @@ public class TestECReconstructionSupervisor {
     private boolean isExecuted = false;
 
     FakeTask(ECReconstructionCommandInfo reconstructionCommandInfo) {
-      super(reconstructionCommandInfo);
+      super(null, reconstructionCommandInfo);
     }
 
     @Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index 76cfb7b289..3047d18d7e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -47,10 +48,16 @@ import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECContainerOperationClient;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import org.junit.Assert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -58,6 +65,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -65,10 +74,16 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -79,7 +94,7 @@ public class TestContainerCommandsEC {
 
   private static MiniOzoneCluster cluster;
   private static StorageContainerManager scm;
-  private static OzoneClient client;
+  private static OzoneClient rpcClient;
   private static ObjectStore store;
   private static StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
@@ -90,17 +105,16 @@ public class TestContainerCommandsEC {
   private static final EcCodec EC_CODEC = EcCodec.RS;
   private static final int EC_CHUNK_SIZE = 1024;
   private static final int STRIPE_DATA_SIZE = EC_DATA * EC_CHUNK_SIZE;
-  private static final int NUM_DN = EC_DATA + EC_PARITY;
+  private static final int NUM_DN = EC_DATA + EC_PARITY + 3;
+  private static byte[][] inputChunks = new byte[EC_DATA][EC_CHUNK_SIZE];
 
   // Each key size will be in range [min, max), min inclusive, max exclusive
-  private static final int[][] KEY_SIZE_RANGES = new int[][]{
-      {1, EC_CHUNK_SIZE},
-      {EC_CHUNK_SIZE, EC_CHUNK_SIZE + 1},
-      {EC_CHUNK_SIZE + 1, STRIPE_DATA_SIZE},
-      {STRIPE_DATA_SIZE, STRIPE_DATA_SIZE + 1},
-      {STRIPE_DATA_SIZE + 1, STRIPE_DATA_SIZE + EC_CHUNK_SIZE},
-      {STRIPE_DATA_SIZE + EC_CHUNK_SIZE, STRIPE_DATA_SIZE * 2},
-  };
+  private static final int[][] KEY_SIZE_RANGES =
+      new int[][] {{1, EC_CHUNK_SIZE}, {EC_CHUNK_SIZE, EC_CHUNK_SIZE + 1},
+          {EC_CHUNK_SIZE + 1, STRIPE_DATA_SIZE},
+          {STRIPE_DATA_SIZE, STRIPE_DATA_SIZE + 1},
+          {STRIPE_DATA_SIZE + 1, STRIPE_DATA_SIZE + EC_CHUNK_SIZE},
+          {STRIPE_DATA_SIZE + EC_CHUNK_SIZE, STRIPE_DATA_SIZE * 2}};
   private static byte[][] values;
   private static long containerID;
   private static Pipeline pipeline;
@@ -117,6 +131,7 @@ public class TestContainerCommandsEC {
         OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
     startCluster(config);
     prepareData(KEY_SIZE_RANGES);
+    rpcClient = OzoneClientFactory.getRpcClient(config);
   }
 
   @AfterAll
@@ -130,13 +145,11 @@ public class TestContainerCommandsEC {
     Map<DatanodeDetails, Integer> indicesForSinglePipeline = new HashMap<>();
     indicesForSinglePipeline.put(node, replicaIndex);
 
-    return Pipeline.newBuilder()
-        .setId(ecPipeline.getId())
+    return Pipeline.newBuilder().setId(ecPipeline.getId())
         .setReplicationConfig(ecPipeline.getReplicationConfig())
         .setState(ecPipeline.getPipelineState())
         .setNodes(ImmutableList.of(node))
-        .setReplicaIndexes(indicesForSinglePipeline)
-        .build();
+        .setReplicaIndexes(indicesForSinglePipeline).build();
   }
 
   @BeforeEach
@@ -175,22 +188,27 @@ public class TestContainerCommandsEC {
   public void testListBlock() throws Exception {
     for (int i = 0; i < datanodeDetails.size(); i++) {
       final int minKeySize = i < EC_DATA ? i * EC_CHUNK_SIZE : 0;
-      final int numExpectedBlocks = (int) Arrays.stream(values)
-          .mapToInt(v -> v.length).filter(s -> s > minKeySize).count();
+      final int numExpectedBlocks =
+          (int) Arrays.stream(values).mapToInt(v -> v.length)
+              .filter(s -> s > minKeySize).count();
       Function<Integer, Integer> expectedChunksFunc = chunksInReplicaFunc(i);
-      final int numExpectedChunks = Arrays.stream(values)
-          .mapToInt(v -> v.length).map(expectedChunksFunc::apply).sum();
+      final int numExpectedChunks =
+          Arrays.stream(values).mapToInt(v -> v.length)
+              .map(expectedChunksFunc::apply).sum();
       if (numExpectedBlocks == 0) {
         final int j = i;
         Throwable t = Assertions.assertThrows(StorageContainerException.class,
-            () -> ContainerProtocolCalls.listBlock(clients.get(j),
-                containerID, null, numExpectedBlocks + 1, null));
-        Assertions.assertEquals(
-            "ContainerID " + containerID + " does not exist", t.getMessage());
+            () -> ContainerProtocolCalls
+                .listBlock(clients.get(j), containerID, null,
+                    numExpectedBlocks + 1, null));
+        Assertions
+            .assertEquals("ContainerID " + containerID + " does not exist",
+                t.getMessage());
         continue;
       }
-      ListBlockResponseProto response = ContainerProtocolCalls.listBlock(
-          clients.get(i), containerID, null, numExpectedBlocks + 1, null);
+      ListBlockResponseProto response = ContainerProtocolCalls
+          .listBlock(clients.get(i), containerID, null, numExpectedBlocks + 1,
+              null);
       Assertions.assertEquals(numExpectedBlocks, response.getBlockDataCount(),
           "blocks count doesn't match on DN " + i);
       Assertions.assertEquals(numExpectedChunks,
@@ -228,7 +246,7 @@ public class TestContainerCommandsEC {
 
       //Create the recovering container in DN.
       ContainerProtocolCalls.createRecoveringContainer(dnClient,
-          container.containerID().getProtobuf().getId(), null);
+          container.containerID().getProtobuf().getId(), null, 4);
 
       BlockID blockID = ContainerTestHelper
           .getTestBlockID(container.containerID().getProtobuf().getId());
@@ -278,6 +296,207 @@ public class TestContainerCommandsEC {
     }
   }
 
+  private static byte[] getBytesWith(int singleDigitNumber, int total) {
+    StringBuilder builder = new StringBuilder(singleDigitNumber);
+    for (int i = 1; i <= total; i++) {
+      builder.append(singleDigitNumber);
+    }
+    return builder.toString().getBytes(UTF_8);
+  }
+
+  @ParameterizedTest
+  @MethodSource("recoverableMissingIndexes")
+  void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
+      throws Exception {
+    testECReconstructionCoordinator(missingIndexes);
+  }
+
+  static Stream<List<Integer>> recoverableMissingIndexes() {
+    return Stream
+        .concat(IntStream.rangeClosed(1, 5).mapToObj(ImmutableList::of), Stream
+            .of(ImmutableList.of(2, 3), ImmutableList.of(2, 4),
+                ImmutableList.of(3, 5), ImmutableList.of(4, 5)));
+  }
+
+  /**
+   * Tests reconstruction when more replicas are missing than the parity
+   * count can recover. The test should throw InsufficientLocationsException.
+   */
+  @Test
+  public void testECReconstructionCoordinatorWithMissingIndexes135() {
+    InsufficientLocationsException exception =
+        Assert.assertThrows(InsufficientLocationsException.class, () -> {
+          testECReconstructionCoordinator(ImmutableList.of(1, 3, 5));
+        });
+
+    String expectedMessage =
+        "There are insufficient datanodes to read the EC block";
+    String actualMessage = exception.getMessage();
+
+    Assert.assertEquals(expectedMessage, actualMessage);
+  }
+
+  private void testECReconstructionCoordinator(List<Integer> missingIndexes)
+      throws Exception {
+    ObjectStore objectStore = rpcClient.getObjectStore();
+    String keyString = UUID.randomUUID().toString();
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    for (int i = 0; i < EC_DATA; i++) {
+      inputChunks[i] = getBytesWith(i + 1, EC_CHUNK_SIZE);
+    }
+    XceiverClientManager xceiverClientManager =
+        new XceiverClientManager(config);
+    try (OzoneOutputStream out = bucket.createKey(keyString, 4096,
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, 1024),
+        new HashMap<>())) {
+      Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
+      for (int i = 0; i < inputChunks.length; i++) {
+        out.write(inputChunks[i]);
+      }
+    }
+
+    ECReconstructionCoordinator coordinator =
+        new ECReconstructionCoordinator(config, null);
+
+    OzoneKeyDetails key = bucket.getKey(keyString);
+    long conID = key.getOzoneKeyLocations().get(0).getContainerID();
+
+    //Close the container first.
+    scm.getContainerManager().getContainerStateManager().updateContainerState(
+        HddsProtos.ContainerID.newBuilder().setId(conID).build(),
+        HddsProtos.LifeCycleEvent.FINALIZE);
+    scm.getContainerManager().getContainerStateManager().updateContainerState(
+        HddsProtos.ContainerID.newBuilder().setId(conID).build(),
+        HddsProtos.LifeCycleEvent.CLOSE);
+
+    Pipeline containerPipeline = scm.getPipelineManager().getPipeline(
+        scm.getContainerManager().getContainer(ContainerID.valueOf(conID))
+            .getPipelineID());
+
+    SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
+
+    List<DatanodeDetails> nodeSet = containerPipeline.getNodes();
+    List<Pipeline> containerToDeletePipeline = new ArrayList<>();
+    for (DatanodeDetails srcDn : nodeSet) {
+      int replIndex = containerPipeline.getReplicaIndex(srcDn);
+      if (missingIndexes.contains(replIndex)) {
+        containerToDeletePipeline
+            .add(createSingleNodePipeline(containerPipeline, srcDn, replIndex));
+        continue;
+      }
+      sourceNodeMap.put(replIndex, srcDn);
+    }
+
+    //Find nodes outside of pipeline
+    List<DatanodeDetails> clusterDnsList =
+        cluster.getHddsDatanodes().stream().map(k -> k.getDatanodeDetails())
+            .collect(Collectors.toList());
+    List<DatanodeDetails> targetNodes = new ArrayList<>();
+    for (DatanodeDetails clusterDN : clusterDnsList) {
+      if (!nodeSet.contains(clusterDN)) {
+        targetNodes.add(clusterDN);
+        if (targetNodes.size() == missingIndexes.size()) {
+          break;
+        }
+      }
+    }
+
+    Assert.assertEquals(missingIndexes.size(), targetNodes.size());
+
+    List<org.apache.hadoop.ozone.container.common.helpers.BlockData[]>
+        blockDataArrList = new ArrayList<>();
+    for (int j = 0; j < containerToDeletePipeline.size(); j++) {
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData =
+          new ECContainerOperationClient(new OzoneConfiguration(), null)
+              .listBlock(conID, containerToDeletePipeline.get(j).getFirstNode(),
+                  (ECReplicationConfig) containerToDeletePipeline.get(j)
+                      .getReplicationConfig(), null);
+      blockDataArrList.add(blockData);
+      // Delete the container replica at this index.
+      ContainerProtocolCalls.deleteContainer(
+          xceiverClientManager.acquireClient(containerToDeletePipeline.get(j)),
+          conID, true, null);
+    }
+
+    //Give the new target to reconstruct the container
+    SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
+    for (int k = 0; k < missingIndexes.size(); k++) {
+      targetNodeMap.put(missingIndexes.get(k), targetNodes.get(k));
+    }
+
+    coordinator.reconstructECContainerGroup(conID,
+        (ECReplicationConfig) containerPipeline.getReplicationConfig(),
+        sourceNodeMap, targetNodeMap);
+
+    // Assert the original container metadata with the new recovered container.
+    Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+        targetNodeMap.entrySet().iterator();
+    int i = 0;
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+      DatanodeDetails targetDN = next.getValue();
+      Map<DatanodeDetails, Integer> indexes = new HashMap<>();
+      indexes.put(next.getValue(), next.getKey());
+      Pipeline newTargetPipeline =
+          Pipeline.newBuilder().setId(PipelineID.randomId())
+              .setReplicationConfig(containerPipeline.getReplicationConfig())
+              .setReplicaIndexes(indexes)
+              .setState(Pipeline.PipelineState.CLOSED)
+              .setNodes(ImmutableList.of(targetDN)).build();
+
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+          reconstructedBlockData =
+          new ECContainerOperationClient(new OzoneConfiguration(), null)
+              .listBlock(conID, newTargetPipeline.getFirstNode(),
+                  (ECReplicationConfig) newTargetPipeline
+                      .getReplicationConfig(), null);
+      Assert.assertEquals(blockDataArrList.get(i).length,
+          reconstructedBlockData.length);
+      checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
+      ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
+          ContainerProtocolCalls.readContainer(
+              xceiverClientManager.acquireClient(newTargetPipeline), conID,
+              null);
+      Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+          readContainerResponseProto.getContainerData().getState());
+      i++;
+    }
+
+  }
+
+  private void checkBlockData(
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+          reconstructedBlockData) {
+
+    for (int i = 0; i < blockData.length; i++) {
+      List<ContainerProtos.ChunkInfo> oldBlockDataChunks =
+          blockData[i].getChunks();
+      List<ContainerProtos.ChunkInfo> newBlockDataChunks =
+          reconstructedBlockData[i].getChunks();
+      for (int j = 0; j < oldBlockDataChunks.size(); j++) {
+        ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
+        if (chunkInfo.getLen() == 0) {
+          // let's ignore the empty chunks
+          continue;
+        }
+        Assert.assertEquals(chunkInfo, newBlockDataChunks.get(j));
+      }
+    }
+
+   /* Assert.assertEquals(
+        Arrays.stream(blockData).map(b -> b.getProtoBufMessage())
+            .collect(Collectors.toList()),
+        Arrays.stream(reconstructedBlockData).map(b -> b.getProtoBufMessage())
+            .collect(Collectors.toList()));*/
+  }
+
   public static void startCluster(OzoneConfiguration conf) throws Exception {
 
     // Set minimum pipeline to 1 to ensure all data is written to
@@ -287,15 +506,12 @@ public class TestContainerCommandsEC {
     writableECContainerProviderConfig.setMinimumPipelines(1);
     conf.setFromObject(writableECContainerProviderConfig);
 
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(NUM_DN)
-        .setScmId(SCM_ID)
-        .setClusterId(CLUSTER_ID)
-        .build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(NUM_DN)
+        .setScmId(SCM_ID).setClusterId(CLUSTER_ID).build();
     cluster.waitForClusterToBeReady();
     scm = cluster.getStorageContainerManager();
-    client = OzoneClientFactory.getRpcClient(conf);
-    store = client.getObjectStore();
+    rpcClient = OzoneClientFactory.getRpcClient(conf);
+    store = rpcClient.getObjectStore();
     storageContainerLocationClient =
         cluster.getStorageContainerLocationClient();
   }
@@ -314,8 +530,8 @@ public class TestContainerCommandsEC {
       int keySize = RandomUtils.nextInt(ranges[i][0], ranges[i][1]);
       values[i] = RandomUtils.nextBytes(keySize);
       final String keyName = UUID.randomUUID().toString();
-      try (OutputStream out = bucket.createKey(
-          keyName, values[i].length, repConfig, new HashMap<>())) {
+      try (OutputStream out = bucket
+          .createKey(keyName, values[i].length, repConfig, new HashMap<>())) {
         out.write(values[i]);
       }
     }
@@ -330,8 +546,8 @@ public class TestContainerCommandsEC {
   }
 
   public static void stopCluster() throws IOException {
-    if (client != null) {
-      client.close();
+    if (rpcClient != null) {
+      rpcClient.close();
     }
 
     if (storageContainerLocationClient != null) {

