Posted to commits@ozone.apache.org by um...@apache.org on 2022/06/16 17:25:25 UTC

[ozone] branch master updated (6ef1df7b47 -> f57a0193c1)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


    omit 6ef1df7b47 HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)
     new f57a0193c1 HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (6ef1df7b47)
            \
             N -- N -- N   refs/heads/master (f57a0193c1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:




[ozone] 01/01: HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)

Posted by um...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit f57a0193c114ac2f17dc255ec1a052dcf12781b8
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Thu Jun 16 09:15:39 2022 -0700

    HDDS-6806. EC: Implement the EC Reconstruction coordinator. (#3504)
    
    Co-authored-by: Gui Hecheng <ma...@tencent.com>
---
 .../hadoop/hdds/scm/XceiverClientManager.java      |  33 ++
 .../hdds/scm/storage/ECBlockOutputStream.java      |   5 +
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  19 +-
 .../common/statemachine/DatanodeStateMachine.java  |  10 +-
 .../ReconstructECContainersCommandHandler.java     |   5 +-
 .../reconstruction/ECContainerOperationClient.java | 148 ++++++++
 .../ECReconstructionCoordinator.java               | 410 +++++++++++++++++++++
 .../ECReconstructionCoordinatorTask.java           |  39 ++
 .../reconstruction/ECReconstructionSupervisor.java |  27 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |   1 +
 .../TestECReconstructionSupervisor.java            |   4 +-
 .../hdds/scm/storage/TestContainerCommandsEC.java  | 288 +++++++++++++--
 12 files changed, 938 insertions(+), 51 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index ee4edecb3c..0724d614cc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -66,6 +66,7 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
       LoggerFactory.getLogger(XceiverClientManager.class);
   //TODO : change this to SCM configuration class
   private final ConfigurationSource conf;
+  private final ScmClientConfig clientConfig;
   private final Cache<String, XceiverClientSpi> clientCache;
   private List<X509Certificate> caCerts;
 
@@ -88,6 +89,7 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
       List<X509Certificate> caCerts) throws IOException {
     Preconditions.checkNotNull(clientConf);
     Preconditions.checkNotNull(conf);
+    this.clientConfig = clientConf;
     long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
     this.conf = conf;
     this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
@@ -347,6 +349,37 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
       this.maxSize = maxSize;
     }
 
+    public void setStaleThreshold(long threshold) {
+      this.staleThreshold = threshold;
+    }
+
+  }
+
+  /**
+   * Builder of ScmClientConfig.
+   */
+  public static class XceiverClientManagerConfigBuilder {
+
+    private int maxCacheSize;
+    private long staleThresholdMs;
+
+    public XceiverClientManagerConfigBuilder setMaxCacheSize(int maxCacheSize) {
+      this.maxCacheSize = maxCacheSize;
+      return this;
+    }
+
+    public XceiverClientManagerConfigBuilder setStaleThresholdMs(
+        long staleThresholdMs) {
+      this.staleThresholdMs = staleThresholdMs;
+      return this;
+    }
+
+    public ScmClientConfig build() {
+      ScmClientConfig clientConfig = new ScmClientConfig();
+      clientConfig.setMaxSize(this.maxCacheSize);
+      clientConfig.setStaleThreshold(this.staleThresholdMs);
+      return clientConfig;
+    }
   }
 
 }
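
The builder added above is consumed later in this patch by
ECContainerOperationClient. A minimal usage sketch (assuming an available
ConfigurationSource conf, a CA certificate list caCerts, and a surrounding
method that may throw IOException; the values mirror the ones used by
ECContainerOperationClient.createClientManager):

    XceiverClientManager.ScmClientConfig clientConfig =
        new XceiverClientManager.XceiverClientManagerConfigBuilder()
            .setMaxCacheSize(256)            // client cache capacity
            .setStaleThresholdMs(10 * 1000)  // evict stale clients after 10s
            .build();
    XceiverClientManager clientManager =
        new XceiverClientManager(conf, clientConfig, caCerts);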
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 3f3fc12f2a..174e507829 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -80,6 +80,11 @@ public class ECBlockOutputStream extends BlockOutputStream {
         writeChunkToContainer(ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
   }
 
+  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> write(
+      ByteBuffer buff) throws IOException {
+    return writeChunkToContainer(ChunkBuffer.wrap(buff));
+  }
+
   public CompletableFuture<ContainerProtos.
       ContainerCommandResponseProto> executePutBlock(boolean close,
       boolean force, long blockGroupLength) throws IOException {
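
The new write(ByteBuffer) overload above returns the chunk-write response
future instead of blocking. A hedged sketch of driving it, assuming an
already-constructed ECBlockOutputStream out, a decoded chunk in buf, and a
caller that handles the checked exceptions of Future.get():

    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
        out.write(buf);
    // Wait for the datanode to acknowledge the chunk write and fail fast
    // on an error result.
    ContainerProtos.ContainerCommandResponseProto response = future.get();
    if (response.getResult() != ContainerProtos.Result.SUCCESS) {
      throw new IOException("Chunk write failed: " + response.getMessage());
    }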
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index b5365820e3..16bef69712 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -425,13 +425,15 @@ public final class ContainerProtocolCalls  {
    * @param client  - client
    * @param containerID - ID of container
    * @param encodedToken - encodedToken if security is enabled
+   * @param replicaIndex - index position of the container replica
    * @throws IOException
    */
   @InterfaceStability.Evolving
   public static void createRecoveringContainer(XceiverClientSpi client,
-      long containerID, String encodedToken) throws IOException {
+      long containerID, String encodedToken, int replicaIndex)
+      throws IOException {
     createContainerInternal(client, containerID, encodedToken,
-        ContainerProtos.ContainerDataProto.State.RECOVERING);
+        ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex);
   }
 
   /**
@@ -443,7 +445,7 @@ public final class ContainerProtocolCalls  {
    */
   public static void createContainer(XceiverClientSpi client, long containerID,
       String encodedToken) throws IOException {
-    createContainerInternal(client, containerID, encodedToken, null);
+    createContainerInternal(client, containerID, encodedToken, null, 0);
   }
   /**
    * createContainer call that creates a container on the datanode.
@@ -451,18 +453,23 @@ public final class ContainerProtocolCalls  {
    * @param containerID - ID of container
    * @param encodedToken - encodedToken if security is enabled
    * @param state - state of the container
+   * @param replicaIndex - index position of the container replica
    * @throws IOException
    */
   private static void createContainerInternal(XceiverClientSpi client,
       long containerID, String encodedToken,
-      ContainerProtos.ContainerDataProto.State state) throws IOException {
+      ContainerProtos.ContainerDataProto.State state, int replicaIndex)
+      throws IOException {
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
         ContainerProtos.CreateContainerRequestProto.newBuilder();
-    createRequest.setContainerType(ContainerProtos.ContainerType
-        .KeyValueContainer);
+    createRequest
+        .setContainerType(ContainerProtos.ContainerType.KeyValueContainer);
     if (state != null) {
       createRequest.setState(state);
     }
+    if (replicaIndex > 0) {
+      createRequest.setReplicaIndex(replicaIndex);
+    }
 
     String id = client.getPipeline().getFirstNode().getUuidString();
     ContainerCommandRequestProto.Builder request =
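
With the new replicaIndex parameter, a RECOVERING replica can be created at a
specific position in the EC group. A minimal sketch, assuming an acquired
XceiverClientSpi xceiverClient for a single-node pipeline and security
disabled (null token); a replicaIndex of 0 leaves the field unset, as the
plain createContainer path does:

    // Create a RECOVERING container replica at EC index 4 on the datanode
    // behind xceiverClient.
    ContainerProtocolCalls.createRecoveringContainer(
        xceiverClient, containerID, null /* encodedToken */, 4);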
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ba21e5795e..cc05511b58 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Reco
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionSupervisor;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -181,9 +182,12 @@ public class DatanodeStateMachine implements Closeable {
     replicationSupervisorMetrics =
         ReplicationSupervisorMetrics.create(supervisor);
 
+    ECReconstructionCoordinator ecReconstructionCoordinator =
+        new ECReconstructionCoordinator(conf, certClient);
     ecReconstructionSupervisor =
         new ECReconstructionSupervisor(container.getContainerSet(), context,
-            replicationConfig.getReplicationMaxStreams());
+            replicationConfig.getReplicationMaxStreams(),
+            ecReconstructionCoordinator);
 
 
     // When we add new handlers just adding a new handler here should do the
@@ -389,6 +393,10 @@ public class DatanodeStateMachine implements Closeable {
       Thread.currentThread().interrupt();
     }
 
+    if (ecReconstructionSupervisor != null) {
+      ecReconstructionSupervisor.close();
+    }
+
     if (connectionManager != null) {
       connectionManager.close();
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
index f4ec45f600..009d47d7af 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
@@ -53,8 +53,9 @@ public class ReconstructECContainersCommandHandler implements CommandHandler {
             ecContainersCommand.getMissingContainerIndexes(),
             ecContainersCommand.getSources(),
             ecContainersCommand.getTargetDatanodes());
-    this.supervisor.addTask(
-        new ECReconstructionCoordinatorTask(reconstructionCommandInfo));
+    this.supervisor.addTask(new ECReconstructionCoordinatorTask(
+        this.supervisor.getReconstructionCoordinator(),
+        reconstructionCommandInfo));
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
new file mode 100644
index 0000000000..adebe11776
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ec.reconstruction;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * This class wraps the container-level RPC calls needed
+ * during EC offline reconstruction:
+ *   - ListBlock
+ *   - CloseContainer
+ *   - CreateRecoveringContainer
+ */
+public class ECContainerOperationClient implements Closeable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ECContainerOperationClient.class);
+  private final XceiverClientManager xceiverClientManager;
+
+  public ECContainerOperationClient(XceiverClientManager clientManager) {
+    this.xceiverClientManager = clientManager;
+  }
+
+  public ECContainerOperationClient(ConfigurationSource conf,
+      CertificateClient certificateClient) throws IOException {
+    this(createClientManager(conf, certificateClient));
+  }
+
+  @NotNull
+  private static XceiverClientManager createClientManager(
+      ConfigurationSource conf, CertificateClient certificateClient)
+      throws IOException {
+    return new XceiverClientManager(conf,
+        new XceiverClientManager.XceiverClientManagerConfigBuilder()
+            .setMaxCacheSize(256).setStaleThresholdMs(10 * 1000).build(),
+        certificateClient != null ?
+            HAUtils.buildCAX509List(certificateClient, conf) :
+            null);
+  }
+
+  public BlockData[] listBlock(long containerId, DatanodeDetails dn,
+      ECReplicationConfig repConfig, Token<? extends TokenIdentifier> token)
+      throws IOException {
+    XceiverClientSpi xceiverClient = this.xceiverClientManager
+        .acquireClient(singleNodePipeline(dn, repConfig));
+    try {
+      List<ContainerProtos.BlockData> blockDataList = ContainerProtocolCalls
+          .listBlock(xceiverClient, containerId, null, Integer.MAX_VALUE, token)
+          .getBlockDataList();
+      return blockDataList.stream().map(i -> {
+        try {
+          return BlockData.getFromProtoBuf(i);
+        } catch (IOException e) {
+          LOG.debug("Failed while converting to protobuf BlockData. Returning"
+                  + " null for listBlock from DN: " + dn,
+              e);
+          // TODO: revisit here.
+          return null;
+        }
+      }).collect(Collectors.toList())
+          .toArray(new BlockData[blockDataList.size()]);
+    } finally {
+      this.xceiverClientManager.releaseClient(xceiverClient, false);
+    }
+  }
+
+  public void closeContainer(long containerID, DatanodeDetails dn,
+      ECReplicationConfig repConfig, String encodedToken) throws IOException {
+    XceiverClientSpi xceiverClient = this.xceiverClientManager
+        .acquireClient(singleNodePipeline(dn, repConfig));
+    try {
+      ContainerProtocolCalls
+          .closeContainer(xceiverClient, containerID, encodedToken);
+    } finally {
+      this.xceiverClientManager.releaseClient(xceiverClient, false);
+    }
+  }
+
+  public void createRecoveringContainer(long containerID, DatanodeDetails dn,
+      ECReplicationConfig repConfig, String encodedToken, int replicaIndex)
+      throws IOException {
+    XceiverClientSpi xceiverClient = this.xceiverClientManager.acquireClient(
+        singleNodePipeline(dn, repConfig));
+    try {
+      ContainerProtocolCalls
+          .createRecoveringContainer(xceiverClient, containerID, encodedToken,
+              replicaIndex);
+    } finally {
+      this.xceiverClientManager.releaseClient(xceiverClient, false);
+    }
+  }
+
+  Pipeline singleNodePipeline(DatanodeDetails dn,
+      ECReplicationConfig repConfig) {
+    // To get the same client from the cache, we use the DN UUID as the
+    // pipeline ID for uniqueness. Note that the pipeline has no
+    // significance after it is closed, so any ID is fine here.
+    return Pipeline.newBuilder().setId(PipelineID.valueOf(dn.getUuid()))
+        .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
+        .setState(Pipeline.PipelineState.CLOSED).build();
+  }
+
+  public XceiverClientManager getXceiverClientManager() {
+    return xceiverClientManager;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (xceiverClientManager != null) {
+      xceiverClientManager.close();
+    }
+  }
+}
\ No newline at end of file
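
Together these methods cover the three container-level operations the
coordinator needs. A hedged usage sketch, assuming a ConfigurationSource
conf, source and target DatanodeDetails sourceDn/targetDn, an
ECReplicationConfig repConfig, and security disabled (null tokens):

    try (ECContainerOperationClient client =
        new ECContainerOperationClient(conf, null /* certClient */)) {
      // Inspect a healthy replica on a source datanode...
      BlockData[] blocks =
          client.listBlock(containerID, sourceDn, repConfig, null);
      // ...create the replacement replica (index 2 here) on the target
      // datanode, and close it once reconstruction is done.
      client.createRecoveringContainer(containerID, targetDn, repConfig,
          null, 2);
      client.closeContainer(containerID, targetDn, repConfig, null);
    }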
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
new file mode 100644
index 0000000000..780c520d72
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.ec.reconstruction;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.io.ByteBufferPool;
+import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
+import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * The Coordinator implements the main flow of reconstructing
+ * missing container replicas.
+ * <p>
+ * For a container reconstruction task, the main flow is:
+ * - ListBlock from all healthy replicas
+ * - calculate the effective block group length for all blocks
+ * - create RECOVERING containers on the target DNs
+ * - for each block:
+ *   - build an ECBlockReconstructedStripeInputStream to read healthy chunks
+ *   - build an ECBlockOutputStream per target to write out decoded chunks
+ *   - for each stripe:
+ *     - use ECBlockReconstructedStripeInputStream.recoverChunks to decode
+ *       the missing chunks
+ *     - use ECBlockOutputStream.write to write decoded chunks to target DNs
+ *   - PutBlock
+ * - close the RECOVERING containers on the target DNs
+ */
+public class ECReconstructionCoordinator implements Closeable {
+
+  static final Logger LOG =
+      LoggerFactory.getLogger(ECReconstructionCoordinator.class);
+
+  private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
+
+  private final ECContainerOperationClient containerOperationClient;
+
+  private final ConfigurationSource config;
+
+  private final ByteBufferPool byteBufferPool;
+
+  private ExecutorService ecReconstructExecutor;
+
+  private BlockInputStreamFactory blockInputStreamFactory;
+
+  public ECReconstructionCoordinator(ECContainerOperationClient containerClient,
+      ConfigurationSource conf, ByteBufferPool byteBufferPool,
+      ExecutorService reconstructExecutor,
+      BlockInputStreamFactory streamFactory) {
+    this.containerOperationClient = containerClient;
+    this.config = conf;
+    this.byteBufferPool = byteBufferPool;
+    this.blockInputStreamFactory = streamFactory;
+    this.ecReconstructExecutor = reconstructExecutor;
+  }
+
+  public ECReconstructionCoordinator(ConfigurationSource conf,
+      CertificateClient certificateClient) throws IOException {
+    this(new ECContainerOperationClient(conf, certificateClient), conf,
+        new ElasticByteBufferPool(), null, null);
+    this.ecReconstructExecutor =
+        new ThreadPoolExecutor(EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
+            config.getObject(OzoneClientConfig.class)
+                .getEcReconstructStripeReadPoolLimit(), 60, TimeUnit.SECONDS,
+            new SynchronousQueue<>(), new ThreadFactoryBuilder()
+            .setNameFormat("ec-reconstruct-reader-TID-%d").build(),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.blockInputStreamFactory = BlockInputStreamFactoryImpl
+        .getInstance(byteBufferPool, () -> ecReconstructExecutor);
+  }
+
+  public void reconstructECContainerGroup(long containerID,
+      ECReplicationConfig repConfig,
+      SortedMap<Integer, DatanodeDetails> sourceNodeMap,
+      SortedMap<Integer, DatanodeDetails> targetNodeMap) throws IOException {
+
+    Pipeline pipeline = rebuildInputPipeline(repConfig, sourceNodeMap);
+
+    SortedMap<Long, BlockData[]> blockDataMap =
+        getBlockDataMap(containerID, repConfig, sourceNodeMap);
+
+    SortedMap<Long, BlockLocationInfo> blockLocationInfoMap =
+        calcBlockLocationInfoMap(containerID, blockDataMap, pipeline);
+
+    // 1. create target recovering containers.
+    for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
+        .entrySet()) {
+      this.containerOperationClient
+          .createRecoveringContainer(containerID, indexDnPair.getValue(),
+              repConfig, null, indexDnPair.getKey());
+    }
+
+    // 2. Reconstruct and transfer to targets
+    for (BlockLocationInfo blockLocationInfo : blockLocationInfoMap.values()) {
+      reconstructECBlockGroup(blockLocationInfo, repConfig, targetNodeMap);
+    }
+
+    // 3. Close containers
+    for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
+        .entrySet()) {
+      DatanodeDetails dn = indexDnPair.getValue();
+      this.containerOperationClient
+          .closeContainer(containerID, dn, repConfig, null);
+    }
+
+  }
+
+  void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
+      ECReplicationConfig repConfig,
+      SortedMap<Integer, DatanodeDetails> targetMap)
+      throws IOException {
+    long safeBlockGroupLength = blockLocationInfo.getLength();
+    List<Integer> missingContainerIndexes = new ArrayList<>(targetMap.keySet());
+
+    // calculate the real missing block indexes
+    int dataLocs = ECBlockInputStreamProxy
+        .expectedDataLocations(repConfig, safeBlockGroupLength);
+    List<Integer> toReconstructIndexes = new ArrayList<>();
+    for (Integer index : missingContainerIndexes) {
+      if (index <= dataLocs || index > repConfig.getData()) {
+        toReconstructIndexes.add(index);
+      }
+      // else padded indexes.
+    }
+
+    // It looks like we don't need to reconstruct any missing blocks in this
+    // block group. This should be because the block group had only padding
+    // blocks in the missing locations.
+    if (toReconstructIndexes.size() == 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping the reconstruction for the block: "
+            + blockLocationInfo.getBlockID() + ". In the missing locations: "
+            + missingContainerIndexes
+            + ", this block group has only padded blocks.");
+      }
+      return;
+    }
+
+    try (ECBlockReconstructedStripeInputStream sis
+        = new ECBlockReconstructedStripeInputStream(
+        repConfig, blockLocationInfo, true,
+        this.containerOperationClient.getXceiverClientManager(), null,
+        this.blockInputStreamFactory, byteBufferPool,
+        this.ecReconstructExecutor)) {
+
+      ECBlockOutputStream[] targetBlockStreams =
+          new ECBlockOutputStream[toReconstructIndexes.size()];
+      ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
+      OzoneClientConfig configuration = new OzoneClientConfig();
+      // TODO: Let's avoid unnecessary bufferPool creation. This pool is not
+      //  actually used in EC flows, but there are some dependencies on it.
+      BufferPool bufferPool =
+          new BufferPool(configuration.getStreamBufferSize(),
+              (int) (configuration.getStreamBufferMaxSize() / configuration
+                  .getStreamBufferSize()),
+              ByteStringConversion.createByteBufferConversion(false));
+      for (int i = 0; i < toReconstructIndexes.size(); i++) {
+        DatanodeDetails datanodeDetails =
+            targetMap.get(toReconstructIndexes.get(i));
+        targetBlockStreams[i] =
+            new ECBlockOutputStream(blockLocationInfo.getBlockID(),
+                this.containerOperationClient.getXceiverClientManager(),
+                this.containerOperationClient
+                    .singleNodePipeline(datanodeDetails, repConfig), bufferPool,
+                configuration, blockLocationInfo.getToken());
+        bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
+        // Make sure it's clean. Don't want to reuse the erroneously returned
+        // buffers from the pool.
+        bufs[i].clear();
+      }
+
+      sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
+          .collect(Collectors.toSet()));
+      long length = safeBlockGroupLength;
+      while (length > 0) {
+        int readLen = sis.recoverChunks(bufs);
+        // TODO: can be submitted in parallel
+        for (int i = 0; i < bufs.length; i++) {
+          CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+              future = targetBlockStreams[i].write(bufs[i]);
+          checkFailures(targetBlockStreams[i], future);
+          bufs[i].clear();
+        }
+        length -= readLen;
+      }
+
+      try {
+        for (ECBlockOutputStream targetStream : targetBlockStreams) {
+          targetStream
+              .executePutBlock(true, true, blockLocationInfo.getLength());
+          checkFailures(targetStream,
+              targetStream.getCurrentPutBlkResponseFuture());
+        }
+      } finally {
+        for (ByteBuffer buf : bufs) {
+          byteBufferPool.putBuffer(buf);
+        }
+        IOUtils.cleanupWithLogger(LOG, targetBlockStreams);
+      }
+    }
+  }
+
+  private void checkFailures(ECBlockOutputStream targetBlockStream,
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+          currentPutBlkResponseFuture)
+      throws IOException {
+    if (isFailed(targetBlockStream, currentPutBlkResponseFuture)) {
+      // If one chunk response failed, we should retry.
+      // Even after retries if it failed, we should declare the
+      // reconstruction as failed.
+      // For now, let's throw the exception.
+      throw new IOException(
+          "Chunk write failed at the new target node: " + targetBlockStream
+              .getDatanodeDetails() + ". Aborting the reconstruction process.");
+    }
+  }
+
+  private boolean isFailed(ECBlockOutputStream outputStream,
+      CompletableFuture<ContainerProtos.
+          ContainerCommandResponseProto> chunkWriteResponseFuture) {
+    if (chunkWriteResponseFuture == null) {
+      return true;
+    }
+
+    ContainerProtos.ContainerCommandResponseProto
+        containerCommandResponseProto = null;
+    try {
+      containerCommandResponseProto = chunkWriteResponseFuture.get();
+    } catch (InterruptedException e) {
+      outputStream.setIoException(e);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      outputStream.setIoException(e);
+    }
+
+    if (outputStream.getIoException() != null) {
+      return true;
+    }
+
+    if (containerCommandResponseProto == null) {
+      return true;
+    }
+
+    return false;
+  }
+
+  SortedMap<Long, BlockLocationInfo> calcBlockLocationInfoMap(long containerID,
+      SortedMap<Long, BlockData[]> blockDataMap, Pipeline pipeline) {
+
+    SortedMap<Long, BlockLocationInfo> blockInfoMap = new TreeMap<>();
+
+    for (Map.Entry<Long, BlockData[]> entry : blockDataMap.entrySet()) {
+      Long localID = entry.getKey();
+      BlockData[] blockGroup = entry.getValue();
+
+      long blockGroupLen = calcEffectiveBlockGroupLen(blockGroup,
+          pipeline.getReplicationConfig().getRequiredNodes());
+      if (blockGroupLen > 0) {
+        BlockLocationInfo blockLocationInfo = new BlockLocationInfo.Builder()
+            .setBlockID(new BlockID(containerID, localID))
+            .setLength(blockGroupLen).setPipeline(pipeline).build();
+        blockInfoMap.put(localID, blockLocationInfo);
+      }
+    }
+    return blockInfoMap;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (containerOperationClient != null) {
+      containerOperationClient.close();
+    }
+  }
+
+  private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
+      SortedMap<Integer, DatanodeDetails> sourceNodeMap) {
+
+    List<DatanodeDetails> nodes = new ArrayList<>(sourceNodeMap.values());
+    Map<DatanodeDetails, Integer> dnVsIndex = new HashMap<>();
+
+    Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+        sourceNodeMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+      Integer key = next.getKey();
+      DatanodeDetails value = next.getValue();
+      dnVsIndex.put(value, key);
+    }
+
+    return Pipeline.newBuilder().setId(PipelineID.randomId())
+        .setReplicationConfig(repConfig).setNodes(nodes)
+        .setReplicaIndexes(dnVsIndex).setState(Pipeline.PipelineState.CLOSED)
+        .build();
+  }
+
+  private SortedMap<Long, BlockData[]> getBlockDataMap(long containerID,
+      ECReplicationConfig repConfig,
+      Map<Integer, DatanodeDetails> sourceNodeMap) throws IOException {
+
+    SortedMap<Long, BlockData[]> resultMap = new TreeMap<>();
+
+    Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+        sourceNodeMap.entrySet().iterator();
+
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+      Integer index = next.getKey();
+      DatanodeDetails dn = next.getValue();
+
+      BlockData[] blockDataArr =
+          containerOperationClient.listBlock(containerID, dn, repConfig, null);
+
+      for (BlockData blockData : blockDataArr) {
+        BlockID blockID = blockData.getBlockID();
+        BlockData[] blkDataArr = resultMap.getOrDefault(blockData.getLocalID(),
+            new BlockData[repConfig.getRequiredNodes()]);
+        blkDataArr[index - 1] = blockData;
+        resultMap.put(blockID.getLocalID(), blkDataArr);
+      }
+    }
+    return resultMap;
+  }
+
+  /**
+   * Get the effective length of each block group.
+   * We cannot be absolutely accurate when there is a failed stripe
+   * in this block, since the failed cells could be missing, and
+   * we cannot tell from the healthy cells whether the last stripe
+   * failed or not. But in such a case we recover at most one extra
+   * stripe for this block, which does not confuse the client data view.
+   *
+   * @param blockGroup block data of the group, indexed by replica index - 1
+   * @param replicaCount number of replicas in the block group
+   * @return the effective block group length, or 0 if it cannot be determined
+   */
+  private long calcEffectiveBlockGroupLen(BlockData[] blockGroup,
+      int replicaCount) {
+    Preconditions.checkState(blockGroup.length == replicaCount);
+
+    long blockGroupLen = Long.MAX_VALUE;
+
+    for (int i = 0; i < replicaCount; i++) {
+      if (blockGroup[i] == null) {
+        continue;
+      }
+
+      String putBlockLenStr = blockGroup[i].getMetadata()
+          .get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
+      long putBlockLen = (putBlockLenStr == null) ?
+          Long.MAX_VALUE :
+          Long.parseLong(putBlockLenStr);
+      // Use the min to be conservative
+      blockGroupLen = Math.min(putBlockLen, blockGroupLen);
+    }
+    return blockGroupLen == Long.MAX_VALUE ? 0 : blockGroupLen;
+  }
+}
\ No newline at end of file
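
End to end, the coordinator is driven with two index-to-datanode maps, which
is how the integration test later in this commit exercises it. A hedged
sketch for an RS-3-2 group with healthy replicas on dn1, dn2, dn3 and dn5
and a replacement datanode newDn for the missing index 4 (all names assumed):

    SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
    sourceNodeMap.put(1, dn1);
    sourceNodeMap.put(2, dn2);
    sourceNodeMap.put(3, dn3);
    sourceNodeMap.put(5, dn5);
    SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
    targetNodeMap.put(4, newDn);
    try (ECReconstructionCoordinator coordinator =
        new ECReconstructionCoordinator(conf, null /* certClient */)) {
      coordinator.reconstructECContainerGroup(containerID,
          repConfig, sourceNodeMap, targetNodeMap);
    }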
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
index 24168e5f69..af4a87e272 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java
@@ -17,14 +17,30 @@
  */
 package org.apache.hadoop.ozone.container.ec.reconstruction;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 /**
  * This is the actual EC reconstruction coordination task.
  */
 public class ECReconstructionCoordinatorTask implements Runnable {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class);
+  private ECReconstructionCoordinator reconstructionCoordinator;
   private ECReconstructionCommandInfo reconstructionCommandInfo;
 
   public ECReconstructionCoordinatorTask(
+      ECReconstructionCoordinator coordinator,
       ECReconstructionCommandInfo reconstructionCommandInfo) {
+    this.reconstructionCoordinator = coordinator;
     this.reconstructionCommandInfo = reconstructionCommandInfo;
   }
 
@@ -42,6 +58,29 @@ public class ECReconstructionCoordinatorTask implements Runnable {
     // 4. Write the recovered chunks to given targets/write locally to
     // respective container. HDDS-6582
     // 5. Close/finalize the recovered containers.
+
+    SortedMap<Integer, DatanodeDetails> sourceNodeMap =
+        reconstructionCommandInfo.getSources().stream().collect(Collectors
+            .toMap(DatanodeDetailsAndReplicaIndex::getReplicaIndex,
+                DatanodeDetailsAndReplicaIndex::getDnDetails, (v1, v2) -> v1,
+                TreeMap::new));
+    SortedMap<Integer, DatanodeDetails> targetNodeMap = IntStream
+        .range(0, reconstructionCommandInfo.getTargetDatanodes().size()).boxed()
+        .collect(Collectors.toMap(i -> (int) reconstructionCommandInfo
+                .getMissingContainerIndexes()[i],
+            i -> reconstructionCommandInfo.getTargetDatanodes().get(i),
+            (v1, v2) -> v1, TreeMap::new));
+
+    try {
+      reconstructionCoordinator.reconstructECContainerGroup(
+          reconstructionCommandInfo.getContainerID(),
+          reconstructionCommandInfo.getEcReplicationConfig(), sourceNodeMap,
+          targetNodeMap);
+    } catch (IOException e) {
+      LOG.warn(
+          "Failed to complete the reconstruction task for the container: "
+              + reconstructionCommandInfo.getContainerID(), e);
+    }
   }
 
   @Override
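
The target map built in run() zips missingContainerIndexes with
targetDatanodes by position. A standalone illustration of the same collector
pattern with hypothetical values (Strings stand in for DatanodeDetails):

    byte[] missingContainerIndexes = {2, 5};
    List<String> targetDatanodes = Arrays.asList("dnA", "dnB");
    SortedMap<Integer, String> targetNodeMap = IntStream
        .range(0, targetDatanodes.size()).boxed()
        .collect(Collectors.toMap(i -> (int) missingContainerIndexes[i],
            targetDatanodes::get, (v1, v2) -> v1, TreeMap::new));
    // targetNodeMap is now {2=dnA, 5=dnB}.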
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
index e2c930a8c2..2af04f6500 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionSupervisor.java
@@ -21,6 +21,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -30,28 +32,33 @@ import java.util.concurrent.TimeUnit;
  * This class is to handle all the EC reconstruction tasks to be scheduled as
  * they arrive.
  */
-public class ECReconstructionSupervisor {
+public class ECReconstructionSupervisor implements Closeable {
 
   private final ContainerSet containerSet;
   private final StateContext context;
   private final ExecutorService executor;
+  private final ECReconstructionCoordinator reconstructionCoordinator;
 
   public ECReconstructionSupervisor(ContainerSet containerSet,
-      StateContext context, ExecutorService executor) {
+      StateContext context, ExecutorService executor,
+      ECReconstructionCoordinator coordinator) {
     this.containerSet = containerSet;
     this.context = context;
     this.executor = executor;
+    this.reconstructionCoordinator = coordinator;
   }
 
   public ECReconstructionSupervisor(ContainerSet containerSet,
-      StateContext context, int poolSize) {
+      StateContext context, int poolSize,
+      ECReconstructionCoordinator coordinator) {
     // TODO: ReplicationSupervisor and this class can be refactored to have a
     //  common interface.
     this(containerSet, context,
         new ThreadPoolExecutor(poolSize, poolSize, 60, TimeUnit.SECONDS,
             new LinkedBlockingQueue<>(),
             new ThreadFactoryBuilder().setDaemon(true)
-                .setNameFormat("ECContainerReconstructionThread-%d").build()));
+                .setNameFormat("ECContainerReconstructionThread-%d").build()),
+        coordinator);
   }
 
   public void stop() {
@@ -69,4 +76,16 @@ public class ECReconstructionSupervisor {
   public void addTask(ECReconstructionCoordinatorTask task) {
     executor.execute(task);
   }
+
+  @Override
+  public void close() throws IOException {
+    if (reconstructionCoordinator != null) {
+      reconstructionCoordinator.close();
+    }
+    stop();
+  }
+
+  public ECReconstructionCoordinator getReconstructionCoordinator() {
+    return reconstructionCoordinator;
+  }
 }
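
Wiring this together: the supervisor owns the coordinator and hands it to
each task, as ReconstructECContainersCommandHandler does above. A brief
sketch, assuming containerSet, context, coordinator and a commandInfo are in
scope:

    ECReconstructionSupervisor supervisor =
        new ECReconstructionSupervisor(containerSet, context, 10, coordinator);
    supervisor.addTask(new ECReconstructionCoordinatorTask(
        supervisor.getReconstructionCoordinator(), commandInfo));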
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 7fcbdb3e7f..9590711650 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -283,6 +283,7 @@ public class KeyValueHandler extends Handler {
     }
     newContainerData.setReplicaIndex(request.getCreateContainer()
         .getReplicaIndex());
+
     // TODO: Add support to add metadataList to ContainerData. Add metadata
     // to container during creation.
     KeyValueContainer newContainer = new KeyValueContainer(
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
index e86be82a09..2403364f4c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ec/reconstruction/TestECReconstructionSupervisor.java
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeoutException;
 public class TestECReconstructionSupervisor {
 
   private final ECReconstructionSupervisor supervisor =
-      new ECReconstructionSupervisor(null, null, 5);
+      new ECReconstructionSupervisor(null, null, 5, null);
 
   @Test
   public void testAddTaskShouldExecuteTheGivenTask()
@@ -42,7 +42,7 @@ public class TestECReconstructionSupervisor {
     private boolean isExecuted = false;
 
     FakeTask(ECReconstructionCommandInfo reconstructionCommandInfo) {
-      super(reconstructionCommandInfo);
+      super(null, reconstructionCommandInfo);
     }
 
     @Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index 76cfb7b289..3047d18d7e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -47,10 +48,16 @@ import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECContainerOperationClient;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
 import org.junit.Assert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -58,6 +65,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -65,10 +74,16 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -79,7 +94,7 @@ public class TestContainerCommandsEC {
 
   private static MiniOzoneCluster cluster;
   private static StorageContainerManager scm;
-  private static OzoneClient client;
+  private static OzoneClient rpcClient;
   private static ObjectStore store;
   private static StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
@@ -90,17 +105,16 @@ public class TestContainerCommandsEC {
   private static final EcCodec EC_CODEC = EcCodec.RS;
   private static final int EC_CHUNK_SIZE = 1024;
   private static final int STRIPE_DATA_SIZE = EC_DATA * EC_CHUNK_SIZE;
-  private static final int NUM_DN = EC_DATA + EC_PARITY;
+  private static final int NUM_DN = EC_DATA + EC_PARITY + 3;
+  private static byte[][] inputChunks = new byte[EC_DATA][EC_CHUNK_SIZE];
 
   // Each key size will be in range [min, max), min inclusive, max exclusive
-  private static final int[][] KEY_SIZE_RANGES = new int[][]{
-      {1, EC_CHUNK_SIZE},
-      {EC_CHUNK_SIZE, EC_CHUNK_SIZE + 1},
-      {EC_CHUNK_SIZE + 1, STRIPE_DATA_SIZE},
-      {STRIPE_DATA_SIZE, STRIPE_DATA_SIZE + 1},
-      {STRIPE_DATA_SIZE + 1, STRIPE_DATA_SIZE + EC_CHUNK_SIZE},
-      {STRIPE_DATA_SIZE + EC_CHUNK_SIZE, STRIPE_DATA_SIZE * 2},
-  };
+  private static final int[][] KEY_SIZE_RANGES =
+      new int[][] {{1, EC_CHUNK_SIZE}, {EC_CHUNK_SIZE, EC_CHUNK_SIZE + 1},
+          {EC_CHUNK_SIZE + 1, STRIPE_DATA_SIZE},
+          {STRIPE_DATA_SIZE, STRIPE_DATA_SIZE + 1},
+          {STRIPE_DATA_SIZE + 1, STRIPE_DATA_SIZE + EC_CHUNK_SIZE},
+          {STRIPE_DATA_SIZE + EC_CHUNK_SIZE, STRIPE_DATA_SIZE * 2}};
   private static byte[][] values;
   private static long containerID;
   private static Pipeline pipeline;
@@ -117,6 +131,7 @@ public class TestContainerCommandsEC {
         OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE);
     startCluster(config);
     prepareData(KEY_SIZE_RANGES);
+    rpcClient = OzoneClientFactory.getRpcClient(config);
   }
 
   @AfterAll
@@ -130,13 +145,11 @@ public class TestContainerCommandsEC {
     Map<DatanodeDetails, Integer> indicesForSinglePipeline = new HashMap<>();
     indicesForSinglePipeline.put(node, replicaIndex);
 
-    return Pipeline.newBuilder()
-        .setId(ecPipeline.getId())
+    return Pipeline.newBuilder().setId(ecPipeline.getId())
         .setReplicationConfig(ecPipeline.getReplicationConfig())
         .setState(ecPipeline.getPipelineState())
         .setNodes(ImmutableList.of(node))
-        .setReplicaIndexes(indicesForSinglePipeline)
-        .build();
+        .setReplicaIndexes(indicesForSinglePipeline).build();
   }
 
   @BeforeEach
@@ -175,22 +188,27 @@ public class TestContainerCommandsEC {
   public void testListBlock() throws Exception {
     for (int i = 0; i < datanodeDetails.size(); i++) {
       final int minKeySize = i < EC_DATA ? i * EC_CHUNK_SIZE : 0;
-      final int numExpectedBlocks = (int) Arrays.stream(values)
-          .mapToInt(v -> v.length).filter(s -> s > minKeySize).count();
+      final int numExpectedBlocks =
+          (int) Arrays.stream(values).mapToInt(v -> v.length)
+              .filter(s -> s > minKeySize).count();
       Function<Integer, Integer> expectedChunksFunc = chunksInReplicaFunc(i);
-      final int numExpectedChunks = Arrays.stream(values)
-          .mapToInt(v -> v.length).map(expectedChunksFunc::apply).sum();
+      final int numExpectedChunks =
+          Arrays.stream(values).mapToInt(v -> v.length)
+              .map(expectedChunksFunc::apply).sum();
       if (numExpectedBlocks == 0) {
         final int j = i;
         Throwable t = Assertions.assertThrows(StorageContainerException.class,
-            () -> ContainerProtocolCalls.listBlock(clients.get(j),
-                containerID, null, numExpectedBlocks + 1, null));
-        Assertions.assertEquals(
-            "ContainerID " + containerID + " does not exist", t.getMessage());
+            () -> ContainerProtocolCalls
+                .listBlock(clients.get(j), containerID, null,
+                    numExpectedBlocks + 1, null));
+        Assertions
+            .assertEquals("ContainerID " + containerID + " does not exist",
+                t.getMessage());
         continue;
       }
-      ListBlockResponseProto response = ContainerProtocolCalls.listBlock(
-          clients.get(i), containerID, null, numExpectedBlocks + 1, null);
+      ListBlockResponseProto response = ContainerProtocolCalls
+          .listBlock(clients.get(i), containerID, null, numExpectedBlocks + 1,
+              null);
       Assertions.assertEquals(numExpectedBlocks, response.getBlockDataCount(),
           "blocks count doesn't match on DN " + i);
       Assertions.assertEquals(numExpectedChunks,
@@ -228,7 +246,7 @@ public class TestContainerCommandsEC {
 
       //Create the recovering container in DN.
       ContainerProtocolCalls.createRecoveringContainer(dnClient,
-          container.containerID().getProtobuf().getId(), null);
+          container.containerID().getProtobuf().getId(), null, 4);
 
       BlockID blockID = ContainerTestHelper
           .getTestBlockID(container.containerID().getProtobuf().getId());
@@ -278,6 +296,207 @@ public class TestContainerCommandsEC {
     }
   }
 
+  private static byte[] getBytesWith(int singleDigitNumber, int total) {
+    StringBuilder builder = new StringBuilder(total);
+    for (int i = 1; i <= total; i++) {
+      builder.append(singleDigitNumber);
+    }
+    return builder.toString().getBytes(UTF_8);
+  }
+
+  @ParameterizedTest
+  @MethodSource("recoverableMissingIndexes")
+  void testECReconstructionCoordinatorWith(List<Integer> missingIndexes)
+      throws Exception {
+    testECReconstructionCoordinator(missingIndexes);
+  }
+
+  static Stream<List<Integer>> recoverableMissingIndexes() {
+    return Stream
+        .concat(IntStream.rangeClosed(1, 5).mapToObj(ImmutableList::of), Stream
+            .of(ImmutableList.of(2, 3), ImmutableList.of(2, 4),
+                ImmutableList.of(3, 5), ImmutableList.of(4, 5)));
+  }
+
+  /**
+   * Tests the reconstruction of data when more blocks than parity are
+   * missing. The test should throw InsufficientLocationsException.
+   */
+  @Test
+  public void testECReconstructionCoordinatorWithMissingIndexes135() {
+    InsufficientLocationsException exception =
+        Assert.assertThrows(InsufficientLocationsException.class, () -> {
+          testECReconstructionCoordinator(ImmutableList.of(1, 3, 5));
+        });
+
+    String expectedMessage =
+        "There are insufficient datanodes to read the EC block";
+    String actualMessage = exception.getMessage();
+
+    Assert.assertEquals(expectedMessage, actualMessage);
+  }
+
+  private void testECReconstructionCoordinator(List<Integer> missingIndexes)
+      throws Exception {
+    ObjectStore objectStore = rpcClient.getObjectStore();
+    String keyString = UUID.randomUUID().toString();
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    for (int i = 0; i < EC_DATA; i++) {
+      inputChunks[i] = getBytesWith(i + 1, EC_CHUNK_SIZE);
+    }
+    XceiverClientManager xceiverClientManager =
+        new XceiverClientManager(config);
+    try (OzoneOutputStream out = bucket.createKey(keyString, 4096,
+        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, 1024),
+        new HashMap<>())) {
+      Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
+      for (int i = 0; i < inputChunks.length; i++) {
+        out.write(inputChunks[i]);
+      }
+    }
+
+    ECReconstructionCoordinator coordinator =
+        new ECReconstructionCoordinator(config, null);
+
+    OzoneKeyDetails key = bucket.getKey(keyString);
+    long conID = key.getOzoneKeyLocations().get(0).getContainerID();
+
+    //Close the container first.
+    scm.getContainerManager().getContainerStateManager().updateContainerState(
+        HddsProtos.ContainerID.newBuilder().setId(conID).build(),
+        HddsProtos.LifeCycleEvent.FINALIZE);
+    scm.getContainerManager().getContainerStateManager().updateContainerState(
+        HddsProtos.ContainerID.newBuilder().setId(conID).build(),
+        HddsProtos.LifeCycleEvent.CLOSE);
+
+    Pipeline containerPipeline = scm.getPipelineManager().getPipeline(
+        scm.getContainerManager().getContainer(ContainerID.valueOf(conID))
+            .getPipelineID());
+
+    SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
+
+    List<DatanodeDetails> nodeSet = containerPipeline.getNodes();
+    List<Pipeline> containerToDeletePipeline = new ArrayList<>();
+    for (DatanodeDetails srcDn : nodeSet) {
+      int replIndex = containerPipeline.getReplicaIndex(srcDn);
+      if (missingIndexes.contains(replIndex)) {
+        containerToDeletePipeline
+            .add(createSingleNodePipeline(containerPipeline, srcDn, replIndex));
+        continue;
+      }
+      sourceNodeMap.put(replIndex, srcDn);
+    }
+
+    //Find nodes outside of pipeline
+    List<DatanodeDetails> clusterDnsList =
+        cluster.getHddsDatanodes().stream().map(k -> k.getDatanodeDetails())
+            .collect(Collectors.toList());
+    List<DatanodeDetails> targetNodes = new ArrayList<>();
+    for (DatanodeDetails clusterDN : clusterDnsList) {
+      if (!nodeSet.contains(clusterDN)) {
+        targetNodes.add(clusterDN);
+        if (targetNodes.size() == missingIndexes.size()) {
+          break;
+        }
+      }
+    }
+
+    Assert.assertEquals(missingIndexes.size(), targetNodes.size());
+
+    List<org.apache.hadoop.ozone.container.common.helpers.BlockData[]>
+        blockDataArrList = new ArrayList<>();
+    for (int j = 0; j < containerToDeletePipeline.size(); j++) {
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData =
+          new ECContainerOperationClient(new OzoneConfiguration(), null)
+              .listBlock(conID, containerToDeletePipeline.get(j).getFirstNode(),
+                  (ECReplicationConfig) containerToDeletePipeline.get(j)
+                      .getReplicationConfig(), null);
+      blockDataArrList.add(blockData);
+      // Delete the first index container
+      ContainerProtocolCalls.deleteContainer(
+          xceiverClientManager.acquireClient(containerToDeletePipeline.get(j)),
+          conID, true, null);
+    }
+
+    //Give the new target to reconstruct the container
+    SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
+    for (int k = 0; k < missingIndexes.size(); k++) {
+      targetNodeMap.put(missingIndexes.get(k), targetNodes.get(k));
+    }
+
+    coordinator.reconstructECContainerGroup(conID,
+        (ECReplicationConfig) containerPipeline.getReplicationConfig(),
+        sourceNodeMap, targetNodeMap);
+
+    // Compare the original container metadata with the recovered container.
+    Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+        targetNodeMap.entrySet().iterator();
+    int i = 0;
+    while (iterator.hasNext()) {
+      Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+      DatanodeDetails targetDN = next.getValue();
+      Map<DatanodeDetails, Integer> indexes = new HashMap<>();
+      indexes.put(targetDN, next.getKey());
+      Pipeline newTargetPipeline =
+          Pipeline.newBuilder().setId(PipelineID.randomId())
+              .setReplicationConfig(containerPipeline.getReplicationConfig())
+              .setReplicaIndexes(indexes)
+              .setState(Pipeline.PipelineState.CLOSED)
+              .setNodes(ImmutableList.of(targetDN)).build();
+
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+          reconstructedBlockData =
+          new ECContainerOperationClient(new OzoneConfiguration(), null)
+              .listBlock(conID, newTargetPipeline.getFirstNode(),
+                  (ECReplicationConfig) newTargetPipeline
+                      .getReplicationConfig(), null);
+      Assert.assertEquals(blockDataArrList.get(i).length,
+          reconstructedBlockData.length);
+      checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
+      ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
+          ContainerProtocolCalls.readContainer(
+              xceiverClientManager.acquireClient(newTargetPipeline), conID,
+              null);
+      Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+          readContainerResponseProto.getContainerData().getState());
+      i++;
+    }
+
+  }
+
+  private void checkBlockData(
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData,
+      org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+          reconstructedBlockData) {
+
+    for (int i = 0; i < blockData.length; i++) {
+      List<ContainerProtos.ChunkInfo> oldBlockDataChunks =
+          blockData[i].getChunks();
+      List<ContainerProtos.ChunkInfo> newBlockDataChunks =
+          reconstructedBlockData[i].getChunks();
+      for (int j = 0; j < oldBlockDataChunks.size(); j++) {
+        ContainerProtos.ChunkInfo chunkInfo = oldBlockDataChunks.get(j);
+        if (chunkInfo.getLen() == 0) {
+          // let's ignore the empty chunks
+          continue;
+        }
+        Assert.assertEquals(chunkInfo, newBlockDataChunks.get(j));
+      }
+    }
+
+   /* Assert.assertEquals(
+        Arrays.stream(blockData).map(b -> b.getProtoBufMessage())
+            .collect(Collectors.toList()),
+        Arrays.stream(reconstructedBlockData).map(b -> b.getProtoBufMessage())
+            .collect(Collectors.toList()));*/
+  }
+
   public static void startCluster(OzoneConfiguration conf) throws Exception {
 
     // Set minimum pipeline to 1 to ensure all data is written to
@@ -287,15 +506,12 @@ public class TestContainerCommandsEC {
     writableECContainerProviderConfig.setMinimumPipelines(1);
     conf.setFromObject(writableECContainerProviderConfig);
 
-    cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(NUM_DN)
-        .setScmId(SCM_ID)
-        .setClusterId(CLUSTER_ID)
-        .build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(NUM_DN)
+        .setScmId(SCM_ID).setClusterId(CLUSTER_ID).build();
     cluster.waitForClusterToBeReady();
     scm = cluster.getStorageContainerManager();
-    client = OzoneClientFactory.getRpcClient(conf);
-    store = client.getObjectStore();
+    rpcClient = OzoneClientFactory.getRpcClient(conf);
+    store = rpcClient.getObjectStore();
     storageContainerLocationClient =
         cluster.getStorageContainerLocationClient();
   }
@@ -314,8 +530,8 @@ public class TestContainerCommandsEC {
       int keySize = RandomUtils.nextInt(ranges[i][0], ranges[i][1]);
       values[i] = RandomUtils.nextBytes(keySize);
       final String keyName = UUID.randomUUID().toString();
-      try (OutputStream out = bucket.createKey(
-          keyName, values[i].length, repConfig, new HashMap<>())) {
+      try (OutputStream out = bucket
+          .createKey(keyName, values[i].length, repConfig, new HashMap<>())) {
         out.write(values[i]);
       }
     }
@@ -330,8 +546,8 @@ public class TestContainerCommandsEC {
   }
 
   public static void stopCluster() throws IOException {
-    if (client != null) {
-      client.close();
+    if (rpcClient != null) {
+      rpcClient.close();
     }
 
     if (storageContainerLocationClient != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org