You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2021/02/24 08:46:09 UTC

[ozone] 20/27: continue the refactor

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit ec7d9220a319a5241d96c6df9a12b06c19e252a0
Author: Elek Márton <el...@apache.org>
AuthorDate: Fri Feb 19 10:40:00 2021 +0100

    continue the refactor
---
 hadoop-hdds/container-service/pom.xml              |  15 +
 .../common/interfaces/ContainerPacker.java         |  59 ----
 .../container/keyvalue/KeyValueContainer.java      |  39 ---
 .../container/keyvalue/TarContainerPacker.java     | 217 ------------
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   2 +-
 .../replication/ContainerReplicationSource.java    |  49 ---
 .../replication/ContainerStreamingDestination.java |  26 ++
 .../replication/ContainerStreamingSource.java      |  48 +++
 .../container/replication/GrpcOutputStream.java    | 129 -------
 .../replication/GrpcReplicationClient.java         | 165 ---------
 .../replication/GrpcReplicationService.java        |  62 ----
 .../replication/NullContainerDownloader.java       |  16 -
 .../OnDemandContainerReplicationSource.java        |  63 ----
 .../container/replication/ReplicationServer.java   |  79 +----
 .../replication/SimpleContainerDownloader.java     | 104 +++---
 .../container/stream/DirstreamClientHandler.java   |  95 ++++++
 .../container/stream/DirstreamServerHandler.java   |  68 ++++
 .../ozone/container/stream/StreamingClient.java    |  62 ++++
 .../container/stream/StreamingDestination.java     |   9 +
 .../ozone/container/stream/StreamingServer.java    |  74 ++++
 .../ozone/container/stream/StreamingSource.java    |  10 +
 .../container/keyvalue/TestKeyValueContainer.java  |  36 --
 .../container/keyvalue/TestTarContainerPacker.java | 373 ---------------------
 .../replication/TestGrpcOutputStream.java          | 213 ------------
 .../replication/TestReplicationService.java        | 139 +++++---
 .../apache/hadoop/ozone/debug/ExportContainer.java | 185 ----------
 26 files changed, 572 insertions(+), 1765 deletions(-)

diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
index aaa5302..acb779c 100644
--- a/hadoop-hdds/container-service/pom.xml
+++ b/hadoop-hdds/container-service/pom.xml
@@ -43,6 +43,21 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>commons-compress</artifactId>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdds-common</artifactId>
       <type>test-jar</type>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
deleted file mode 100644
index a76e6f0..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.interfaces;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-
-/**
- * Service to pack/unpack ContainerData container data to/from a single byte
- * stream.
- */
-public interface ContainerPacker {
-
-  /**
-   * Extract the container data to the path defined by the container.
-   * <p>
-   * This doesn't contain the extraction of the container descriptor file.
-   *
-   * @return the byte content of the descriptor (which won't be written to a
-   * file but returned).
-   */
-  byte[] unpackContainerData(
-      KeyValueContainerData container,
-      InputStream inputStream)
-      throws IOException;
-
-  /**
-   * Compress all the container data (chunk data, metadata db AND container
-   * descriptor) to one single archive.
-   */
-  void pack(KeyValueContainerData containerData, OutputStream destination)
-      throws IOException;
-
-  /**
-   * Read the descriptor from the finished archive to get the data before
-   * importing the container.
-   */
-  byte[] unpackContainerDescriptor(InputStream inputStream)
-      throws IOException;
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 763f5ac..9785825 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.keyvalue;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.time.Instant;
@@ -42,7 +41,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -618,43 +616,6 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     return checker.fullCheck(throttler, canceler);
   }
 
