Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/04 18:12:06 UTC

hadoop git commit: HDDS-75. Support for CopyContainer. Contributed by Elek, Marton.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 9e96ac666 -> b9932162e


HDDS-75. Support for CopyContainer. Contributed by Elek, Marton.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b9932162
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b9932162
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b9932162

Branch: refs/heads/trunk
Commit: b9932162e9eb4acc9c790fc3c4938a5057fc1658
Parents: 9e96ac6
Author: Nanda kumar <na...@apache.org>
Authored: Tue Sep 4 23:41:50 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Tue Sep 4 23:41:50 2018 +0530

----------------------------------------------------------------------
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |   3 +
 .../main/proto/DatanodeContainerProtocol.proto  |  12 +-
 .../common/src/main/resources/ozone-default.xml |  10 +-
 .../container/common/interfaces/Handler.java    |  15 +-
 .../statemachine/DatanodeStateMachine.java      |  25 +--
 .../ReplicateContainerCommandHandler.java       | 124 ++++++++++++-
 .../transport/server/XceiverServerGrpc.java     |  12 +-
 .../container/keyvalue/KeyValueContainer.java   |   3 +-
 .../container/keyvalue/KeyValueHandler.java     | 100 ++++++-----
 .../container/ozoneimpl/OzoneContainer.java     |   9 +-
 .../replication/ContainerDownloader.java        |  40 +++++
 .../replication/ContainerReplicationSource.java |  49 ++++++
 .../replication/ContainerStreamingOutput.java   |  45 +++++
 .../replication/GrpcReplicationClient.java      | 169 ++++++++++++++++++
 .../replication/GrpcReplicationService.java     | 130 ++++++++++++++
 .../OnDemandContainerReplicationSource.java     |  76 ++++++++
 .../replication/SimpleContainerDownloader.java  | 121 +++++++++++++
 .../container/replication/package-info.java     |  21 +++
 .../TestReplicateContainerCommandHandler.java   | 146 ++++++++++++++++
 .../commandhandler/package-info.java            |  22 +++
 .../container/TestContainerReplication.java     | 175 +++++++++++++++++++
 .../TestReplicateContainerHandler.java          |  70 --------
 .../container/ozoneimpl/TestOzoneContainer.java |   4 +-
 23 files changed, 1244 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8f53da5..0f2b108 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -265,6 +265,9 @@ public final class OzoneConfigKeys {
   public static final long
       HDDS_LOCK_SUPPRESS_WARNING_INTERVAL_MS_DEAFULT = 10000L;
 
+  public static final String OZONE_CONTAINER_COPY_WORKDIR =
+      "hdds.datanode.replication.work.dir";
+
   /**
    * There is no need to instantiate this class.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 930f314..ba0d2d4 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -430,16 +430,22 @@ message CopyContainerRequestProto {
 }
 
 message CopyContainerResponseProto {
-  required string archiveName = 1;
+  required int64 containerID = 1;
   required uint64 readOffset = 2;
   required uint64 len = 3;
   required bool eof = 4;
-  repeated bytes data = 5;
+  required bytes data = 5;
   optional int64 checksum = 6;
 }
 
 service XceiverClientProtocolService {
   // A client-to-datanode RPC to send container commands
   rpc send(stream ContainerCommandRequestProto) returns
-    (stream ContainerCommandResponseProto) {}
+    (stream ContainerCommandResponseProto) {};
+
+}
+
+service IntraDatanodeProtocolService {
+  // An intra-datanode service to copy the raw container data between nodes
+  rpc download (CopyContainerRequestProto) returns (stream CopyContainerResponseProto);
 }
\ No newline at end of file
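
The new streaming download works as follows: the requester sends a single
CopyContainerRequestProto and receives the archive as a stream of
CopyContainerResponseProto chunks, the last one carrying eof = true. A minimal
request, built the same way the GrpcReplicationClient added below does
(len = -1 asks for the whole container):

    CopyContainerRequestProto request = CopyContainerRequestProto.newBuilder()
        .setContainerID(containerId)  // container to copy
        .setReadOffset(0)             // start from the beginning
        .setLen(-1)                   // -1 = stream the whole archive
        .build();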

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a9fd10b..ca3da41 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1124,4 +1124,12 @@
     on. Right now, we have SSD and DISK as profile options.</description>
   </property>
 
-</configuration>
\ No newline at end of file
+  <property>
+    <name>hdds.datanode.replication.work.dir</name>
+    <tag>DATANODE</tag>
+    <description>Temporary directory used during the container replication
+      between datanodes. Should have enough space to store multiple containers
+      (in compressed format), but doesn't require fast IO access such as SSD.
+    </description>
+  </property>
+</configuration>
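
The new key has no default value; SimpleContainerDownloader (added below)
falls back to a container-copy directory under java.io.tmpdir when it is
unset. To stage the downloaded archives on a specific volume, the key can be
set explicitly; a minimal sketch, where /data/container-copy is only an
example path:

    OzoneConfiguration conf = new OzoneConfiguration();
    // Staging area for downloaded container tarballs: needs space, not speed.
    conf.set(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR,
        "/data/container-copy");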

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 15eed4f..53e1c68 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.ozone.container.common.interfaces;
 
 
+import java.io.FileInputStream;
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -30,7 +33,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
-
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 
 /**
  * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -67,6 +70,16 @@ public abstract class Handler {
   public abstract ContainerCommandResponseProto handle(
       ContainerCommandRequestProto msg, Container container);
 
+  /**
+   * Import container data from a raw input stream.
+   */
+  public abstract Container importContainer(
+      long containerID,
+      long maxSize,
+      FileInputStream rawContainerStream,
+      TarContainerPacker packer)
+      throws IOException;
+
   public void setScmID(String scmId) {
     this.scmID = scmId;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 0a23912..875d063 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -16,12 +16,17 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
@@ -39,15 +44,12 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * State Machine Class.
  */
@@ -87,13 +89,14 @@ public class DatanodeStateMachine implements Closeable {
         new OzoneConfiguration(conf), context);
     nextHB = new AtomicLong(Time.monotonicNow());
 
-     // When we add new handlers just adding a new handler here should do the
+    // When we add new handlers just adding a new handler here should do the
      // trick.
     commandDispatcher = CommandDispatcher.newBuilder()
         .addHandler(new CloseContainerCommandHandler())
         .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
             conf))
-        .addHandler(new ReplicateContainerCommandHandler())
+        .addHandler(new ReplicateContainerCommandHandler(conf,
+            container.getContainerSet(), container.getDispatcher()))
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index fe1d4e8..d1895a8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -16,14 +16,32 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import java.io.FileInputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine
     .SCMConnectionManager;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
+import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import org.slf4j.Logger;
@@ -33,22 +51,120 @@ import org.slf4j.LoggerFactory;
  * Command handler to copy containers from sources.
  */
 public class ReplicateContainerCommandHandler implements CommandHandler {
+
   static final Logger LOG =
       LoggerFactory.getLogger(ReplicateContainerCommandHandler.class);
 
+  private ContainerDispatcher containerDispatcher;
+
   private int invocationCount;
 
   private long totalTime;
-  private boolean cmdExecuted;
+
+  private ContainerDownloader downloader;
+
+  private Configuration conf;
+
+  private TarContainerPacker packer = new TarContainerPacker();
+
+  private ContainerSet containerSet;
+
+  private Lock lock = new ReentrantLock();
+
+  public ReplicateContainerCommandHandler(
+      Configuration conf,
+      ContainerSet containerSet,
+      ContainerDispatcher containerDispatcher,
+      ContainerDownloader downloader) {
+    this.conf = conf;
+    this.containerSet = containerSet;
+    this.downloader = downloader;
+    this.containerDispatcher = containerDispatcher;
+  }
+
+  public ReplicateContainerCommandHandler(
+      Configuration conf,
+      ContainerSet containerSet,
+      ContainerDispatcher containerDispatcher) {
+    this(conf, containerSet, containerDispatcher,
+        new SimpleContainerDownloader(conf));
+  }
 
   @Override
   public void handle(SCMCommand command, OzoneContainer container,
       StateContext context, SCMConnectionManager connectionManager) {
-    LOG.warn("Replicate command is not yet handled");
+
+    ReplicateContainerCommand replicateCommand =
+        (ReplicateContainerCommand) command;
     try {
-      cmdExecuted = true;
+
+      long containerID = replicateCommand.getContainerID();
+      LOG.info("Starting replication of container {} from {}", containerID,
+          replicateCommand.getSourceDatanodes());
+      CompletableFuture<Path> tempTarFile = downloader
+          .getContainerDataFromReplicas(containerID,
+              replicateCommand.getSourceDatanodes());
+
+      CompletableFuture<Void> result =
+          tempTarFile.thenAccept(path -> {
+            LOG.info("Container {} is downloaded, starting to import.",
+                containerID);
+            importContainer(containerID, path);
+          });
+
+      result.whenComplete((aVoid, throwable) -> {
+        if (throwable != null) {
+          LOG.error("Container replication was unsuccessful .", throwable);
+        } else {
+          LOG.info("Container {} is replicated successfully", containerID);
+        }
+      });
+    } finally {
+      updateCommandStatus(context, command, true, LOG);
+
+    }
+  }
+
+  protected void importContainer(long containerID, Path tarFilePath) {
+    lock.lock();
+    try {
+      ContainerData originalContainerData;
+      try (FileInputStream tempContainerTarStream = new FileInputStream(
+          tarFilePath.toFile())) {
+        byte[] containerDescriptorYaml =
+            packer.unpackContainerDescriptor(tempContainerTarStream);
+        originalContainerData = ContainerDataYaml.readContainer(
+            containerDescriptorYaml);
+      }
+
+      try (FileInputStream tempContainerTarStream = new FileInputStream(
+          tarFilePath.toFile())) {
+
+        Handler handler = containerDispatcher.getHandler(
+            originalContainerData.getContainerType());
+
+        Container container = handler.importContainer(containerID,
+            originalContainerData.getMaxSize(),
+            tempContainerTarStream,
+            packer);
+
+        containerSet.addContainer(container);
+      }
+
+    } catch (Exception e) {
+      LOG.error(
+          "Can't import the downloaded container data id=" + containerID,
+          e);
+      try {
+        Files.delete(tarFilePath);
+      } catch (Exception ex) {
+        LOG.error(
+            "Container import is failed and the downloaded file can't be "
+                + "deleted: "
+                + tarFilePath.toAbsolutePath().toString());
+      }
     } finally {
-      updateCommandStatus(context, command, cmdExecuted, LOG);
+      lock.unlock();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 4dc232d..4a90144 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+
+import org.apache.ratis.shaded.io.grpc.BindableService;
 import org.apache.ratis.shaded.io.grpc.Server;
 import org.apache.ratis.shaded.io.grpc.ServerBuilder;
 import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
@@ -54,7 +56,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
    * @param conf - Configuration
    */
   public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
-                       ContainerDispatcher dispatcher) {
+      ContainerDispatcher dispatcher, BindableService... additionalServices) {
     Preconditions.checkNotNull(conf);
 
     this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
@@ -80,6 +82,14 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
         .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
         .addService(new GrpcXceiverService(dispatcher))
         .build();
+    NettyServerBuilder nettyServerBuilder =
+        ((NettyServerBuilder) ServerBuilder.forPort(port))
+            .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+            .addService(new GrpcXceiverService(dispatcher));
+    for (BindableService service : additionalServices) {
+      nettyServerBuilder.addService(service);
+    }
+    server = nettyServerBuilder.build();
     storageContainer = dispatcher;
   }
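
With the new varargs parameter, any number of extra gRPC services can be
co-hosted on the datanode's standalone port. OzoneContainer (below) uses it to
register the replication endpoint; roughly:

    // Sketch of the new wiring, mirroring OzoneContainer below: the
    // replication service shares the server with the Xceiver service.
    XceiverServerGrpc server = new XceiverServerGrpc(
        datanodeDetails, conf, dispatcher,
        new GrpcReplicationService(
            new OnDemandContainerReplicationSource(containerSet)));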
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index f60c5e9..b893a38 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -35,7 +35,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.io.nativeio.NativeIO;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index e254833..5acecb4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -18,9 +18,15 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -35,80 +41,72 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .GetSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .KeyValue;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .PutSmallFileRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume
     .RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
 import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
 import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
-import org.apache.hadoop.ozone.container.keyvalue.statemachine
-    .background.BlockDeletingService;
+import org.apache.hadoop.ozone.container.keyvalue.statemachine.background
+    .BlockDeletingService;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
+    .Result.BLOCK_NOT_COMMITTED;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CLOSED_CONTAINER_IO;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.DELETE_ON_OPEN_CONTAINER;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.IO_EXCEPTION;
+    .Result.GET_SMALL_FILE_ERROR;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.INVALID_CONTAINER_STATE;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.GET_SMALL_FILE_ERROR;
+    .Result.IO_EXCEPTION;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.PUT_SMALL_FILE_ERROR;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.BLOCK_NOT_COMMITTED;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Stage;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
-import static org.apache.hadoop.ozone
-    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Handler for KeyValue Container type.
@@ -831,4 +829,22 @@ public class KeyValueHandler extends Handler {
       throw new StorageContainerException(msg, result);
     }
   }
+
+  public Container importContainer(long containerID, long maxSize,
+      FileInputStream rawContainerStream,
+      TarContainerPacker packer)
+      throws IOException {
+
+    KeyValueContainerData containerData =
+        new KeyValueContainerData(containerID,
+            maxSize);
+
+    KeyValueContainer container = new KeyValueContainer(containerData,
+        conf);
+
+    populateContainerPathFields(container, maxSize);
+    container.importContainerData(rawContainerStream, packer);
+    return container;
+
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 812777b..b1bf381 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -35,6 +35,9 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
+import org.apache.hadoop.ozone.container.replication
+    .OnDemandContainerReplicationSource;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +82,7 @@ public class OzoneContainer {
         context);
     server = new XceiverServerSpi[]{
         new XceiverServerGrpc(datanodeDetails, this.config, this
-            .hddsDispatcher),
+            .hddsDispatcher, createReplicationService()),
         XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
             .config, hddsDispatcher)
     };
@@ -87,6 +90,10 @@ public class OzoneContainer {
 
   }
 
+  private GrpcReplicationService createReplicationService() {
+    return new GrpcReplicationService(
+        new OnDemandContainerReplicationSource(containerSet));
+  }
 
   /**
    * Build's container map.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
new file mode 100644
index 0000000..9511241
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.Closeable;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+/**
+ * Service to download container data from other datanodes.
+ * <p>
+ * The implementation of this interface should copy the raw container data in
+ * compressed form to the working directory.
+ * <p>
+ * A smart implementation would use multiple sources to do parallel download.
+ */
+public interface ContainerDownloader extends Closeable {
+
+  CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
+      List<DatanodeDetails> sources);
+
+}
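
A short usage sketch of this contract (SimpleContainerDownloader, added below,
is the in-tree implementation; the variable names are illustrative):

    ContainerDownloader downloader = new SimpleContainerDownloader(conf);
    // The future completes with the path of the downloaded tarball in the
    // working directory; sourceDatanodes is the replica list sent by SCM.
    CompletableFuture<Path> tarFile =
        downloader.getContainerDataFromReplicas(containerId, sourceDatanodes);
    tarFile.thenAccept(path -> importContainer(containerId, path));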

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
new file mode 100644
index 0000000..69582f7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Contract to prepare and provide the container in binary form.
+ * <p>
+ * Prepare will be called when a container is closed. An implementation could
+ * pre-cache any binary representation of a container and store the
+ * pre-packed images.
+ */
+public interface ContainerReplicationSource {
+
+  /**
+   * Prepare for the replication.
+   *
+   * @param containerId The ID of the container to package.
+   */
+  void prepare(long containerId);
+
+  /**
+   * Copy the container data to an output stream.
+   *
+   * @param containerId Container to replicate
+   * @param destination   The destination stream to copy all the container data.
+   * @throws IOException
+   */
+  void copyData(long containerId, OutputStream destination)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
new file mode 100644
index 0000000..f7fd8a4
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * JAX-RS streaming output to return the binary container data.
+ */
+public class ContainerStreamingOutput implements StreamingOutput {
+
+  private long containerId;
+
+  private ContainerReplicationSource containerReplicationSource;
+
+  public ContainerStreamingOutput(long containerId,
+      ContainerReplicationSource containerReplicationSource) {
+    this.containerId = containerId;
+    this.containerReplicationSource = containerReplicationSource;
+  }
+
+  @Override
+  public void write(OutputStream outputStream)
+      throws IOException, WebApplicationException {
+    containerReplicationSource.copyData(containerId, outputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
new file mode 100644
index 0000000..91d098f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CopyContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .IntraDatanodeProtocolServiceGrpc;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import com.google.common.base.Preconditions;
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client to read container data from Grpc.
+ */
+public class GrpcReplicationClient {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcReplicationClient.class);
+
+  private final ManagedChannel channel;
+
+  private final IntraDatanodeProtocolServiceStub client;
+
+  private final Path workingDirectory;
+
+  public GrpcReplicationClient(String host,
+      int port, Path workingDir) {
+
+    channel = NettyChannelBuilder.forAddress(host, port)
+        .usePlaintext()
+        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+        .build();
+    client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
+    this.workingDirectory = workingDir;
+
+  }
+
+  public CompletableFuture<Path> download(long containerId) {
+    CopyContainerRequestProto request =
+        CopyContainerRequestProto.newBuilder()
+            .setContainerID(containerId)
+            .setLen(-1)
+            .setReadOffset(0)
+            .build();
+
+    CompletableFuture<Path> response = new CompletableFuture<>();
+
+    Path destinationPath =
+        getWorkingDirectory().resolve("container-" + containerId + ".tar.gz");
+
+    client.download(request,
+        new StreamDownloader(containerId, response, destinationPath));
+    return response;
+  }
+
+  private Path getWorkingDirectory() {
+    return workingDirectory;
+  }
+
+  public void shutdown() {
+    channel.shutdown();
+  }
+
+  /**
+   * Grpc stream observer to CompletableFuture adapter.
+   */
+  public static class StreamDownloader
+      implements StreamObserver<CopyContainerResponseProto> {
+
+    private final CompletableFuture<Path> response;
+
+    private final long containerId;
+
+    private BufferedOutputStream stream;
+
+    private Path outputPath;
+
+    public StreamDownloader(long containerId, CompletableFuture<Path> response,
+        Path outputPath) {
+      this.response = response;
+      this.containerId = containerId;
+      this.outputPath = outputPath;
+      try {
+        outputPath = Preconditions.checkNotNull(outputPath);
+        Path parentPath = Preconditions.checkNotNull(outputPath.getParent());
+        Files.createDirectories(parentPath);
+        stream =
+            new BufferedOutputStream(new FileOutputStream(outputPath.toFile()));
+      } catch (IOException e) {
+        throw new RuntimeException("OutputPath can't be used: " + outputPath,
+            e);
+      }
+
+    }
+
+    @Override
+    public void onNext(CopyContainerResponseProto chunk) {
+      try {
+        stream.write(chunk.getData().toByteArray());
+      } catch (IOException e) {
+        response.completeExceptionally(e);
+      }
+    }
+
+    @Override
+    public void onError(Throwable throwable) {
+      try {
+        stream.close();
+        LOG.error("Container download was unsuccessfull", throwable);
+        try {
+          Files.delete(outputPath);
+        } catch (IOException ex) {
+          LOG.error(
+              "Error happened during the download but can't delete the "
+                  + "temporary destination.", ex);
+        }
+        response.completeExceptionally(throwable);
+      } catch (IOException e) {
+        response.completeExceptionally(e);
+      }
+    }
+
+    @Override
+    public void onCompleted() {
+      try {
+        stream.close();
+        response.complete(outputPath);
+        LOG.info("Container is downloaded to {}", outputPath);
+      } catch (IOException e) {
+        response.completeExceptionally(e);
+      }
+
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
new file mode 100644
index 0000000..d8f696f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CopyContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CopyContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .IntraDatanodeProtocolServiceGrpc;
+
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service to make containers available for replication.
+ */
+public class GrpcReplicationService extends
+    IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceImplBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GrpcReplicationService.class);
+
+  private final ContainerReplicationSource containerReplicationSource;
+
+  public GrpcReplicationService(
+      ContainerReplicationSource containerReplicationSource) {
+    this.containerReplicationSource = containerReplicationSource;
+  }
+
+  @Override
+  public void download(CopyContainerRequestProto request,
+      StreamObserver<CopyContainerResponseProto> responseObserver) {
+    LOG.info("Streaming container data ({}) to other datanode",
+        request.getContainerID());
+    try {
+      GrpcOutputStream outputStream =
+          new GrpcOutputStream(responseObserver, request.getContainerID());
+      containerReplicationSource
+          .copyData(request.getContainerID(), outputStream);
+
+    } catch (IOException e) {
+      LOG.error("Can't stream the container data", e);
+      responseObserver.onError(e);
+    }
+  }
+
+  private static class GrpcOutputStream extends OutputStream
+      implements Closeable {
+
+    private static final int BUFFER_SIZE_IN_BYTES = 1024 * 1024;
+
+    private final StreamObserver<CopyContainerResponseProto> responseObserver;
+
+    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+    private long containerId;
+
+    private int readOffset = 0;
+
+    private int writtenBytes;
+
+    GrpcOutputStream(
+        StreamObserver<CopyContainerResponseProto> responseObserver,
+        long containerId) {
+      this.responseObserver = responseObserver;
+      this.containerId = containerId;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      try {
+        buffer.write(b);
+        if (buffer.size() > BUFFER_SIZE_IN_BYTES) {
+          flushBuffer(false);
+        }
+      } catch (Exception ex) {
+        responseObserver.onError(ex);
+      }
+    }
+
+    private void flushBuffer(boolean eof) {
+      if (buffer.size() > 0) {
+        CopyContainerResponseProto response =
+            CopyContainerResponseProto.newBuilder()
+                .setContainerID(containerId)
+                .setData(ByteString.copyFrom(buffer.toByteArray()))
+                .setEof(eof)
+                .setReadOffset(readOffset)
+                .setLen(buffer.size())
+                .build();
+        responseObserver.onNext(response);
+        readOffset += buffer.size();
+        writtenBytes += buffer.size();
+        buffer.reset();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      flushBuffer(true);
+      LOG.info("{} bytes written to the rpc stream from container {}",
+          writtenBytes, containerId);
+      responseObserver.onCompleted();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
new file mode 100644
index 0000000..d557b54
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A naive implementation of the replication source which creates a tar file
+ * on-demand without pre-creating the compressed archives.
+ */
+public class OnDemandContainerReplicationSource
+    implements ContainerReplicationSource {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReplicationSource.class);
+
+  private ContainerSet containerSet;
+
+  private ContainerPacker packer = new TarContainerPacker();
+
+  public OnDemandContainerReplicationSource(
+      ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  @Override
+  public void prepare(long containerId) {
+
+  }
+
+  @Override
+  public void copyData(long containerId, OutputStream destination)
+      throws IOException {
+
+    Container container = containerSet.getContainer(containerId);
+
+    Preconditions
+        .checkNotNull(container, "Container is not found " + containerId);
+
+    switch (container.getContainerType()) {
+    case KeyValueContainer:
+      packer.pack(container,
+          destination);
+      break;
+    default:
+      LOG.warn("Container type " + container.getContainerType()
+          + " is not replicable as no compression algorithm for that.");
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
new file mode 100644
index 0000000..a461a98
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple ContainerDownloader implementation to download the missing container
+ * from the first available datanode.
+ * <p>
+ * This is not the most efficient implementation as it uses only one source
+ * for the container download.
+ */
+public class SimpleContainerDownloader implements ContainerDownloader {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SimpleContainerDownloader.class);
+
+  private final Path workingDirectory;
+
+  private ExecutorService executor;
+
+  public SimpleContainerDownloader(Configuration conf) {
+
+    String workDirString =
+        conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR);
+
+    if (workDirString == null) {
+      workingDirectory = Paths.get(System.getProperty("java.io.tmpdir"))
+          .resolve("container-copy");
+    } else {
+      workingDirectory = Paths.get(workDirString);
+    }
+
+    ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("Container downloader thread - %d").build();
+    executor = Executors.newSingleThreadExecutor(build);
+    LOG.info("Starting container downloader service to copy "
+        + "containers to replicate.");
+  }
+
+  @Override
+  public CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
+      List<DatanodeDetails> sourceDatanodes) {
+
+    CompletableFuture<Path> result = null;
+    for (DatanodeDetails datanode : sourceDatanodes) {
+      try {
+
+        if (result == null) {
+          GrpcReplicationClient grpcReplicationClient =
+              new GrpcReplicationClient(datanode.getIpAddress(),
+                  datanode.getPort(Name.STANDALONE).getValue(),
+                  workingDirectory);
+          result = grpcReplicationClient.download(containerId);
+        } else {
+          result = result.thenApply(CompletableFuture::completedFuture)
+              .exceptionally(t -> {
+                LOG.error("Error on replicating container: " + containerId, t);
+                GrpcReplicationClient grpcReplicationClient =
+                    new GrpcReplicationClient(datanode.getIpAddress(),
+                        datanode.getPort(Name.STANDALONE).getValue(),
+                        workingDirectory);
+                return grpcReplicationClient.download(containerId);
+              }).thenCompose(Function.identity());
+        }
+      } catch (Exception ex) {
+        LOG.error(String.format(
+            "Container %s download from datanode %s was unsuccessful. "
+                + "Trying the next datanode", containerId, datanode), ex);
+      }
+
+    }
+    return result;
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      executor.shutdown();
+      executor.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Can't stop container downloader gracefully", e);
+    }
+  }
+}

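The CompletableFuture chaining in getContainerDataFromReplicas above is the subtle part of this class: on success the value is re-wrapped and passed through, on failure exceptionally() starts a download from the next source datanode, and thenCompose() flattens the nested future either way. A minimal, self-contained sketch of the same try-the-next-source pattern (the class and all names below are illustrative only, not part of the patch):

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public final class FallbackChainSketch {

  // Stand-in for one download attempt from a single source.
  private static CompletableFuture<String> attempt(String source) {
    return CompletableFuture.supplyAsync(() -> {
      if (source.startsWith("bad")) {
        throw new IllegalStateException("unavailable: " + source);
      }
      return "data-from-" + source;
    });
  }

  public static void main(String[] args) throws Exception {
    CompletableFuture<String> result = null;
    for (String source : Arrays.asList("bad-dn1", "bad-dn2", "good-dn3")) {
      if (result == null) {
        result = attempt(source);
      } else {
        // Completed normally: wrap the value and pass it through.
        // Completed exceptionally: start the next attempt.
        // thenCompose unwraps the nested future in both cases.
        result = result.thenApply(CompletableFuture::completedFuture)
            .exceptionally(t -> attempt(source))
            .thenCompose(Function.identity());
      }
    }
    System.out.println(result.get()); // prints: data-from-good-dn3
  }
}

Note that the downloader writes into ${java.io.tmpdir}/container-copy unless hdds.datanode.replication.work.dir is configured.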
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000..38a853c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+/**
+ * Classes to replicate container data between datanodes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
new file mode 100644
index 0000000..6a14d33
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerCommandHandler.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.replication.ContainerDownloader;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test replication command handler.
+ */
+public class TestReplicateContainerCommandHandler {
+
+  private static final String EXCEPTION_MESSAGE = "Oh my god";
+  private ReplicateContainerCommandHandler handler;
+  private StubDownloader downloader;
+  private ReplicateContainerCommand command;
+  private List<Long> importedContainerIds;
+
+  @Before
+  public void init() {
+    importedContainerIds = new ArrayList<>();
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    ContainerSet containerSet = Mockito.mock(ContainerSet.class);
+    ContainerDispatcher containerDispatcher =
+        Mockito.mock(ContainerDispatcher.class);
+
+    downloader = new StubDownloader();
+
+    handler = new ReplicateContainerCommandHandler(conf, containerSet,
+        containerDispatcher, downloader) {
+      @Override
+      protected void importContainer(long containerID, Path tarFilePath) {
+        importedContainerIds.add(containerID);
+      }
+    };
+
+    // build the replicate command handled by the tests below
+    ArrayList<DatanodeDetails> datanodeDetails = new ArrayList<>();
+    datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
+    datanodeDetails.add(Mockito.mock(DatanodeDetails.class));
+
+    command = new ReplicateContainerCommand(1L, datanodeDetails);
+  }
+
+  @Test
+  public void handle() throws TimeoutException, InterruptedException {
+    //GIVEN
+
+    //WHEN
+    handler.handle(command, null, Mockito.mock(StateContext.class), null);
+
+    GenericTestUtils
+        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
+
+    Assert.assertNotNull(downloader.futureByContainers.get(1L));
+    downloader.futureByContainers.get(1L).complete(Paths.get("/tmp/test"));
+
+    GenericTestUtils
+        .waitFor(() -> importedContainerIds.size() == 1, 100, 2000);
+  }
+
+  @Test
+  public void handleWithErrors() throws TimeoutException, InterruptedException {
+    //GIVEN
+    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+        .captureLogs(ReplicateContainerCommandHandler.LOG);
+
+    //WHEN
+    handler.handle(command, null, Mockito.mock(StateContext.class), null);
+
+    //THEN
+
+    GenericTestUtils
+        .waitFor(() -> downloader.futureByContainers.size() == 1, 100, 2000);
+
+    Assert.assertNotNull(downloader.futureByContainers.get(1L));
+    downloader.futureByContainers.get(1L)
+        .completeExceptionally(new IllegalArgumentException(
+            EXCEPTION_MESSAGE));
+
+    GenericTestUtils
+        .waitFor(() -> {
+          String output = logCapturer.getOutput();
+          return output.contains("unsuccessful")
+              && output.contains(EXCEPTION_MESSAGE);
+        },
+        100, 2000);
+  }
+
+  private static class StubDownloader implements ContainerDownloader {
+
+    private Map<Long, CompletableFuture<Path>> futureByContainers =
+        new HashMap<>();
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public CompletableFuture<Path> getContainerDataFromReplicas(
+        long containerId, List<DatanodeDetails> sources) {
+      CompletableFuture<Path> future = new CompletableFuture<>();
+      futureByContainers.put(containerId, future);
+      return future;
+    }
+  }
+
+}
\ No newline at end of file

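The hand-written StubDownloader above hands out one incomplete future per container id, so a test can decide after the handler has started whether the download succeeds or fails. Roughly the same effect could be achieved with a plain Mockito stub; a hedged sketch, reusing the imports already present in this test:

    ContainerDownloader downloader = Mockito.mock(ContainerDownloader.class);
    CompletableFuture<Path> future = new CompletableFuture<>();
    Mockito.when(downloader.getContainerDataFromReplicas(
            Mockito.eq(1L), Mockito.anyList()))
        .thenReturn(future);
    // the test then drives the outcome explicitly:
    future.complete(Paths.get("/tmp/test"));
    // or: future.completeExceptionally(new IllegalArgumentException("..."));

The explicit stub scales better once several container ids are in flight, which is why the test records the futures in a map instead.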
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
new file mode 100644
index 0000000..05ac76d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * Tests for command handlers.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
new file mode 100644
index 0000000..7391b25
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .DatanodeBlockID;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+
+import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
+    .writeChunkForContainer;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Tests ozone container replication.
+ */
+public class TestContainerReplication {
+  /**
+   * Set the timeout for every test.
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(300000);
+
+  @Test
+  public void testContainerReplication() throws Exception {
+    //GIVEN
+    OzoneConfiguration conf = newOzoneConfiguration();
+
+    long containerId = 1L;
+
+    conf.setSocketAddr("hdls.datanode.http-address",
+        new InetSocketAddress("0.0.0.0", 0));
+
+    MiniOzoneCluster cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
+            .setRandomContainerPort(true).build();
+    cluster.waitForClusterToBeReady();
+
+    HddsDatanodeService firstDatanode = cluster.getHddsDatanodes().get(0);
+
+    //copy from the first datanode
+    List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
+    sourceDatanodes.add(firstDatanode.getDatanodeDetails());
+
+    Pipeline sourcePipelines =
+        ContainerTestHelper.createPipeline(sourceDatanodes);
+
+    //create a new client
+    XceiverClientSpi client = new XceiverClientGrpc(sourcePipelines, conf);
+    client.connect();
+
+    //New container for testing
+    TestOzoneContainer.createContainerForTesting(client, containerId);
+
+    ContainerCommandRequestProto requestProto =
+        writeChunkForContainer(client, containerId, 1024);
+
+    DatanodeBlockID blockID = requestProto.getWriteChunk().getBlockID();
+
+    // Put Key to the test container
+    ContainerCommandRequestProto putKeyRequest = ContainerTestHelper
+        .getPutKeyRequest(sourcePipelines, requestProto.getWriteChunk());
+
+    ContainerProtos.KeyData keyData = putKeyRequest.getPutKey().getKeyData();
+
+    ContainerCommandResponseProto response = client.sendCommand(putKeyRequest);
+
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    Assert.assertEquals(putKeyRequest.getTraceID(), response.getTraceID());
+
+    HddsDatanodeService destinationDatanode =
+        chooseDatanodeWithoutContainer(sourcePipelines,
+            cluster.getHddsDatanodes());
+
+    //WHEN: send the order to replicate the container
+    cluster.getStorageContainerManager().getScmNodeManager()
+        .addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
+            new ReplicateContainerCommand(containerId,
+                sourcePipelines.getMachines()));
+
+    Thread.sleep(3000);
+
+    OzoneContainer ozoneContainer =
+        destinationDatanode.getDatanodeStateMachine().getContainer();
+
+    Container container =
+        ozoneContainer.getContainerSet().getContainer(containerId);
+
+    Assert.assertNotNull(
+        "Container is not replicated to the destination datanode",
+        container);
+
+    Assert.assertNotNull(
+        "ContainerData of the replicated container is null",
+        container.getContainerData());
+
+    long keyCount = ((KeyValueContainerData) container.getContainerData())
+        .getKeyCount();
+
+    KeyValueHandler handler = (KeyValueHandler) ozoneContainer.getDispatcher()
+        .getHandler(ContainerType.KeyValueContainer);
+
+    KeyData key = handler.getKeyManager()
+        .getKey(container, BlockID.getFromProtobuf(blockID));
+
+    Assert.assertNotNull(key);
+    Assert.assertEquals(1, key.getChunks().size());
+    Assert.assertEquals(requestProto.getWriteChunk().getChunkData(),
+        key.getChunks().get(0));
+
+  }
+
+  private HddsDatanodeService chooseDatanodeWithoutContainer(Pipeline pipeline,
+      List<HddsDatanodeService> dataNodes) {
+    for (HddsDatanodeService datanode : dataNodes) {
+      if (!pipeline.getMachines().contains(datanode.getDatanodeDetails())) {
+        return datanode;
+      }
+    }
+    throw new AssertionError("No datanode outside of the pipeline");
+  }
+
+  static OzoneConfiguration newOzoneConfiguration() {
+    final OzoneConfiguration conf = new OzoneConfiguration();
+    return conf;
+  }
+
+}

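One caveat in the test above: the fixed Thread.sleep(3000) makes it timing-dependent. A hedged alternative, assuming org.apache.hadoop.test.GenericTestUtils (already used elsewhere in this patch) is added to the imports, is to poll until the replicated container appears:

    // drop-in replacement for the Thread.sleep(3000) above
    GenericTestUtils.waitFor(() ->
        destinationDatanode.getDatanodeStateMachine().getContainer()
            .getContainerSet().getContainer(containerId) != null,
        1000, 30000);

waitFor throws TimeoutException and InterruptedException, both of which are covered by the test method's throws Exception clause.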
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
deleted file mode 100644
index 9e08212..0000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReplicateContainerHandler.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
-import org.apache.hadoop.test.GenericTestUtils;
-
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CONTAINER_SIZE;
-import org.junit.Test;
-
-/**
- * Tests the behavior of the datanode, when replicate container command is
- * received.
- */
-public class TestReplicateContainerHandler {
-
-  @Test
-  public void test() throws IOException, TimeoutException, InterruptedException,
-      OzoneException {
-
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(ReplicateContainerCommandHandler.LOG);
-
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
-    MiniOzoneCluster cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
-    cluster.waitForClusterToBeReady();
-
-    DatanodeDetails datanodeDetails =
-        cluster.getHddsDatanodes().get(0).getDatanodeDetails();
-    //send the order to replicate the container
-    cluster.getStorageContainerManager().getScmNodeManager()
-        .addDatanodeCommand(datanodeDetails.getUuid(),
-            new ReplicateContainerCommand(1L,
-                new ArrayList<>()));
-
-    //TODO: here we test only the serialization/unserialization as
-    // the implementation is not yet done
-    GenericTestUtils
-        .waitFor(() -> logCapturer.getOutput().contains("not yet handled"), 500,
-            5 * 1000);
-
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b9932162/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index f112d26..5dd88fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -513,7 +513,7 @@ public class TestOzoneContainer {
     return new XceiverClientGrpc(pipeline, conf);
   }
 
-  private static void createContainerForTesting(XceiverClientSpi client,
+  public static void createContainerForTesting(XceiverClientSpi client,
       long containerID) throws Exception {
     // Create container
     ContainerProtos.ContainerCommandRequestProto request =
@@ -525,7 +525,7 @@ public class TestOzoneContainer {
     Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
   }
 
-  private static ContainerProtos.ContainerCommandRequestProto
+  public static ContainerProtos.ContainerCommandRequestProto
       writeChunkForContainer(XceiverClientSpi client,
       long containerID, int dataLen) throws Exception {
     // Write Chunk


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org