-  public void exportContainerData(
-      OutputStream destination,
-      ContainerPacker packer
-  ) throws IOException {
-    writeLock();
-    try {
-      // Closed/ Quasi closed containers are considered for replication by
-      // replication manager if they are under-replicated.
-      ContainerProtos.ContainerDataProto.State state =
-          getContainerData().getState();
-      if (!(state == ContainerProtos.ContainerDataProto.State.CLOSED ||
-          state == ContainerDataProto.State.QUASI_CLOSED)) {
-        throw new IllegalStateException(
-            "Only (quasi)closed containers can be exported, but " +
-                "ContainerId=" + getContainerData().getContainerID() +
-                " is in state " + state);
-      }
-
-      try {
-        compactDB();
-        // Close DB (and remove from cache) to avoid concurrent modification
-        // while packing it.
-        BlockUtils.removeDB(containerData, config);
-      } finally {
-        readLock();
-        writeUnlock();
-      }
-
-      packer.pack(containerData, destination);
-    } finally {
-      if (lock.isWriteLockedByCurrentThread()) {
-        writeUnlock();
-      } else {
-        readUnlock();
-      }
-    }
-  }
 
   private enum ContainerCheckLevel {
     NO_CHECK, FAST_CHECK, FULL_CHECK
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
deleted file mode 100644
index c745fdb9..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.stream.Stream;
-
-import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
-
-import static java.util.stream.Collectors.toList;
-import org.apache.commons.compress.archivers.ArchiveEntry;
-import org.apache.commons.compress.archivers.ArchiveInputStream;
-import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
-import org.apache.commons.io.IOUtils;
-
-/**
- * Compress/uncompress KeyValueContainer data to a tar.gz archive.
- */
-public class TarContainerPacker
-    implements ContainerPacker {
-
-  static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;
-
-  static final String DB_DIR_NAME = "db";
-
-  private static final String CONTAINER_FILE_NAME = "container.yaml";
-
-  /**
-   * Given an input stream (tar file) extract the data to the specified
-   * directories.
-   *
-   * @param containerData container which defines the destination structure.
-   * @param input the input stream.
-   */
-  @Override
-  public byte[] unpackContainerData(KeyValueContainerData containerData,
-      InputStream input)
-      throws IOException {
-    byte[] descriptorFileContent = null;
-    Path dbRoot = containerData.getDbFile().toPath();
-    Path chunksRoot = Paths.get(containerData.getChunksPath());
-
-    try (ArchiveInputStream archiveInput = untar(input)) {
-
-      ArchiveEntry entry = archiveInput.getNextEntry();
-      while (entry != null) {
-        String name = entry.getName();
-        long size = entry.getSize();
-        if (name.startsWith(DB_DIR_NAME + "/")) {
-          Path destinationPath = dbRoot
-              .resolve(name.substring(DB_DIR_NAME.length() + 1));
-          extractEntry(archiveInput, size, dbRoot, destinationPath);
-        } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
-          Path destinationPath = chunksRoot
-              .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
-          extractEntry(archiveInput, size, chunksRoot, destinationPath);
-        } else if (CONTAINER_FILE_NAME.equals(name)) {
-          //Don't do anything. Container file should be unpacked in a
-          //separated step by unpackContainerDescriptor call.
-          descriptorFileContent = readEntry(archiveInput, size);
-        } else {
-          throw new IllegalArgumentException(
-              "Unknown entry in the tar file: " + "" + name);
-        }
-        entry = archiveInput.getNextEntry();
-      }
-      return descriptorFileContent;
-    }
-  }
-
-  private void extractEntry(InputStream input, long size,
-                            Path ancestor, Path path) throws IOException {
-    HddsUtils.validatePath(path, ancestor);
-    Path parent = path.getParent();
-    if (parent != null) {
-      Files.createDirectories(parent);
-    }
-
-    try (OutputStream fileOutput = new FileOutputStream(path.toFile());
-         OutputStream output = new BufferedOutputStream(fileOutput)) {
-      int bufferSize = 1024;
-      byte[] buffer = new byte[bufferSize + 1];
-      long remaining = size;
-      while (remaining > 0) {
-        int len = (int) Math.min(remaining, bufferSize);
-        int read = input.read(buffer, 0, len);
-        if (read >= 0) {
-          remaining -= read;
-          output.write(buffer, 0, read);
-        } else {
-          remaining = 0;
-        }
-      }
-    }
-  }
-
-  /**
-   * Given a containerData include all the required container data/metadata
-   * in a tar file.
-   *
-   * @param containerData Container to archive
-   * @param output   Destination tar file/stream.
-   */
-  @Override
-  public void pack(KeyValueContainerData containerData,
-      OutputStream output)
-      throws IOException {
-
-    try (ArchiveOutputStream archiveOutput = tar(output)) {
-
-      includePath(containerData.getDbFile().toPath(), DB_DIR_NAME,
-          archiveOutput);
-
-      includePath(Paths.get(containerData.getChunksPath()), CHUNKS_DIR_NAME,
-          archiveOutput);
-
-      includeFile(containerData.getContainerFile(), CONTAINER_FILE_NAME,
-          archiveOutput);
-    }
-
-  }
-
-  @Override
-  public byte[] unpackContainerDescriptor(InputStream input)
-      throws IOException {
-    try (ArchiveInputStream archiveInput = untar(input)) {
-
-      ArchiveEntry entry = archiveInput.getNextEntry();
-      while (entry != null) {
-        String name = entry.getName();
-        if (CONTAINER_FILE_NAME.equals(name)) {
-          return readEntry(archiveInput, entry.getSize());
-        }
-        entry = archiveInput.getNextEntry();
-      }
-    }
-    throw new IOException(
-        "Container descriptor is missing from the container archive.");
-  }
-
-  private byte[] readEntry(InputStream input, final long size)
-      throws IOException {
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    int bufferSize = 1024;
-    byte[] buffer = new byte[bufferSize + 1];
-    long remaining = size;
-    while (remaining > 0) {
-      int len = (int) Math.min(remaining, bufferSize);
-      int read = input.read(buffer, 0, len);
-      remaining -= read;
-      output.write(buffer, 0, read);
-    }
-    return output.toByteArray();
-  }
-
-  private void includePath(Path dir, String subdir,
-      ArchiveOutputStream archiveOutput) throws IOException {
-
-    try (Stream<Path> dirEntries = Files.list(dir)) {
-      for (Path path : dirEntries.collect(toList())) {
-        String entryName = subdir + "/" + path.getFileName();
-        includeFile(path.toFile(), entryName, archiveOutput);
-      }
-    }
-  }
-
-  static void includeFile(File file, String entryName,
-      ArchiveOutputStream archiveOutput) throws IOException {
-    ArchiveEntry entry = archiveOutput.createArchiveEntry(file, entryName);
-    archiveOutput.putArchiveEntry(entry);
-    try (InputStream input = new FileInputStream(file)) {
-      IOUtils.copy(input, archiveOutput);
-    }
-    archiveOutput.closeArchiveEntry();
-  }
-
-  private static ArchiveInputStream untar(InputStream input) {
-    return new TarArchiveInputStream(input);
-  }
-
-  private static ArchiveOutputStream tar(OutputStream output) {
-    return new TarArchiveOutputStream(output);
-  }
-
-
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 4ea2d6d..1d83524 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -148,7 +148,7 @@ public class OzoneContainer {
         context);
 
     replicationServer = new ReplicationServer(
-        controller,
+        containerSet,
         conf.getObject(ReplicationConfig.class),
         secConf,
         certClient);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
deleted file mode 100644
index 69582f7..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Contract to prepare provide the container in binary form..
- * <p>
- * Prepare will be called when container is closed. An implementation could
- * precache any binary representation of a container and store the pre packede
- * images.
- */
-public interface ContainerReplicationSource {
-
-  /**
-   * Prepare for the replication.
-   *
-   * @param containerId The name of the container the package.
-   */
-  void prepare(long containerId);
-
-  /**
-   * Copy the container data to an output stream.
-   *
-   * @param containerId Container to replicate
-   * @param destination   The destination stream to copy all the container data.
-   * @throws IOException
-   */
-  void copyData(long containerId, OutputStream destination)
-      throws IOException;
-
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
new file mode 100644
index 0000000..b0e60ac
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingDestination.java
@@ -0,0 +1,54 @@
+package org.apache.hadoop.ozone.container.replication;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.stream.StreamingDestination;
+
+/**
+ * Maps the logical names of streamed files to local paths under the DB
+ * location of the container described by the given container data.
+ *
+ * Only names of the form {@code DB/relative-path} are accepted; the relative
+ * part is resolved against the container DB location.
+ */
+public class ContainerStreamingDestination implements StreamingDestination {
+
+  private final KeyValueContainerData containerData;
+
+  public ContainerStreamingDestination(KeyValueContainerData containerData) {
+    this.containerData = containerData;
+  }
+
+  /**
+   * Resolve a streamed logical name to its local destination.
+   *
+   * @param name logical name in the form {@code DB/relative-path}
+   * @return the destination path under the container DB location
+   * @throws IllegalArgumentException if the part prefix is unknown, the
+   *     relative path is missing, or the result would leave the DB location
+   */
+  @Override
+  public Path mapToDestination(String name) {
+    String[] parts = name.split("/", 2);
+    if (!parts[0].equals("DB")) {
+      throw new IllegalArgumentException("Unknown container part:" + parts[0]);
+    }
+    // A bare "DB" has no second element; fail with a clear message instead
+    // of an ArrayIndexOutOfBoundsException.
+    if (parts.length < 2) {
+      throw new IllegalArgumentException(
+          "Missing file name in streamed entry: " + name);
+    }
+    String dbDir = containerData.getContainerDBFile().getAbsolutePath();
+    Path destination = Paths.get(dbDir, parts[1]);
+    // The name is external input to this class: reject "../" style entries
+    // which would be mapped outside of the container DB location.
+    if (!destination.normalize().startsWith(Paths.get(dbDir).normalize())) {
+      throw new IllegalArgumentException(
+          "Streamed entry is outside of the container DB directory: " + name);
+    }
+    return destination;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
new file mode 100644
index 0000000..46fb767
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingSource.java
@@ -0,0 +1,66 @@
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.stream.StreamingSource;
+
+/**
+ * Streaming source which exposes the DB files of a container looked up in
+ * the given container set.
+ */
+public class ContainerStreamingSource implements StreamingSource {
+
+  private final ContainerSet containerSet;
+
+  public ContainerStreamingSource(ContainerSet containerSet) {
+    this.containerSet = containerSet;
+  }
+
+  /**
+   * List the files to stream for one container.
+   *
+   * @param id the container id in decimal string form
+   * @return mapping from logical name ({@code DB/file-name}) to local path
+   * @throws IllegalArgumentException if the id is not a number or the
+   *     container set has no container with that id
+   * @throws UncheckedIOException if the container DB location can't be listed
+   */
+  @Override
+  public Map<String, Path> getFilesToStream(String id) {
+
+    Map<String, Path> filesToStream = new HashMap<>();
+
+    final Long containerId = Long.valueOf(id);
+    final KeyValueContainer container =
+        (KeyValueContainer) containerSet.getContainer(containerId);
+    if (container == null) {
+      throw new IllegalArgumentException("No such container " + containerId);
+    }
+    final File dbPath =
+        container.getContainerData().getContainerDBFile();
+    // Files.list holds the directory open until the stream is closed.
+    try (Stream<Path> listing = Files.list(dbPath.toPath())) {
+      final List<Path> dbFiles = listing.collect(Collectors.toList());
+      for (Path dbFile : dbFiles) {
+        filesToStream.put("DB/" + dbFile.getFileName().toString(), dbFile);
+      }
+    } catch (IOException e) {
+      // Returning an empty map would hide the failure from the caller and
+      // look like a container without DB files, so propagate it instead.
+      throw new UncheckedIOException(
+          "Can't list DB files of container " + containerId, e);
+    }
+    return filesToStream;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
deleted file mode 100644
index c09c8f6..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Adapter from {@code OutputStream} to gRPC {@code StreamObserver}.
- * Data is buffered in a limited buffer of the specified size.
- */
-class GrpcOutputStream extends OutputStream {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(GrpcOutputStream.class);
-
-  private final StreamObserver<CopyContainerResponseProto> responseObserver;
-
-  private final ByteString.Output buffer;
-
-  private final long containerId;
-
-  private final int bufferSize;
-
-  private long writtenBytes;
-
-  GrpcOutputStream(
-      StreamObserver<CopyContainerResponseProto> responseObserver,
-      long containerId, int bufferSize) {
-    this.responseObserver = responseObserver;
-    this.containerId = containerId;
-    this.bufferSize = bufferSize;
-    buffer = ByteString.newOutput(bufferSize);
-  }
-
-  @Override
-  public void write(int b) {
-    try {
-      buffer.write(b);
-      if (buffer.size() >= bufferSize) {
-        flushBuffer(false);
-      }
-    } catch (Exception ex) {
-      responseObserver.onError(ex);
-    }
-  }
-
-  @Override
-  public void write(@Nonnull byte[] data, int offset, int length) {
-    if ((offset < 0) || (offset > data.length) || (length < 0) ||
-        ((offset + length) > data.length) || ((offset + length) < 0)) {
-      throw new IndexOutOfBoundsException();
-    } else if (length == 0) {
-      return;
-    }
-
-    try {
-      if (buffer.size() >= bufferSize) {
-        flushBuffer(false);
-      }
-
-      int remaining = length;
-      int off = offset;
-      int len = Math.min(remaining, bufferSize - buffer.size());
-      while (remaining > 0) {
-        buffer.write(data, off, len);
-        if (buffer.size() >= bufferSize) {
-          flushBuffer(false);
-        }
-        off += len;
-        remaining -= len;
-        len = Math.min(bufferSize, remaining);
-      }
-    } catch (Exception ex) {
-      responseObserver.onError(ex);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    flushBuffer(true);
-    LOG.info("Sent {} bytes for container {}",
-        writtenBytes, containerId);
-    responseObserver.onCompleted();
-    buffer.close();
-  }
-
-  private void flushBuffer(boolean eof) {
-    int length = buffer.size();
-    if (length > 0) {
-      ByteString data = buffer.toByteString();
-      LOG.debug("Sending {} bytes (of type {}) for container {}",
-          length, data.getClass().getSimpleName(), containerId);
-      CopyContainerResponseProto response =
-          CopyContainerResponseProto.newBuilder()
-              .setContainerID(containerId)
-              .setData(data)
-              .setEof(eof)
-              .setReadOffset(writtenBytes)
-              .setLen(length)
-              .build();
-      responseObserver.onNext(response);
-      writtenBytes += length;
-      buffer.reset();
-    }
-  }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
deleted file mode 100644
index 89e23fe..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Path;
-import java.security.cert.X509Certificate;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
-import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-
-import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
-import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
-import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Client to read container data from gRPC.
- */
-public class GrpcReplicationClient implements AutoCloseable {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(GrpcReplicationClient.class);
-
-  private final ManagedChannel channel;
-
-  private final IntraDatanodeProtocolServiceStub client;
-
-  private final Path workingDirectory;
-
-  public GrpcReplicationClient(
-      String host, int port, Path workingDir,
-      SecurityConfig secConfig, X509Certificate caCert
-  ) throws IOException {
-    NettyChannelBuilder channelBuilder =
-        NettyChannelBuilder.forAddress(host, port)
-            .usePlaintext()
-            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
-
-    if (secConfig.isSecurityEnabled()) {
-      channelBuilder.useTransportSecurity();
-
-      SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
-      if (caCert != null) {
-        sslContextBuilder.trustManager(caCert);
-      }
-
-      sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
-      sslContextBuilder.keyManager(
-          new File(secConfig.getCertificateFileName()),
-          new File(secConfig.getPrivateKeyFileName()));
-      if (secConfig.useTestCert()) {
-        channelBuilder.overrideAuthority("localhost");
-      }
-      channelBuilder.sslContext(sslContextBuilder.build());
-    }
-    channel = channelBuilder.build();
-    client = IntraDatanodeProtocolServiceGrpc.newStub(channel);
-    workingDirectory = workingDir;
-  }
-
-  public void download(
-      KeyValueContainerData containerData,
-      OutputStream outputStream
-  ) {
-    CopyContainerRequestProto request =
-        CopyContainerRequestProto.newBuilder()
-            .setContainerID(containerData.getContainerID())
-            .setLen(-1)
-            .setReadOffset(0)
-            .build();
-
-    client.download(request, new StreamDownloader(outputStream));
-  }
-
-  private Path getWorkingDirectory() {
-    return workingDirectory;
-  }
-
-  public void shutdown() {
-    channel.shutdown();
-    try {
-      channel.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (Exception e) {
-      LOG.error("failed to shutdown replication channel", e);
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    shutdown();
-  }
-
-  /**
-   * gRPC stream observer to CompletableFuture adapter.
-   */
-  public static class StreamDownloader
-      implements StreamObserver<CopyContainerResponseProto> {
-
-    private final OutputStream outputStream;
-
-    public StreamDownloader(
-        OutputStream output
-    ) {
-      this.outputStream = output;
-    }
-
-    @Override
-    public void onNext(CopyContainerResponseProto chunk) {
-      try {
-        chunk.getData().writeTo(outputStream);
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    public void onError(Throwable throwable) {
-      try {
-        outputStream.close();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    public void onCompleted() {
-      try {
-        outputStream.close();
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-
-    }
-
-  }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
deleted file mode 100644
index 00bf844..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
-
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Service to make containers available for replication.
- */
-public class GrpcReplicationService extends
-    IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceImplBase {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(GrpcReplicationService.class);
-
-  private static final int BUFFER_SIZE = 1024 * 1024;
-
-  private final ContainerReplicationSource source;
-
-  public GrpcReplicationService(ContainerReplicationSource source) {
-    this.source = source;
-  }
-
-  @Override
-  public void download(CopyContainerRequestProto request,
-      StreamObserver<CopyContainerResponseProto> responseObserver) {
-    long containerID = request.getContainerID();
-    LOG.info("Streaming container data ({}) to other datanode", containerID);
-    try (GrpcOutputStream outputStream =
-        new GrpcOutputStream(responseObserver, containerID, BUFFER_SIZE)) {
-      source.copyData(containerID, outputStream);
-    } catch (IOException e) {
-      LOG.error("Error streaming container {}", containerID, e);
-      responseObserver.onError(e);
-    }
-  }
-
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/NullContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/NullContainerDownloader.java
index 106fccb..1821663 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/NullContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/NullContainerDownloader.java
@@ -19,14 +19,10 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.file.Path;
 import java.security.cert.X509Certificate;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -50,19 +46,7 @@ public class NullContainerDownloader extends SimpleContainerDownloader {
       KeyValueContainerData preCreated,
       DatanodeDetails datanode
   ) throws IOException {
-    CompletableFuture<Path> result;
-    GrpcReplicationClient grpcReplicationClient =
-        new GrpcReplicationClient(datanode.getIpAddress(),
-            datanode.getPort(Name.REPLICATION).getValue(),
-            workingDirectory, securityConfig, caCert);
 
-    OutputStream nullOutputStream = new OutputStream() {
-      @Override
-      public void write(int i) throws IOException {
-        //
-      }
-    };
-    grpcReplicationClient.download(preCreated, nullOutputStream);
     return preCreated;
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
deleted file mode 100644
index 474333e..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A naive implementation of the replication source which creates a tar file
- * on-demand without pre-create the compressed archives.
- */
-public class OnDemandContainerReplicationSource
-    implements ContainerReplicationSource {
-
-  private final ContainerController controller;
-
-  private final TarContainerPacker packer = new TarContainerPacker();
-
-  public OnDemandContainerReplicationSource(
-      ContainerController controller) {
-    this.controller = controller;
-  }
-
-  @Override
-  public void prepare(long containerId) {
-    // no pre-create in this implementation
-  }
-
-  @Override
-  public void copyData(long containerId, OutputStream destination)
-      throws IOException {
-
-    KeyValueContainer container =
-        (KeyValueContainer) controller.getContainer(containerId);
-
-    Preconditions.checkNotNull(
-        container, "Container is not found " + containerId);
-
-    container.exportContainerData(destination, packer);
-
-  }
-}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 1288b22..e5b0b33 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -17,26 +17,16 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
-import javax.net.ssl.SSLException;
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-
-import org.apache.ratis.thirdparty.io.grpc.Server;
-import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
-import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
-import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
-import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.stream.StreamingServer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,84 +38,57 @@ public class ReplicationServer {
   private static final Logger LOG =
       LoggerFactory.getLogger(ReplicationServer.class);
 
-  private Server server;
-
   private SecurityConfig secConf;
 
   private CertificateClient caClient;
 
-  private ContainerController controller;
+  private ContainerSet containerSet;
 
   private int port;
 
+  private StreamingServer server;
+
   public ReplicationServer(
-      ContainerController controller,
+      ContainerSet containerSet,
       ReplicationConfig replicationConfig,
       SecurityConfig secConf,
       CertificateClient caClient
   ) {
     this.secConf = secConf;
     this.caClient = caClient;
-    this.controller = controller;
+    this.containerSet = containerSet;
     this.port = replicationConfig.getPort();
     init();
   }
 
   public void init() {
-    NettyServerBuilder nettyServerBuilder =
-        ((NettyServerBuilder) ServerBuilder.forPort(port))
-            .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
-
-    GrpcServerInterceptor tracingInterceptor = new GrpcServerInterceptor();
-    nettyServerBuilder
-        .addService(ServerInterceptors.intercept(new GrpcReplicationService(
-            new OnDemandContainerReplicationSource(controller)
-        ), tracingInterceptor));
-
-    if (secConf.isSecurityEnabled()) {
-      try {
-        SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(
-            caClient.getPrivateKey(), caClient.getCertificate());
-
-        sslContextBuilder = GrpcSslContexts.configure(
-            sslContextBuilder, secConf.getGrpcSslProvider());
-
-        sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
-        sslContextBuilder.trustManager(caClient.getCACertificate());
-
-        nettyServerBuilder.sslContext(sslContextBuilder.build());
-      } catch (SSLException ex) {
-        throw new IllegalArgumentException(
-            "Unable to setup TLS for secure datanode replication GRPC "
-                + "endpoint.", ex);
-      }
-    }
-
-    server = nettyServerBuilder.build();
+    // NOTE(review): secConf and caClient are no longer used here, so the
+    // streaming endpoint is set up without TLS -- confirm this is intended.
+    server = new StreamingServer(new ContainerStreamingSource(containerSet),
+        this.port);
   }
 
+  /**
+   * Starts the streaming server.
+   *
+   * @throws IOException if the thread is interrupted during the startup
+   */
   public void start() throws IOException {
-    server.start();
-
-    if (port == 0) {
-      LOG.info("{} is started using port {}", getClass().getSimpleName(),
-          server.getPort());
+    try {
+      server.start();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before reporting the failure.
+      Thread.currentThread().interrupt();
+      throw new IOException("Couldn't start replication server", e);
     }
-
-    port = server.getPort();
-
   }
 
   public void stop() {
-    try {
-      server.shutdown().awaitTermination(10L, TimeUnit.SECONDS);
-    } catch (InterruptedException ex) {
-      LOG.warn("{} couldn't be stopped gracefully", getClass().getSimpleName());
-    }
+    server.stop();
   }
 
   public int getPort() {
-    return port;
+    return server.getPort();
   }
 
   @ConfigGroup(prefix = "hdds.datanode.replication")
@@ -145,5 +99,4 @@ public class ReplicationServer {
       return this;
     }
   }
-
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 666a412..ac57896 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -19,26 +19,23 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.stream.StreamingClient;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.netty.channel.Channel;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +55,6 @@ public class SimpleContainerDownloader implements ContainerDownloader {
   protected final Path workingDirectory;
   protected  final SecurityConfig securityConfig;
   protected  final X509Certificate caCert;
-  protected  TarContainerPacker packer = new TarContainerPacker();
 
   public SimpleContainerDownloader(
       ConfigurationSource conf,
@@ -124,50 +120,40 @@ public class SimpleContainerDownloader implements ContainerDownloader {
       KeyValueContainerData preCreated,
       DatanodeDetails datanode
   ) throws IOException {
-    CompletableFuture<Path> result;
-    GrpcReplicationClient grpcReplicationClient =
-        new GrpcReplicationClient(datanode.getIpAddress(),
-            datanode.getPort(Name.REPLICATION).getValue(),
-            workingDirectory, securityConfig, caCert);
-
-    PipedOutputStream outputStream = new PipedOutputStream();
-
-    grpcReplicationClient.download(preCreated, outputStream);
-    final byte[] descriptor = packer
-        .unpackContainerData(preCreated, new PipedInputStream(outputStream));
-
-    //parse descriptor
-    //now, we have extracted the container descriptor from the previous
-    //datanode. We can load it and upload it with the current data
-    // (original metadata + current filepath fields)
-    KeyValueContainerData replicated =
-        (KeyValueContainerData) ContainerDataYaml
-            .readContainer(descriptor);
-
-    KeyValueContainerData updated = new KeyValueContainerData(
-        replicated.getContainerID(),
-        replicated.getLayOutVersion(),
-        replicated.getMaxSize(),
-        replicated.getOriginPipelineId(),
-        replicated.getOriginNodeId());
-
-    //inherited from the replicated
-    updated
-        .setState(replicated.getState());
-    updated
-        .setContainerDBType(replicated.getContainerDBType());
-    updated
-        .updateBlockCommitSequenceId(replicated.getBlockCommitSequenceId());
-    updated
-        .setSchemaVersion(replicated.getSchemaVersion());
-
-    //inherited from the pre-created seed container
-    updated.setMetadataPath(preCreated.getMetadataPath());
-    updated.setDbFile(preCreated.getDbFile());
-    updated.setChunksPath(preCreated.getChunksPath());
-    updated.setVolume(preCreated.getVolume());
-
-    return updated;
+    final long containerId = preCreated.getContainerID();
+    try {
+      StreamingClient client =
+          new StreamingClient(datanode.getIpAddress(),
+              datanode.getPort(Name.REPLICATION).getValue(),
+              new ContainerStreamingDestination(preCreated));
+      // The cleanup below guards only the connected phase: close() is
+      // called once connect() has returned a channel.
+      final Channel channel = client.connect();
+      try {
+        channel.writeAndFlush(containerId + "\n");
+        // Block until the connection is closed again.
+        channel.closeFuture().sync();
+      } finally {
+        // Shut down the event loop group of the client; without this
+        // every download leaves its I/O threads running.
+        client.close();
+      }
+      LOG.info("Container {} is downloaded successfully", containerId);
+    } catch (InterruptedException ex) {
+      // Keep the interrupt visible to the calling thread.
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          "Interrupted while downloading container " + containerId, ex);
+    } catch (Exception ex) {
+      throw new IOException(
+          "Couldn't download container " + containerId, ex);
+    }
+
+    // TODO: the descriptor of the replicated container is not available
+    // on this code path yet, so the metadata inherited from the source
+    // (state, DB type, BCSID, schema version) is not merged here; the
+    // pre-created container data is returned unchanged.
+    return preCreated;
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
new file mode 100644
index 0000000..d6076e5
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.stream;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ByteProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client side handler which writes the received files to the local disk.
+ * <p>
+ * The stream is a sequence of entries: a header line in the form of
+ * {@code <size> <name>\n} followed by {@code size} bytes of file content.
+ */
+public class DirstreamClientHandler extends ChannelInboundHandlerAdapter {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DirstreamClientHandler.class);
+
+  private final StreamingDestination destination;
+
+  // True while the header line of the next entry is being collected.
+  private boolean headerMode = true;
+
+  // Header text received so far (a header can span multiple buffers).
+  private StringBuilder currentFileName = new StringBuilder();
+
+  private RandomAccessFile destFile;
+
+  private FileChannel destFileChannel;
+
+  // Content bytes of the current entry which are still expected.
+  private long remaining;
+
+  public DirstreamClientHandler(StreamingDestination streamingDestination) {
+    this.destination = streamingDestination;
+  }
+
+  /**
+   * Processes one inbound buffer, which can contain a partial header,
+   * partial content or multiple entries.
+   *
+   * @param ctx the handler context (not used)
+   * @param msg the received {@link ByteBuf}
+   * @throws IOException if a header is malformed or a file can't be written
+   */
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+      throws IOException {
+    ByteBuf buffer = (ByteBuf) msg;
+    try {
+      while (buffer.isReadable()) {
+        if (headerMode) {
+          readHeader(buffer);
+        } else {
+          readContent(buffer);
+        }
+      }
+    } finally {
+      // The message is not passed to any other handler, so it has to be
+      // released here.
+      buffer.release();
+    }
+  }
+
+  /**
+   * Collects the header line and opens the destination file when the
+   * line is complete.
+   */
+  private void readHeader(ByteBuf buffer) throws IOException {
+    int eolIndex = buffer.forEachByte(ByteProcessor.FIND_LF);
+    if (eolIndex < 0) {
+      // No line end yet: keep the fragment and wait for the next buffer.
+      currentFileName.append(buffer.toString(StandardCharsets.UTF_8));
+      buffer.skipBytes(buffer.readableBytes());
+      return;
+    }
+
+    // Decode in place (no temporary buffer to release) and skip the LF. A
+    // line end at the very first byte is valid: the header text arrived
+    // with the earlier buffers.
+    int headerLength = eolIndex - buffer.readerIndex();
+    currentFileName.append(buffer.toString(buffer.readerIndex(),
+        headerLength, StandardCharsets.UTF_8));
+    buffer.skipBytes(headerLength + 1);
+
+    String header = currentFileName.toString();
+    currentFileName = new StringBuilder();
+    String[] parts = header.split(" ", 2);
+    if (parts.length != 2) {
+      throw new IOException("Malformed stream header: " + header);
+    }
+    try {
+      remaining = Long.parseLong(parts[0]);
+    } catch (NumberFormatException e) {
+      throw new IOException("Malformed stream header: " + header, e);
+    }
+    if (remaining < 0) {
+      throw new IOException("Malformed stream header: " + header);
+    }
+
+    LOG.debug("Starting to write to {}", parts[1]);
+    Path destFilePath = destination.mapToDestination(parts[1]);
+    Path parent = destFilePath.getParent();
+    if (parent != null) {
+      Files.createDirectories(parent);
+    }
+    destFile = new RandomAccessFile(destFilePath.toFile(), "rw");
+    destFileChannel = destFile.getChannel();
+    headerMode = false;
+    if (remaining == 0) {
+      // Empty file: nothing to wait for.
+      finishEntry();
+    }
+  }
+
+  /**
+   * Writes the content bytes of the current entry to the destination file.
+   */
+  private void readContent(ByteBuf buffer) throws IOException {
+    int length = (int) Math.min(remaining, buffer.readableBytes());
+    remaining -= buffer.readBytes(destFileChannel, length);
+    if (remaining == 0) {
+      finishEntry();
+    }
+  }
+
+  /**
+   * Closes the completed file and switches back to header mode.
+   */
+  private void finishEntry() throws IOException {
+    closeDestFile();
+    headerMode = true;
+  }
+
+  /**
+   * Closes the current destination file, if there is one.
+   */
+  private void closeDestFile() throws IOException {
+    if (destFile != null) {
+      // Closing the RandomAccessFile closes its channel, too.
+      RandomAccessFile file = destFile;
+      destFile = null;
+      destFileChannel = null;
+      file.close();
+    }
+  }
+
+  @Override
+  public void channelUnregistered(ChannelHandlerContext ctx) {
+    if (!headerMode) {
+      LOG.warn("Stream is closed in the middle of a file, {} bytes are "
+          + "missing", remaining);
+    }
+    try {
+      closeDestFile();
+    } catch (IOException e) {
+      LOG.warn("Couldn't close the destination file", e);
+    }
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOG.error("Error during the file streaming", cause);
+    try {
+      closeDestFile();
+    } catch (IOException e) {
+      LOG.warn("Couldn't close the destination file", e);
+    }
+    ctx.close();
+  }
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
new file mode 100644
index 0000000..4264f71
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.stream;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.DefaultFileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Server side handler which sends the files of the requested id.
+ * <p>
+ * Every file is sent as a header line ({@code <size> <name>\n}) followed by
+ * the file content; the connection is closed after the last file.
+ */
+public class DirstreamServerHandler extends ChannelInboundHandlerAdapter {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DirstreamServerHandler.class);
+
+  private final StreamingSource source;
+
+  public DirstreamServerHandler(StreamingSource source) {
+    this.source = source;
+  }
+
+  /**
+   * Streams all the files which belong to the received id.
+   *
+   * @param ctx the handler context used for the writes
+   * @param msg the request line, its trimmed text is used as the id
+   * @throws Exception if the size of a file can't be read
+   */
+  @Override
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+      throws Exception {
+    String id = String.valueOf(msg).trim();
+    ChannelFuture lastWrite = null;
+
+    for (Map.Entry<String, Path> entry
+        : source.getFilesToStream(id).entrySet()) {
+      Path file = entry.getValue();
+      long fileSize = Files.size(file);
+      // Always announce the logical name (the key of the map); header and
+      // content are written in this order.
+      ctx.write(fileSize + " " + entry.getKey() + "\n");
+      lastWrite = ctx.writeAndFlush(
+          new DefaultFileRegion(file.toFile(), 0, fileSize));
+    }
+
+    if (lastWrite == null) {
+      // Nothing to send for this id.
+      ctx.close();
+    } else {
+      // Close only after the last file content is written out.
+      lastWrite.addListener(ChannelFutureListener.CLOSE);
+    }
+  }
+
+  @Override
+  public void channelReadComplete(ChannelHandlerContext ctx) {
+    ctx.flush();
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+      throws Exception {
+    LOG.error("Error during the file streaming", cause);
+    if (ctx.channel().isActive()) {
+      // Report the error to the client, close when it is written out.
+      ctx.writeAndFlush("ERR: " +
+          cause.getClass().getSimpleName() + ": " +
+          cause.getMessage() + '\n').addListener(
+          ChannelFutureListener.CLOSE);
+    } else {
+      ctx.close();
+    }
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
new file mode 100644
index 0000000..0676a74
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.stream;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.CharsetUtil;
+
+/**
+ * Client to download files from a {@link StreamingServer}.
+ * <p>
+ * Every instance owns its event loop group, which is shut down by
+ * {@link #close()}.
+ */
+public class StreamingClient implements AutoCloseable {
+
+  // Per instance (not static): a shared static field would be replaced by
+  // every new client, leaving the earlier groups without shutdown.
+  private final EventLoopGroup group;
+  private final Bootstrap b;
+  private ChannelFuture f;
+  private final int port;
+  private final String host;
+  private final StreamingDestination streamingDestination;
+
+  /**
+   * Prepares the client, the connection is opened by {@link #connect()}.
+   *
+   * @param host address of the streaming server
+   * @param port port of the streaming server
+   * @param streamingDestination defines the location of the received files
+   * @throws InterruptedException kept for the compatibility of the callers
+   */
+  public StreamingClient(
+      String host,
+      int port,
+      StreamingDestination streamingDestination
+  ) throws InterruptedException {
+    this.port = port;
+    this.host = host;
+    this.streamingDestination = streamingDestination;
+
+    group = new NioEventLoopGroup();
+
+    b = new Bootstrap();
+    b.group(group)
+        .channel(NioSocketChannel.class)
+        .handler(new ChannelInitializer<SocketChannel>() {
+          @Override
+          public void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline p = ch.pipeline();
+            p.addLast(new LoggingHandler(LogLevel.INFO),
+                new StringEncoder(CharsetUtil.UTF_8),
+                new DirstreamClientHandler(streamingDestination));
+          }
+        });
+
+  }
+
+  /**
+   * Connects to the server and waits until the connection is established.
+   *
+   * @return the connected channel
+   * @throws InterruptedException if the wait is interrupted
+   */
+  public Channel connect() throws InterruptedException {
+    f = b.connect(host, port).sync();
+    return f.channel();
+  }
+
+  /**
+   * Waits until the channel (if any) is closed, then shuts down the event
+   * loop group.
+   *
+   * @throws InterruptedException if the wait is interrupted
+   */
+  @Override
+  public void close() throws InterruptedException {
+    try {
+      // No channel exists if connect() was not called or failed.
+      if (f != null) {
+        f.channel().closeFuture().sync();
+      }
+    } finally {
+      group.shutdownGracefully();
+    }
+  }
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java
new file mode 100644
index 0000000..63c3ea8
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.stream;
+
+import java.nio.file.Path;
+
+/**
+ * Decides where the received files are stored on the client side.
+ */
+public interface StreamingDestination {
+
+  /**
+   * Maps the logical name of a streamed file to the local path to write.
+   *
+   * @param name the name from the header of the streamed file
+   * @return the path of the local file which receives the content
+   */
+  Path mapToDestination(String name);
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java
new file mode 100644
index 0000000..c81ce96
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.stream;
+
+import java.net.InetSocketAddress;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+
+/**
+ * Netty based server which streams the files of a {@link StreamingSource}.
+ */
+public class StreamingServer {
+
+  private int port;
+
+  private final StreamingSource source;
+
+  private EventLoopGroup bossGroup;
+
+  private EventLoopGroup workerGroup;
+
+  /**
+   * @param source provides the files to send for the requests
+   * @param port port to listen on
+   */
+  public StreamingServer(
+      StreamingSource source, int port
+  ) {
+    this.port = port;
+    this.source = source;
+  }
+
+  /**
+   * Binds the server socket and waits until the bind is done. After the
+   * return {@link #getPort()} reports the port of the bound socket.
+   *
+   * @throws InterruptedException if the wait for the bind is interrupted
+   */
+  public void start() throws InterruptedException {
+    ServerBootstrap b = new ServerBootstrap();
+    bossGroup = new NioEventLoopGroup(1);
+    workerGroup = new NioEventLoopGroup();
+    boolean started = false;
+    try {
+      b.group(bossGroup, workerGroup)
+          .channel(NioServerSocketChannel.class)
+          .option(ChannelOption.SO_BACKLOG, 100)
+          .handler(new LoggingHandler(LogLevel.INFO))
+          .childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) throws Exception {
+              ch.pipeline().addLast(
+                  new StringEncoder(CharsetUtil.UTF_8),
+                  new LineBasedFrameDecoder(8192),
+                  new StringDecoder(CharsetUtil.UTF_8),
+                  new ChunkedWriteHandler(),
+                  new DirstreamServerHandler(source));
+            }
+          });
+
+      ChannelFuture f = b.bind(port).sync();
+      final InetSocketAddress socketAddress =
+          (InetSocketAddress) f.channel().localAddress();
+      port = socketAddress.getPort();
+      started = true;
+    } finally {
+      if (!started) {
+        // The bind failed: don't leave the event loop threads running.
+        stop();
+      }
+    }
+  }
+
+  /**
+   * Shuts down the event loop groups; no-op if the server was not started.
+   */
+  public void stop() {
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
+    }
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+    }
+  }
+
+  public int getPort() {
+    return port;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java
new file mode 100644
index 0000000..c299eae
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.stream;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+/**
+ * Provides the files which should be sent for a requested id.
+ */
+public interface StreamingSource {
+
+  /**
+   * Collects the files to stream for the given id.
+   *
+   * @param id identifier of the requested file set
+   * @return files to send; the key is the logical name which is sent in
+   *         the header, the value is the local file to read
+   */
+  Map<String, Path> getFilesToStream(String id);
+
+}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 6ca5e44..da03318 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -19,16 +19,10 @@
 package org.apache.hadoop.ozone.container.keyvalue;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -55,7 +49,6 @@ import static org.apache.ratis.util.Preconditions.assertTrue;
 import org.junit.Assert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Rule;
@@ -180,35 +173,6 @@ public class TestKeyValueContainer {
         ContainerProtos.ContainerDataProto.State.CLOSED);
   }
 
-  @Test
-  public void concurrentExport() throws Exception {
-    createContainer();
-    populate(100);
-    closeContainer();
-
-    AtomicReference<String> failed = new AtomicReference<>();
-
-    TarContainerPacker packer = new TarContainerPacker();
-    List<Thread> threads = IntStream.range(0, 20)
-        .mapToObj(i -> new Thread(() -> {
-          try {
-            File file = folder.newFile("concurrent" + i + ".tar.gz");
-            try (OutputStream out = new FileOutputStream(file)) {
-              keyValueContainer.exportContainerData(out, packer);
-            }
-          } catch (Exception e) {
-            failed.compareAndSet(null, e.getMessage());
-          }
-        }))
-        .collect(Collectors.toList());
-
-    threads.forEach(Thread::start);
-    for (Thread thread : threads) {
-      thread.join();
-    }
-
-    assertNull(failed.get());
-  }
 
   @Test
   public void testDuplicateContainer() throws Exception {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
deleted file mode 100644
index c067a59..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
-import org.apache.hadoop.test.LambdaTestUtils;
-
-import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Test the tar/untar for a given container.
- */
-@RunWith(Parameterized.class)
-public class TestTarContainerPacker {
-
-  private static final String TEST_DB_FILE_NAME = "test1";
-
-  private static final String TEST_DB_FILE_CONTENT = "test1";
-
-  private static final String TEST_CHUNK_FILE_NAME = "chunk1";
-
-  private static final String TEST_CHUNK_FILE_CONTENT = "This is a chunk";
-
-  private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
-
-  private final ContainerPacker packer
-      = new TarContainerPacker();
-
-  private static final Path SOURCE_CONTAINER_ROOT =
-      Paths.get("target/test/data/packer-source-dir");
-
-  private static final Path DEST_CONTAINER_ROOT =
-      Paths.get("target/test/data/packer-dest-dir");
-
-  private static final Path TEMP_DIR =
-      Paths.get("target/test/data/packer-tmp-dir");
-
-  private static final AtomicInteger CONTAINER_ID = new AtomicInteger(1);
-
-  private final ChunkLayOutVersion layout;
-
-  public TestTarContainerPacker(ChunkLayOutVersion layout) {
-    this.layout = layout;
-  }
-
-  @Parameterized.Parameters
-  public static Iterable<Object[]> parameters() {
-    return ChunkLayoutTestInfo.chunkLayoutParameters();
-  }
-
-  @BeforeClass
-  public static void init() throws IOException {
-    initDir(SOURCE_CONTAINER_ROOT);
-    initDir(DEST_CONTAINER_ROOT);
-    initDir(TEMP_DIR);
-  }
-
-  @AfterClass
-  public static void cleanup() throws IOException {
-    FileUtils.deleteDirectory(SOURCE_CONTAINER_ROOT.toFile());
-    FileUtils.deleteDirectory(DEST_CONTAINER_ROOT.toFile());
-    FileUtils.deleteDirectory(TEMP_DIR.toFile());
-  }
-
-  private static void initDir(Path path) throws IOException {
-    if (path.toFile().exists()) {
-      FileUtils.deleteDirectory(path.toFile());
-    }
-    Files.createDirectories(path);
-  }
-
-  private KeyValueContainerData createContainer(Path dir) throws IOException {
-    long id = CONTAINER_ID.getAndIncrement();
-
-    Path containerDir = dir.resolve("container" + id);
-    Path dbDir = containerDir.resolve("db");
-    Path dataDir = containerDir.resolve("data");
-    Files.createDirectories(dbDir);
-    Files.createDirectories(dataDir);
-
-    KeyValueContainerData containerData = new KeyValueContainerData(
-        id, layout,
-        -1, UUID.randomUUID().toString(), UUID.randomUUID().toString());
-    containerData.setChunksPath(dataDir.toString());
-    containerData.setMetadataPath(dbDir.getParent().toString());
-    containerData.setDbFile(dbDir.toFile());
-
-    return containerData;
-  }
-
-  @Test
-  public void pack() throws IOException, CompressorException {
-
-    //GIVEN
-    OzoneConfiguration conf = new OzoneConfiguration();
-
-    KeyValueContainerData sourceContainerData =
-        createContainer(SOURCE_CONTAINER_ROOT);
-
-    KeyValueContainer sourceContainer =
-        new KeyValueContainer(sourceContainerData, conf);
-
-    //sample db file in the metadata directory
-    writeDbFile(sourceContainerData, TEST_DB_FILE_NAME);
-
-    //sample chunk file in the chunk directory
-    writeChunkFile(sourceContainerData, TEST_CHUNK_FILE_NAME);
-
-    //sample container descriptor file
-    writeDescriptor(sourceContainer);
-
-    Path targetFile = TEMP_DIR.resolve("container.tar");
-
-    //WHEN: pack it
-    try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) {
-      packer.pack(sourceContainer.getContainerData(), output);
-    }
-
-    //THEN: check the result
-    try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
-      TarArchiveInputStream tarStream = new TarArchiveInputStream(input);
-
-      TarArchiveEntry entry;
-      Map<String, TarArchiveEntry> entries = new HashMap<>();
-      while ((entry = tarStream.getNextTarEntry()) != null) {
-        entries.put(entry.getName(), entry);
-      }
-      Assert.assertTrue(
-          entries.containsKey("container.yaml"));
-
-    }
-
-    //read the container descriptor only
-    try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
-      String containerYaml = new String(packer.unpackContainerDescriptor(input),
-          StandardCharsets.UTF_8);
-      Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, containerYaml);
-    }
-
-    KeyValueContainerData destinationContainerData =
-        createContainer(DEST_CONTAINER_ROOT);
-
-    String descriptor;
-
-    //unpackContainerData
-    try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
-      descriptor =
-          new String(
-              packer.unpackContainerData(destinationContainerData, input),
-              StandardCharsets.UTF_8);
-    }
-
-    assertExampleMetadataDbIsGood(
-        destinationContainerData.getDbFile().toPath(),
-        TEST_DB_FILE_NAME);
-    assertExampleChunkFileIsGood(
-        Paths.get(destinationContainerData.getChunksPath()),
-        TEST_CHUNK_FILE_NAME);
-    Assert.assertFalse(
-        "Descriptor file should not have been extracted by the "
-            + "unpackContainerData Call",
-        destinationContainerData.getContainerFile().exists());
-    Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor);
-  }
-
-  @Test
-  public void unpackContainerDataWithValidRelativeDbFilePath()
-      throws Exception {
-    //GIVEN
-    KeyValueContainerData sourceContainerData =
-        createContainer(SOURCE_CONTAINER_ROOT);
-
-    String fileName = "sub/dir/" + TEST_DB_FILE_NAME;
-    File file = writeDbFile(sourceContainerData, fileName);
-    String entryName = TarContainerPacker.DB_DIR_NAME + "/" + fileName;
-
-    File containerFile = packContainerWithSingleFile(file, entryName);
-
-    // WHEN
-    KeyValueContainerData dest = unpackContainerData(containerFile);
-
-    // THEN
-    assertExampleMetadataDbIsGood(dest.getDbFile().toPath(), fileName);
-  }
-
-  @Test
-  public void unpackContainerDataWithValidRelativeChunkFilePath()
-      throws Exception {
-    //GIVEN
-    KeyValueContainerData sourceContainerData =
-        createContainer(SOURCE_CONTAINER_ROOT);
-
-    String fileName = "sub/dir/" + TEST_CHUNK_FILE_NAME;
-    File file = writeChunkFile(sourceContainerData, fileName);
-    String entryName = TarContainerPacker.CHUNKS_DIR_NAME + "/" + fileName;
-
-    File containerFile = packContainerWithSingleFile(file, entryName);
-
-    // WHEN
-    KeyValueContainerData dest = unpackContainerData(containerFile);
-
-    // THEN
-    assertExampleChunkFileIsGood(Paths.get(dest.getChunksPath()), fileName);
-  }
-
-  @Test
-  public void unpackContainerDataWithInvalidRelativeDbFilePath()
-      throws Exception {
-    //GIVEN
-    KeyValueContainerData sourceContainerData =
-        createContainer(SOURCE_CONTAINER_ROOT);
-
-    String fileName = "../db_file";
-    File file = writeDbFile(sourceContainerData, fileName);
-    String entryName = TarContainerPacker.DB_DIR_NAME + "/" + fileName;
-
-    File containerFile = packContainerWithSingleFile(file, entryName);
-
-    LambdaTestUtils.intercept(IllegalArgumentException.class,
-        () -> unpackContainerData(containerFile));
-  }
-
-  @Test
-  public void unpackContainerDataWithInvalidRelativeChunkFilePath()
-      throws Exception {
-    //GIVEN
-    KeyValueContainerData sourceContainerData =
-        createContainer(SOURCE_CONTAINER_ROOT);
-
-    String fileName = "../chunk_file";
-    File file = writeChunkFile(sourceContainerData, fileName);
-    String entryName = TarContainerPacker.CHUNKS_DIR_NAME + "/" + fileName;
-
-    File containerFile = packContainerWithSingleFile(file, entryName);
-
-    LambdaTestUtils.intercept(IllegalArgumentException.class,
-        () -> unpackContainerData(containerFile));
-  }
-
-  private KeyValueContainerData unpackContainerData(File containerFile)
-      throws IOException {
-    try (FileInputStream input = new FileInputStream(containerFile)) {
-      OzoneConfiguration conf = new OzoneConfiguration();
-      KeyValueContainerData data = createContainer(DEST_CONTAINER_ROOT);
-      packer.unpackContainerData(data, input);
-      return data;
-    }
-  }
-
-  private void writeDescriptor(KeyValueContainer container) throws IOException {
-    try (FileWriter writer = new FileWriter(
-        container.getContainerData().getContainerFile())) {
-      IOUtils.write(TEST_DESCRIPTOR_FILE_CONTENT, writer);
-    }
-  }
-
-  private File writeChunkFile(
-      KeyValueContainerData containerData, String chunkFileName)
-      throws IOException {
-    Path path = Paths.get(containerData.getChunksPath())
-        .resolve(chunkFileName);
-    Files.createDirectories(path.getParent());
-    File file = path.toFile();
-    try (FileWriter writer = new FileWriter(file)) {
-      IOUtils.write(TEST_CHUNK_FILE_CONTENT, writer);
-    }
-    return file;
-  }
-
-  private File writeDbFile(
-      KeyValueContainerData containerData, String dbFileName)
-      throws IOException {
-    Path path = containerData.getDbFile().toPath()
-        .resolve(dbFileName);
-    Files.createDirectories(path.getParent());
-    File file = path.toFile();
-    try (FileWriter writer = new FileWriter(file)) {
-      IOUtils.write(TEST_DB_FILE_CONTENT, writer);
-    }
-    return file;
-  }
-
-  private File packContainerWithSingleFile(File file, String entryName)
-      throws Exception {
-    File targetFile = TEMP_DIR.resolve("container.tar").toFile();
-    try (FileOutputStream output = new FileOutputStream(targetFile);
-         ArchiveOutputStream archive = new TarArchiveOutputStream(output)) {
-      TarContainerPacker.includeFile(file, entryName, archive);
-    }
-    return targetFile;
-  }
-
-  private void assertExampleMetadataDbIsGood(Path dbPath, String filename)
-      throws IOException {
-
-    Path dbFile = dbPath.resolve(filename);
-
-    Assert.assertTrue(
-        "example DB file is missing after pack/unpackContainerData: " + dbFile,
-        Files.exists(dbFile));
-
-    try (FileInputStream testFile = new FileInputStream(dbFile.toFile())) {
-      List<String> strings = IOUtils
-          .readLines(testFile, StandardCharsets.UTF_8);
-      Assert.assertEquals(1, strings.size());
-      Assert.assertEquals(TEST_DB_FILE_CONTENT, strings.get(0));
-    }
-  }
-
-  private void assertExampleChunkFileIsGood(Path chunkPath, String filename)
-      throws IOException {
-
-    Path chunkFile = chunkPath.resolve(filename);
-
-    Assert.assertTrue(
-        "example chunk file is missing after pack/unpackContainerData: "
-            + chunkFile,
-        Files.exists(chunkFile));
-
-    try (FileInputStream testFile = new FileInputStream(chunkFile.toFile())) {
-      List<String> strings = IOUtils
-          .readLines(testFile, StandardCharsets.UTF_8);
-      Assert.assertEquals(1, strings.size());
-      Assert.assertEquals(TEST_CHUNK_FILE_CONTENT, strings.get(0));
-    }
-  }
-
-}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
deleted file mode 100644
index cf6ece3..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests for {@code GrpcOutputStream}.
- */
-@RunWith(MockitoJUnitRunner.class)
-public class TestGrpcOutputStream {
-
-  private static final Random RND = new Random();
-
-  private final long containerId = RND.nextLong();
-  private final int bufferSize = RND.nextInt(1024) + 128 + 1;
-
-  @Mock
-  private StreamObserver<CopyContainerResponseProto> observer;
-
-  private OutputStream subject;
-
-  @Before
-  public void setUp() throws Exception {
-    subject = new GrpcOutputStream(observer, containerId, bufferSize);
-  }
-
-  @Test
-  public void seriesOfBytesInSingleResponse() throws IOException {
-    byte[] bytes = getRandomBytes(5);
-    for (byte b : bytes) {
-      subject.write(b);
-    }
-    subject.close();
-
-    verifyResponses(bytes);
-  }
-
-  @Test
-  public void mixedBytesAndArraysInSingleResponse() throws IOException {
-    byte[] bytes = getRandomBytes(16);
-    subject.write(bytes[0]);
-    subject.write(bytes, 1, 14);
-    subject.write(bytes[15]);
-    subject.close();
-
-    verifyResponses(bytes);
-  }
-
-  @Test
-  public void mixedArraysAndBytesInSingleResponse() throws IOException {
-    byte[] bytes = getRandomBytes(10);
-
-    subject.write(bytes, 0, 5);
-    subject.write(bytes[5]);
-    subject.write(bytes, 6, 4);
-    subject.close();
-
-    verifyResponses(bytes);
-  }
-
-  @Test
-  public void seriesOfArraysInSingleResponse() throws IOException {
-    byte[] bytes = getRandomBytes(8);
-
-    subject.write(bytes, 0, 5);
-    subject.write(bytes, 5, 3);
-    subject.close();
-
-    verifyResponses(bytes);
-  }
-
-  @Test
-  public void seriesOfArraysExactlyFillBuffer() throws IOException {
-    int half = bufferSize / 2, otherHalf = bufferSize - half;
-    byte[] bytes = getRandomBytes(2 * bufferSize);
-
-    // fill buffer
-    subject.write(bytes, 0, half);
-    subject.write(bytes, half, otherHalf);
-    // fill buffer again
-    subject.write(bytes, bufferSize, half);
-    subject.write(bytes, bufferSize + half, otherHalf);
-    subject.close();
-
-    verifyResponses(bytes);
-  }
-
-  @Test
-  public void bufferFlushedWhenFull() throws IOException {
-    byte[] bytes = getRandomBytes(bufferSize);
-
-    subject.write(bytes, 0, bufferSize-1);
-    subject.write(bytes[bufferSize-1]);
-    verify(observer).onNext(any());
-
-    subject.write(bytes[0]);
-    subject.write(bytes, 1, bufferSize-1);
-    verify(observer, times(2)).onNext(any());
-  }
-
-  @Test
-  public void singleArraySpansMultipleResponses() throws IOException {
-    byte[] bytes = writeBytes(subject, 2 * bufferSize + bufferSize/2);
-    subject.close();
-
-    verifyResponses(bytes);
-  }
-
-  @Test
-  public void secondWriteSpillsToNextResponse() throws IOException {
-    byte[] bytes1 = writeBytes(subject, bufferSize / 2);
-    byte[] bytes2 = writeBytes(subject, 2 * bufferSize);
-    subject.close();
-
-    verifyResponses(concat(bytes1, bytes2));
-  }
-
-  private void verifyResponses(byte[] bytes) {
-    int expectedResponseCount = bytes.length / bufferSize;
-    if (bytes.length % bufferSize > 0) {
-      expectedResponseCount++;
-    }
-
-    ArgumentCaptor<CopyContainerResponseProto> captor =
-        ArgumentCaptor.forClass(CopyContainerResponseProto.class);
-    verify(observer, times(expectedResponseCount)).onNext(captor.capture());
-
-    List<CopyContainerResponseProto> responses =
-        new ArrayList<>(captor.getAllValues());
-    for (int i = 0; i < expectedResponseCount; i++) {
-      CopyContainerResponseProto response = responses.get(i);
-      assertEquals(containerId, response.getContainerID());
-
-      int expectedOffset = i * bufferSize;
-      assertEquals(expectedOffset, response.getReadOffset());
-
-      int size = Math.min(bufferSize, bytes.length - expectedOffset);
-      assertEquals(size, response.getLen());
-
-      byte[] part = new byte[size];
-      System.arraycopy(bytes, expectedOffset, part, 0, size);
-      ByteString data = response.getData();
-      assertArrayEquals(part, data.toByteArray());
-
-      // we don't want concatenated ByteStrings
-      assertEquals("LiteralByteString", data.getClass().getSimpleName());
-    }
-
-    verify(observer, times(1)).onCompleted();
-  }
-
-  private static byte[] concat(byte[]... parts) {
-    int length = Arrays.stream(parts).mapToInt(each -> each.length).sum();
-    byte[] bytes = new byte[length];
-    int pos = 0;
-    for (byte[] part : parts) {
-      System.arraycopy(part, 0, bytes, pos, part.length);
-      pos += part.length;
-    }
-    return bytes;
-  }
-
-  private static byte[] writeBytes(OutputStream subject, int size)
-      throws IOException {
-    byte[] bytes = getRandomBytes(size);
-    subject.write(bytes);
-    return bytes;
-  }
-
-  private static byte[] getRandomBytes(int size) {
-    byte[] bytes = new byte[size];
-    RND.nextBytes(bytes);
-    return bytes;
-  }
-
-}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
index 5360eac..c9cd1eb 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationService.java
@@ -1,13 +1,13 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.TimeoutException;
 
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
@@ -39,7 +39,9 @@ import org.apache.hadoop.test.GenericTestUtils;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 
 /**
@@ -48,20 +50,97 @@ import org.junit.Test;
 public class TestReplicationService {
 
   @Test
-  public void test() throws IOException, TimeoutException,
-      InterruptedException {
-    final UUID scmUuid = UUID.randomUUID();
+  public void test() throws Exception {
+    // Serve container 1 from a source datanode, then replicate it to another.
+    final String scmUuid = "99335668-b0b2-46ff-bd6e-7bc9cc0e1404";
+    final String clusterUuid = "7001371d-d474-4ae2-bd80-d942b31f8bc9";
     //start server
-    ConfigurationSource ozoneConfig = new OzoneConfiguration();
+    final Path sourceDir = Paths
+        .get(System.getProperty("user.dir"), "target", "test-data", "source");
+    final Path destDir = Paths
+        .get(System.getProperty("user.dir"), "target", "test-data", "dest");
+    FileUtils.deleteDirectory(sourceDir.toFile());
+    FileUtils.deleteDirectory(destDir.toFile());
+
+    final String sourceDnUUID = "d6979383-5fd5-4fa5-be02-9b39f06d763d";
+    final String destDnUUID = "bb11a0cc-8902-4f07-adae-a853ba891132";
+    // The serving side is the source datanode, so it runs under the source id.
+    ReplicationServer replicationServer =
+        initSource(clusterUuid, scmUuid, sourceDnUUID, sourceDir.toString());
+
+    //start client
+    ContainerSet
+        destinationContainerSet =
+        replicateContainer(scmUuid,
+            sourceDnUUID,
+            replicationServer.getPort(),
+            destDnUUID,
+            destDir);
+    // Wait until container 1 appears in the destination container set.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return destinationContainerSet.getContainer(1L) != null;
+      }
+    }, 1000, 10_000);
+  }
+
+  /**
+   * Runs a single replication of container 1 from the source datanode at
+   * 127.0.0.1:{@code port} and returns the container set the replicator
+   * was given, so the caller can poll it for the imported container.
+   */
+  @NotNull
+  private ContainerSet replicateContainer(String scmUuid, String sourceDnUUID,
+      int port, String destDnUUID, Path destDir) throws IOException {
+    // Hand the replicator the same set that is returned to the caller.
+    // NOTE(review): assumes imports are registered in this set - confirm.
+    ContainerSet destinationContainerSet = new ContainerSet();
+
+    OzoneConfiguration clientConfig = new OzoneConfiguration();
+    clientConfig.set("hdds.datanode.dir", destDir.toString());
+    MutableVolumeSet volumeSet =
+        new MutableVolumeSet(destDnUUID, clientConfig);
+
+    DownloadAndImportReplicator replicator = new DownloadAndImportReplicator(
+        clientConfig,
+        () -> scmUuid,
+        destinationContainerSet,
+        new SimpleContainerDownloader(clientConfig, null),
+        volumeSet);
+
+    DatanodeDetails source =
+        DatanodeDetails.newBuilder()
+            .setIpAddress("127.0.0.1")
+            .setUuid(UUID.fromString(sourceDnUUID))
+            .build();
+
+    source.setPort(Name.REPLICATION, port);
+    List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
+    sourceDatanodes.add(source);
+
+    // replicate() is invoked directly; no ReplicationSupervisor is involved.
+    replicator.replicate(new ReplicationTask(1L, sourceDatanodes));
+    return destinationContainerSet;
+  }
+
+  private ReplicationServer initSource(
+      String clusterUuid,
+      String scmUuid,
+      String sourceDnUUID,
+      String sourceDir
+  )
+      throws Exception {
+    OzoneConfiguration ozoneConfig = new OzoneConfiguration();
+    ozoneConfig.set("hdds.datanode.dir", sourceDir);
 
-    final String sourceDnUUID = UUID.randomUUID().toString();
-    final String destDnUUID = UUID.randomUUID().toString();
     MutableVolumeSet sourceVolumes =
         new MutableVolumeSet(sourceDnUUID, ozoneConfig);
 
     VolumeChoosingPolicy v = new RoundRobinVolumeChoosingPolicy();
     final HddsVolume volume =
         v.chooseVolume(sourceVolumes.getVolumesList(), 5L);
+    volume.format(clusterUuid);
 
     KeyValueContainerData kvd = new KeyValueContainerData(1L, "/tmp/asd");
     kvd.setState(State.OPEN);
@@ -82,7 +161,7 @@ public class TestReplicationService {
     final ContainerCommandRequestProto containerCommandRequest =
         ContainerCommandRequestProto.newBuilder()
             .setCmdType(Type.WriteChunk)
-            .setDatanodeUuid(destDnUUID)
+            .setDatanodeUuid(sourceDnUUID)
             .setContainerID(kvc.getContainerData().getContainerID())
             .setWriteChunk(WriteChunkRequestProto.newBuilder()
                 .setBlockID(DatanodeBlockID.newBuilder()
@@ -103,7 +182,8 @@ public class TestReplicationService {
                 .build())
             .build();
 
-    handler.handle(containerCommandRequest, kvc, new DispatcherContext.Builder().build());
+    handler.handle(containerCommandRequest, kvc,
+        new DispatcherContext.Builder().build());
 
     HashMap<ContainerType, Handler> handlers = Maps.newHashMap();
     ContainerController controller =
@@ -114,45 +194,14 @@ public class TestReplicationService {
 
     SecurityConfig securityConfig = new SecurityConfig(ozoneConfig);
     ReplicationServer replicationServer =
-        new ReplicationServer(controller, replicationConfig, securityConfig,
+        new ReplicationServer(sourceContainerSet, replicationConfig, securityConfig,
             null);
 
+    kvd.setState(State.CLOSED);
+
     replicationServer.init();
     replicationServer.start();
-
-    //start client
-    OzoneConfiguration clientConfig = new OzoneConfiguration();
-    clientConfig.set("hdds.datanode.dir","tmp/qwe");
-    MutableVolumeSet volumeSet =
-        new MutableVolumeSet(destDnUUID, clientConfig);
-
-    DownloadAndImportReplicator replicator = new DownloadAndImportReplicator(
-        ozoneConfig,
-        () -> scmUuid.toString(),
-        sourceContainerSet,
-        new SimpleContainerDownloader(ozoneConfig, null),
-        volumeSet);
-
-    DatanodeDetails source =
-        DatanodeDetails.newBuilder()
-            .setIpAddress("127.0.0.1")
-            .setUuid(UUID.randomUUID())
-            .build();
-    source.setPort(Name.REPLICATION, replicationServer.getPort());
-    List<DatanodeDetails> sourceDatanodes = new ArrayList<>();
-    sourceDatanodes.add(source);
-
-    ContainerSet destinationContainerSet = new ContainerSet();
-    ReplicationSupervisor supervisor =
-        new ReplicationSupervisor(destinationContainerSet, replicator, 10);
-    replicator.replicate(new ReplicationTask(1L, sourceDatanodes));
-
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        return destinationContainerSet.getContainer(1L) != null;
-      }
-    }, 1000, 10_000);
+    return replicationServer;
   }
 
 }
\ No newline at end of file
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ExportContainer.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ExportContainer.java
deleted file mode 100644
index 686c8be..0000000
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ExportContainer.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.debug;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-
-import org.apache.hadoop.hdds.cli.GenericParentCommand;
-import org.apache.hadoop.hdds.cli.SubcommandWithParent;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerReader;
-import org.apache.hadoop.ozone.container.replication.ContainerReplicationSource;
-import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
-
-import com.google.common.base.Preconditions;
-import org.kohsuke.MetaInfServices;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import picocli.CommandLine;
-import picocli.CommandLine.Command;
-import picocli.CommandLine.ParentCommand;
-
-@Command(name = "export-container",
-    description = "Export one container to a tarball")
-@MetaInfServices(SubcommandWithParent.class)
-public class ExportContainer implements SubcommandWithParent, Callable<Void> {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ExportContainer.class);
-  @ParentCommand
-  private GenericParentCommand parent;
-
-  @CommandLine.Option(names = {"--container"},
-      required = true,
-      description = "Container Id")
-  private long containerId;
-
-  @CommandLine.Option(names = {"--dest"},
-      defaultValue = "/tmp",
-      description = "Destination directory")
-  private String destination;
-
-  @Override
-  public Class<?> getParentType() {
-    return OzoneDebug.class;
-  }
-
-  @Override
-  public Void call() throws Exception {
-
-    ConfigurationSource conf = parent.createOzoneConfiguration();
-
-    ContainerSet containerSet = new ContainerSet();
-
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
-
-    String firstStorageDir = getFirstStorageDir(conf);
-
-    String datanodeUuid = getDatanodeUUID(firstStorageDir, conf);
-
-    String scmId = getScmId(firstStorageDir);
-
-    MutableVolumeSet volumeSet = new MutableVolumeSet(datanodeUuid, conf);
-
-    Map<ContainerType, Handler> handlers = new HashMap<>();
-
-    for (ContainerType containerType : ContainerType.values()) {
-      final Handler handler =
-          Handler.getHandlerForContainerType(
-              containerType,
-              conf,
-              datanodeUuid,
-              containerSet,
-              volumeSet,
-              metrics,
-              containerReplicaProto -> {
-              });
-      handler.setScmID(scmId);
-      handlers.put(containerType, handler);
-    }
-
-    ContainerController controller =
-        new ContainerController(containerSet, handlers);
-
-    final ContainerReplicationSource replicationSource =
-        new OnDemandContainerReplicationSource(controller);
-
-    Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList()
-        .iterator();
-
-    LOG.info("Starting the read all the container metadata");
-
-    while (volumeSetIterator.hasNext()) {
-      HddsVolume volume = volumeSetIterator.next();
-      LOG.info("Loading container metadata from volume " + volume.toString());
-      final ContainerReader reader =
-          new ContainerReader(volumeSet, volume, containerSet, conf);
-      reader.run();
-    }
-
-    LOG.info("All the container metadata is loaded. Starting to replication");
-
-    replicationSource.prepare(containerId);
-    LOG.info("Preparation is done");
-
-    final File destinationFile =
-        new File(destination, "container-" + containerId + ".tar.gz");
-    try (FileOutputStream fos = new FileOutputStream(destinationFile)) {
-      replicationSource.copyData(containerId, fos);
-    }
-    LOG.info("Container is exported to {}", destinationFile);
-
-    return null;
-  }
-
-  public String getScmId(String storageDir) throws IOException {
-    Preconditions.checkNotNull(storageDir);
-    return Files.list(Paths.get(storageDir, "hdds"))
-        .filter(Files::isDirectory)
-        .findFirst().get().getFileName().toString();
-  }
-
-  public String getDatanodeUUID(String storageDir, ConfigurationSource config)
-      throws IOException {
-
-    final File versionFile = new File(storageDir, "hdds/VERSION");
-
-    Properties props = DatanodeVersionFile.readFrom(versionFile);
-    if (props.isEmpty()) {
-      throw new InconsistentStorageStateException(
-          "Version file " + versionFile + " is missing");
-    }
-
-    return HddsVolumeUtil
-        .getProperty(props, OzoneConsts.DATANODE_UUID, versionFile);
-  }
-
-  private String getFirstStorageDir(ConfigurationSource config)
-      throws IOException {
-    final Collection<String> storageDirs =
-        MutableVolumeSet.getDatanodeStorageDirs(config);
-
-    return
-        StorageLocation.parse(storageDirs.iterator().next())
-            .getUri().getPath();
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org