You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2021/02/24 08:45:53 UTC

[ozone] 04/27: closed container replication

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 664bd5e83c681a9e8feff805f2c56a7bffd91cda
Author: Elek Márton <el...@apache.org>
AuthorDate: Fri Jan 22 06:33:19 2021 +0100

    closed container replication
---
 .../ozone/container/common/impl/ContainerData.java |  67 ++++++--
 .../container/common/impl/ContainerDataYaml.java   |  14 +-
 .../container/common/impl/HddsDispatcher.java      |   4 +
 .../container/common/interfaces/Container.java     |  24 +--
 .../common/interfaces/ContainerPacker.java         |   9 +-
 .../ozone/container/common/interfaces/Handler.java |  21 ---
 .../common/statemachine/DatanodeStateMachine.java  |   8 +-
 .../container/keyvalue/KeyValueContainer.java      | 177 ++++++---------------
 .../container/keyvalue/KeyValueContainerCheck.java |  26 +--
 .../container/keyvalue/KeyValueContainerData.java  | 106 +++++++-----
 .../ozone/container/keyvalue/KeyValueHandler.java  |  31 ----
 .../container/keyvalue/TarContainerPacker.java     |  22 +--
 .../container/ozoneimpl/ContainerController.java   |  39 +----
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   4 +
 .../container/replication/ContainerDownloader.java |   7 +-
 .../container/replication/ContainerReplicator.java |   2 +-
 .../replication/DownloadAndImportReplicator.java   | 119 +++++++-------
 .../replication/GrpcReplicationClient.java         |  80 +++-------
 .../OnDemandContainerReplicationSource.java        |  10 +-
 .../replication/ReplicationSupervisor.java         |   3 +
 .../replication/SimpleContainerDownloader.java     |  74 +++++----
 .../container/keyvalue/TestKeyValueContainer.java  | 124 ++++-----------
 .../container/keyvalue/TestTarContainerPacker.java |  29 ++--
 .../replication/TestReplicationSupervisor.java     |  17 +-
 .../replication/TestSimpleContainerDownloader.java |  64 ++++----
 .../ozone/freon/ClosedContainerReplicator.java     |  40 +----
 pom.xml                                            |   2 +-
 27 files changed, 436 insertions(+), 687 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 19cc1e2..f65e54a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -17,31 +17,28 @@
  */
 package org.apache.hadoop.ozone.container.common.impl;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.time.Instant;
-import java.util.List;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerDataProto;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
-import org.yaml.snakeyaml.Yaml;
 
-import javax.annotation.Nullable;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import static org.apache.hadoop.ozone.OzoneConsts.CHECKSUM;
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_TYPE;
@@ -52,6 +49,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.METADATA;
 import static org.apache.hadoop.ozone.OzoneConsts.ORIGIN_NODE_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.ORIGIN_PIPELINE_ID;
 import static org.apache.hadoop.ozone.OzoneConsts.STATE;
+import org.yaml.snakeyaml.Yaml;
 
 /**
  * ContainerData is the in-memory representation of container metadata and is
@@ -67,7 +65,7 @@ public abstract class ContainerData {
   private final long containerID;
 
   // Layout version of the container data
-  private final int layOutVersion;
+  private int layOutVersion;
 
   // Metadata of the container will be a key value pair.
   // This can hold information like volume name, owner etc.,
@@ -83,6 +81,9 @@ public abstract class ContainerData {
 
   private boolean committedSpace;
 
+  // Path to Container metadata Level DB/RocksDB Store and .container file.
+  private String metadataPath;
+
   //ID of the pipeline where this container is created
   private String originPipelineId;
   //ID of the datanode where this container is created
@@ -98,6 +99,8 @@ public abstract class ContainerData {
 
   private HddsVolume volume;
 
+  private File containerFile;
+
   private String checksum;
 
   /** Timestamp of last data scan (milliseconds since Unix Epoch).
@@ -630,4 +633,38 @@ public abstract class ContainerData {
     incrWriteBytes(bytesWritten);
   }
 
+  public void setLayoutVersion(int version) {
+    this.layOutVersion = version;
+  }
+
+  public File getContainerFile() {
+    return ContainerData.getContainerFile(getMetadataPath(), getContainerID());
+  }
+
+  public static File getContainerFile(String metadataPath, long containerId) {
+    return new File(metadataPath,
+        containerId + OzoneConsts.CONTAINER_EXTENSION);
+  }
+
+  /**
+   * Returns container metadata path.
+   * @return - Physical path where container file and checksum is stored.
+   */
+  public String getMetadataPath() {
+    return metadataPath;
+  }
+
+  /**
+   * Sets container metadata path.
+   *
+   * @param path - String.
+   */
+  public void setMetadataPath(String path) {
+    this.metadataPath = path;
+  }
+
+
+  public static List<String> getYamlFields() {
+    return YAML_FIELDS;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
index 757d7e8..3f54e25 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
@@ -32,18 +32,15 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerType;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 import com.google.common.base.Preconditions;
-import static org.apache.hadoop.ozone.container.keyvalue
-    .KeyValueContainerData.KEYVALUE_YAML_TAG;
+import org.apache.commons.io.IOUtils;
+import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData.KEYVALUE_YAML_TAG;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.Yaml;
@@ -156,8 +153,7 @@ public final class ContainerDataYaml {
     Yaml yaml = new Yaml(containerDataConstructor, representer);
     yaml.setBeanAccess(BeanAccess.FIELD);
 
-    containerData = (ContainerData)
-        yaml.load(input);
+    containerData = yaml.load(input);
 
     return containerData;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 13c1780..fa451a3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -648,6 +648,10 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
         .build();
   }
 
+  public String getScmId() {
+    return scmID;
+  }
+
   enum EventType {
     READ,
     WRITE
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 67a7a16..063ac33 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -19,18 +19,12 @@
 package org.apache.hadoop.ozone.container.common.interfaces;
 
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.time.Instant;
 import java.util.Map;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.util.RwLock;
@@ -127,20 +121,6 @@ public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
   void updateDeleteTransactionId(long deleteTransactionId);
 
   /**
-   * Import the container from an external archive.
-   */
-  void importContainerData(InputStream stream,
-      ContainerPacker<CONTAINERDATA> packer) throws IOException;
-
-  /**
-   * Export all the data of the container to one output archive with the help
-   * of the packer.
-   *
-   */
-  void exportContainerData(OutputStream stream,
-      ContainerPacker<CONTAINERDATA> packer) throws IOException;
-
-  /**
    * Returns containerReport for the container.
    */
   ContainerReplicaProto getContainerReport()
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
index 8308c23..a76e6f0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
@@ -22,13 +22,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 /**
  * Service to pack/unpack ContainerData container data to/from a single byte
  * stream.
  */
-public interface ContainerPacker<CONTAINERDATA extends ContainerData> {
+public interface ContainerPacker {
 
   /**
    * Extract the container data to the path defined by the container.
@@ -38,7 +38,8 @@ public interface ContainerPacker<CONTAINERDATA extends ContainerData> {
    * @return the byte content of the descriptor (which won't be written to a
    * file but returned).
    */
-  byte[] unpackContainerData(Container<CONTAINERDATA> container,
+  byte[] unpackContainerData(
+      KeyValueContainerData container,
       InputStream inputStream)
       throws IOException;
 
@@ -46,7 +47,7 @@ public interface ContainerPacker<CONTAINERDATA extends ContainerData> {
    * Compress all the container data (chunk data, metadata db AND container
    * descriptor) to one single archive.
    */
-  void pack(Container<CONTAINERDATA> container, OutputStream destination)
+  void pack(KeyValueContainerData containerData, OutputStream destination)
       throws IOException;
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 4ba7572..04317e9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.ozone.container.common.interfaces;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.function.Consumer;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -31,12 +29,10 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 
 /**
  * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -107,23 +103,6 @@ public abstract class Handler {
       DispatcherContext dispatcherContext);
 
   /**
-   * Imports container from a raw input stream.
-   */
-  public abstract Container importContainer(
-      ContainerData containerData, InputStream rawContainerStream,
-      TarContainerPacker packer)
-      throws IOException;
-
-  /**
-   * Exports container to the output stream.
-   */
-  public abstract void exportContainer(
-      Container container,
-      OutputStream outputStream,
-      TarContainerPacker packer)
-      throws IOException;
-
-  /**
    * Stop the Handler.
    */
   public abstract void stop();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a7cb2d3..0969d64 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Dele
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
 import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
@@ -126,11 +125,12 @@ public class DatanodeStateMachine implements Closeable {
     nextHB = new AtomicLong(Time.monotonicNow());
 
     ContainerReplicator replicator =
-        new DownloadAndImportReplicator(container.getContainerSet(),
-            container.getController(),
+        new DownloadAndImportReplicator(conf,
+            () -> container.getScmId(),
+            container.getContainerSet(),
             new SimpleContainerDownloader(conf,
                 dnCertClient != null ? dnCertClient.getCACertificate() : null),
-            new TarContainerPacker());
+            container.getVolumeSet());
 
     supervisor =
         new ReplicationSupervisor(container.getContainerSet(), replicator,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 53d6162..d2111f7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.keyvalue;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
@@ -54,7 +53,6 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_FILES_CREATE_ERROR;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
@@ -108,36 +106,29 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     long maxSize = containerData.getMaxSize();
     volumeSet.readLock();
     try {
+
       HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
           .getVolumesList(), maxSize);
-      String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
 
       long containerID = containerData.getContainerID();
 
-      containerMetaDataPath = KeyValueContainerLocationUtil
-          .getContainerMetaDataPath(hddsVolumeDir, scmId, containerID);
-      containerData.setMetadataPath(containerMetaDataPath.getPath());
-
-      File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
-          hddsVolumeDir, scmId, containerID);
+      containerData.assignToVolume(scmId, containerVolume);
 
+      containerMetaDataPath = new File(containerData.getMetadataPath());
       // Check if it is new Container.
       ContainerUtils.verifyIsNewContainer(containerMetaDataPath);
 
-      //Create Metadata path chunks path and metadata db
-      File dbFile = getContainerDBFile();
-
       // This method is only called when creating new containers.
       // Therefore, always use the newest schema version.
       containerData.setSchemaVersion(OzoneConsts.SCHEMA_LATEST);
       KeyValueContainerUtil.createContainerMetaData(containerID,
-              containerMetaDataPath, chunksPath, dbFile,
-              containerData.getSchemaVersion(), config);
+          containerMetaDataPath,
+          new File(containerData.getChunksPath()),
+          containerData.getDbFile(),
+          containerData.getSchemaVersion(),
+          config);
+
 
-      //Set containerData for the KeyValueContainer.
-      containerData.setChunksPath(chunksPath.getPath());
-      containerData.setDbFile(dbFile);
-      containerData.setVolume(containerVolume);
 
       // Create .container file
       File containerFile = getContainerFile();
@@ -455,100 +446,6 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     containerData.updateDeleteTransactionId(deleteTransactionId);
   }
 
-  @Override
-  public void importContainerData(InputStream input,
-      ContainerPacker<KeyValueContainerData> packer) throws IOException {
-    writeLock();
-    try {
-      if (getContainerFile().exists()) {
-        String errorMessage = String.format(
-            "Can't import container (cid=%d) data to a specific location"
-                + " as the container descriptor (%s) has already been exist.",
-            getContainerData().getContainerID(),
-            getContainerFile().getAbsolutePath());
-        throw new IOException(errorMessage);
-      }
-      //copy the values from the input stream to the final destination
-      // directory.
-      byte[] descriptorContent = packer.unpackContainerData(this, input);
-
-      Preconditions.checkNotNull(descriptorContent,
-          "Container descriptor is missing from the container archive: "
-              + getContainerData().getContainerID());
-
-      //now, we have extracted the container descriptor from the previous
-      //datanode. We can load it and upload it with the current data
-      // (original metadata + current filepath fields)
-      KeyValueContainerData originalContainerData =
-          (KeyValueContainerData) ContainerDataYaml
-              .readContainer(descriptorContent);
-
-
-      containerData.setState(originalContainerData.getState());
-      containerData
-          .setContainerDBType(originalContainerData.getContainerDBType());
-      containerData.setSchemaVersion(originalContainerData.getSchemaVersion());
-
-      //rewriting the yaml file with new checksum calculation.
-      update(originalContainerData.getMetadata(), true);
-
-      //fill in memory stat counter (keycount, byte usage)
-      KeyValueContainerUtil.parseKVContainerData(containerData, config);
-
-    } catch (Exception ex) {
-      //delete all the temporary data in case of any exception.
-      try {
-        FileUtils.deleteDirectory(new File(containerData.getMetadataPath()));
-        FileUtils.deleteDirectory(new File(containerData.getChunksPath()));
-        FileUtils.deleteDirectory(getContainerFile());
-      } catch (Exception deleteex) {
-        LOG.error(
-            "Can not cleanup destination directories after a container import"
-                + " error (cid" +
-                containerData.getContainerID() + ")", deleteex);
-      }
-      throw ex;
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  @Override
-  public void exportContainerData(OutputStream destination,
-      ContainerPacker<KeyValueContainerData> packer) throws IOException {
-    writeLock();
-    try {
-      // Closed/ Quasi closed containers are considered for replication by
-      // replication manager if they are under-replicated.
-      ContainerProtos.ContainerDataProto.State state =
-          getContainerData().getState();
-      if (!(state == ContainerProtos.ContainerDataProto.State.CLOSED ||
-          state == ContainerDataProto.State.QUASI_CLOSED)) {
-        throw new IllegalStateException(
-            "Only (quasi)closed containers can be exported, but " +
-                "ContainerId=" + getContainerData().getContainerID() +
-                " is in state " + state);
-      }
-
-      try {
-        compactDB();
-        // Close DB (and remove from cache) to avoid concurrent modification
-        // while packing it.
-        BlockUtils.removeDB(containerData, config);
-      } finally {
-        readLock();
-        writeUnlock();
-      }
-
-      packer.pack(this, destination);
-    } finally {
-      if (lock.isWriteLockedByCurrentThread()) {
-        writeUnlock();
-      } else {
-        readUnlock();
-      }
-    }
-  }
 
   /**
    * Acquire read lock.
@@ -620,14 +517,9 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
    * @return .container File name
    */
   @Override
+  @Deprecated
   public File getContainerFile() {
-    return getContainerFile(containerData.getMetadataPath(),
-            containerData.getContainerID());
-  }
-
-  static File getContainerFile(String metadataPath, long containerId) {
-    return new File(metadataPath,
-        containerId + OzoneConsts.CONTAINER_EXTENSION);
+    return containerData.getContainerFile();
   }
 
   @Override
@@ -697,15 +589,6 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     return state;
   }
 
-  /**
-   * Returns container DB file.
-   * @return
-   */
-  public File getContainerDBFile() {
-    return new File(containerData.getMetadataPath(), containerData
-        .getContainerID() + OzoneConsts.DN_CONTAINER_DB);
-  }
-
   public boolean scanMetaData() {
     long containerId = containerData.getContainerID();
     KeyValueContainerCheck checker =
@@ -735,6 +618,44 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
     return checker.fullCheck(throttler, canceler);
   }
 
+  public void exportContainerData(
+      OutputStream destination,
+      ContainerPacker packer
+  ) throws IOException {
+    writeLock();
+    try {
+      // Closed/ Quasi closed containers are considered for replication by
+      // replication manager if they are under-replicated.
+      ContainerProtos.ContainerDataProto.State state =
+          getContainerData().getState();
+      if (!(state == ContainerProtos.ContainerDataProto.State.CLOSED ||
+          state == ContainerDataProto.State.QUASI_CLOSED)) {
+        throw new IllegalStateException(
+            "Only (quasi)closed containers can be exported, but " +
+                "ContainerId=" + getContainerData().getContainerID() +
+                " is in state " + state);
+      }
+
+      try {
+        compactDB();
+        // Close DB (and remove from cache) to avoid concurrent modification
+        // while packing it.
+        BlockUtils.removeDB(containerData, config);
+      } finally {
+        readLock();
+        writeUnlock();
+      }
+
+      packer.pack(containerData, destination);
+    } finally {
+      if (lock.isWriteLockedByCurrentThread()) {
+        writeUnlock();
+      } else {
+        readUnlock();
+      }
+    }
+  }
+
   private enum ContainerCheckLevel {
     NO_CHECK, FAST_CHECK, FULL_CHECK
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index 20c1455..0f47e8a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -18,7 +18,12 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.util.Canceler;
@@ -30,26 +35,21 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.interfaces.BlockIterator;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
-import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Arrays;
 
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_LEVELDB;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_ROCKSDB;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_LEVELDB;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_ROCKSDB;
-
 /**
  * Class to run integrity checks on Datanode Containers.
  * Provide infra for Data Scrubbing
@@ -324,8 +324,8 @@ public class KeyValueContainerCheck {
   }
 
   private void loadContainerData() throws IOException {
-    File containerFile = KeyValueContainer
-        .getContainerFile(metadataPath, containerID);
+    File containerFile =
+        ContainerData.getContainerFile(metadataPath, containerID);
 
     onDiskContainerData = (KeyValueContainerData) ContainerDataYaml
         .readContainerFile(containerFile);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 5c20e6e..c9b0b31 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -18,37 +18,36 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerDataProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
-import org.yaml.snakeyaml.nodes.Tag;
-
-
-import java.io.File;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import static java.lang.Math.max;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_ROCKSDB;
+import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_COUNT;
 import static org.apache.hadoop.ozone.OzoneConsts.CHUNKS_PATH;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_BYTES_USED;
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE;
+import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB_TYPE_ROCKSDB;
 import static org.apache.hadoop.ozone.OzoneConsts.METADATA_PATH;
-import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSION;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_BYTES_USED;
-import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_COUNT;
 import static org.apache.hadoop.ozone.OzoneConsts.PENDING_DELETE_BLOCK_COUNT;
+import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_VERSION;
+import org.yaml.snakeyaml.nodes.Tag;
 
 /**
  * This class represents the KeyValueContainer metadata, which is the
@@ -63,9 +62,6 @@ public class KeyValueContainerData extends ContainerData {
   // Fields need to be stored in .container file.
   private static final List<String> KV_YAML_FIELDS;
 
-  // Path to Container metadata Level DB/RocksDB Store and .container file.
-  private String metadataPath;
-
   //Type of DB used to store key to chunks mapping
   private String containerDBType = CONTAINER_DB_TYPE_ROCKSDB;
 
@@ -92,14 +88,26 @@ public class KeyValueContainerData extends ContainerData {
     KV_YAML_FIELDS.add(SCHEMA_VERSION);
   }
 
+  public KeyValueContainerData(long id) {
+    this(id, ChunkLayOutVersion.FILE_PER_BLOCK, 0, "", "");
+  }
+
+  public KeyValueContainerData(long id, String metadataPath) {
+    this(id, ChunkLayOutVersion.FILE_PER_BLOCK, 0, "", "");
+    setMetadataPath(metadataPath);
+  }
+
   /**
    * Constructs KeyValueContainerData object.
-   * @param id - ContainerId
+   *
+   * @param id            - ContainerId
    * @param layOutVersion chunk layout
-   * @param size - maximum size of the container in bytes
+   * @param size          - maximum size of the container in bytes
    */
-  public KeyValueContainerData(long id, ChunkLayOutVersion layOutVersion,
-      long size, String originPipelineId, String originNodeId) {
+  public KeyValueContainerData(
+      long id, ChunkLayOutVersion layOutVersion,
+      long size, String originPipelineId, String originNodeId
+  ) {
     super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion,
         size, originPipelineId, originNodeId);
     this.numPendingDeletionBlocks = new AtomicLong(0);
@@ -148,28 +156,11 @@ public class KeyValueContainerData extends ContainerData {
   }
 
   /**
-   * Returns container metadata path.
-   * @return - Physical path where container file and checksum is stored.
-   */
-  public String getMetadataPath() {
-    return metadataPath;
-  }
-
-  /**
-   * Sets container metadata path.
-   *
-   * @param path - String.
-   */
-  public void setMetadataPath(String path) {
-    this.metadataPath = path;
-  }
-
-  /**
    * Returns the path to base dir of the container.
    * @return Path to base dir
    */
   public String getContainerPath() {
-    return new File(metadataPath).getParent();
+    return new File(getMetadataPath()).getParent();
   }
 
   /**
@@ -290,12 +281,39 @@ public class KeyValueContainerData extends ContainerData {
 
     // Set Bytes used and block count key.
     metadataTable.putWithBatch(batchOperation, CONTAINER_BYTES_USED,
-            getBytesUsed());
+        getBytesUsed());
     metadataTable.putWithBatch(batchOperation, BLOCK_COUNT,
-            getKeyCount() - deletedBlockCount);
+        getKeyCount() - deletedBlockCount);
     metadataTable.putWithBatch(batchOperation, PENDING_DELETE_BLOCK_COUNT,
-            (long)(getNumPendingDeletionBlocks() - deletedBlockCount));
+        (long) (getNumPendingDeletionBlocks() - deletedBlockCount));
 
     db.getStore().getBatchHandler().commitBatchOperation(batchOperation);
   }
+
+  public void assignToVolume(String scmId, HddsVolume containerVolume) {
+    String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
+
+    File containerMetaDataPath = KeyValueContainerLocationUtil
+        .getContainerMetaDataPath(hddsVolumeDir, scmId, getContainerID());
+    setMetadataPath(containerMetaDataPath.getPath());
+
+    File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
+        hddsVolumeDir, scmId, getContainerID());
+
+    // Record the chunks path, DB file and volume on this container data.
+    setChunksPath(chunksPath.getPath());
+    setDbFile(getContainerDBFile());
+    setVolume(containerVolume);
+
+  }
+
+  /**
+   * Returns container DB file.
+   *
+   * @return the DB file under the container metadata path, named after the container ID
+   */
+  public File getContainerDBFile() {
+    return new File(getMetadataPath(),
+        getContainerID() + OzoneConsts.DN_CONTAINER_DB);
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index dbc2a97..74c33e4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.ozone.container.keyvalue;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -54,7 +52,6 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
@@ -910,34 +907,6 @@ public class KeyValueHandler extends Handler {
   }
 
   @Override
-  public Container importContainer(ContainerData originalContainerData,
-      final InputStream rawContainerStream,
-      final TarContainerPacker packer)
-      throws IOException {
-
-    KeyValueContainerData containerData =
-        new KeyValueContainerData(originalContainerData);
-
-    KeyValueContainer container = new KeyValueContainer(containerData,
-        conf);
-
-    populateContainerPathFields(container);
-    container.importContainerData(rawContainerStream, packer);
-    sendICR(container);
-    return container;
-
-  }
-
-  @Override
-  public void exportContainer(final Container container,
-      final OutputStream outputStream,
-      final TarContainerPacker packer)
-      throws IOException{
-    final KeyValueContainer kvc = (KeyValueContainer) container;
-    kvc.exportContainerData(outputStream, packer);
-  }
-
-  @Override
   public void markContainerForClose(Container container)
       throws IOException {
     container.writeLock();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index 349a15d..aaafed4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -32,9 +32,9 @@ import java.util.stream.Stream;
 
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 
+import static java.util.stream.Collectors.toList;
 import org.apache.commons.compress.archivers.ArchiveEntry;
 import org.apache.commons.compress.archivers.ArchiveInputStream;
 import org.apache.commons.compress.archivers.ArchiveOutputStream;
@@ -44,13 +44,11 @@ import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.commons.io.IOUtils;
 
-import static java.util.stream.Collectors.toList;
-
 /**
  * Compress/uncompress KeyValueContainer data to a tar.gz archive.
  */
 public class TarContainerPacker
-    implements ContainerPacker<KeyValueContainerData> {
+    implements ContainerPacker {
 
   static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;
 
@@ -62,15 +60,14 @@ public class TarContainerPacker
    * Given an input stream (tar file) extract the data to the specified
    * directories.
    *
-   * @param container container which defines the destination structure.
+   * @param containerData container which defines the destination structure.
    * @param input the input stream.
    */
   @Override
-  public byte[] unpackContainerData(Container<KeyValueContainerData> container,
+  public byte[] unpackContainerData(KeyValueContainerData containerData,
       InputStream input)
       throws IOException {
     byte[] descriptorFileContent = null;
-    KeyValueContainerData containerData = container.getContainerData();
     Path dbRoot = containerData.getDbFile().toPath();
     Path chunksRoot = Paths.get(containerData.getChunksPath());
 
@@ -103,8 +100,7 @@ public class TarContainerPacker
 
     } catch (CompressorException e) {
       throw new IOException(
-          "Can't uncompress the given container: " + container
-              .getContainerData().getContainerID(),
+          "Can't uncompress the given container: " + containerData.getContainerID(),
           e);
     }
   }
@@ -139,16 +135,14 @@ public class TarContainerPacker
    * Given a containerData include all the required container data/metadata
    * in a tar file.
    *
-   * @param container Container to archive (data + metadata).
+   * @param containerData Container to archive
    * @param output   Destination tar file/stream.
    */
   @Override
-  public void pack(Container<KeyValueContainerData> container,
+  public void pack(KeyValueContainerData containerData,
       OutputStream output)
       throws IOException {
 
-    KeyValueContainerData containerData = container.getContainerData();
-
     try (OutputStream compressed = compress(output);
          ArchiveOutputStream archiveOutput = tar(compressed)) {
 
@@ -158,7 +152,7 @@ public class TarContainerPacker
       includePath(Paths.get(containerData.getChunksPath()), CHUNKS_DIR_NAME,
           archiveOutput);
 
-      includeFile(container.getContainerFile(), CONTAINER_FILE_NAME,
+      includeFile(containerData.getContainerFile(), CONTAINER_FILE_NAME,
           archiveOutput);
     } catch (CompressorException e) {
       throw new IOException(
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index 2398c85..4d710f5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -17,26 +17,19 @@
  */
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
-import org.apache.hadoop.hdds.protocol.datanode.proto
-    .ContainerProtos.ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerDataProto.State;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.time.Instant;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+
 /**
  * Control plane for container management in datanode.
  */
@@ -130,22 +123,6 @@ public class ContainerController {
     getHandler(container).closeContainer(container);
   }
 
-  public Container importContainer(
-      final ContainerData containerData,
-      final InputStream rawContainerStream,
-      final TarContainerPacker packer)
-      throws IOException {
-    return handlers.get(containerData.getContainerType())
-        .importContainer(containerData, rawContainerStream, packer);
-  }
-
-  public void exportContainer(final ContainerType type,
-      final long containerId, final OutputStream outputStream,
-      final TarContainerPacker packer) throws IOException {
-    handlers.get(type).exportContainer(
-        containerSet.getContainer(containerId), outputStream, packer);
-  }
-
   /**
    * Deletes a container given its Id.
    * @param containerId Id of the container to be deleted
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 3ecddac..4ea2d6d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -347,4 +347,8 @@ public class OzoneContainer {
   public MutableVolumeSet getVolumeSet() {
     return volumeSet;
   }
+
+  public String getScmId() {
+    return hddsDispatcher.getScmId();
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
index 9511241..16578bb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java
@@ -18,11 +18,10 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.Closeable;
-import java.nio.file.Path;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 /**
  * Service to download container data from other datanodes.
@@ -34,7 +33,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
  */
 public interface ContainerDownloader extends Closeable {
 
-  CompletableFuture<Path> getContainerDataFromReplicas(long containerId,
-      List<DatanodeDetails> sources);
+  KeyValueContainerData getContainerDataFromReplicas(
+      KeyValueContainerData containerData, List<DatanodeDetails> sources);
 
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
index 827b9d6..939761f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
 
 /**
  * Service to do the real replication task.
- *
+ * <p>
  * An implementation should download the container and im
  */
 public interface ContainerReplicator {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index cdab0fd..06b5b33 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -17,22 +17,25 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,74 +52,78 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
 
   private final ContainerSet containerSet;
 
-  private final ContainerController controller;
-
   private final ContainerDownloader downloader;
+  private final ConfigurationSource config;
+
+  private final Supplier<String> scmId;
 
-  private final TarContainerPacker packer;
+  private VolumeSet volumeSet;
 
   public DownloadAndImportReplicator(
+      ConfigurationSource config,
+      Supplier<String> scmId,
       ContainerSet containerSet,
-      ContainerController controller,
       ContainerDownloader downloader,
-      TarContainerPacker packer) {
+      VolumeSet volumeSet
+  ) {
     this.containerSet = containerSet;
-    this.controller = controller;
     this.downloader = downloader;
-    this.packer = packer;
+    this.config = config;
+    this.scmId = scmId;
+    this.volumeSet = volumeSet;
   }
 
-  public void importContainer(long containerID, Path tarFilePath)
-      throws IOException {
-    try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
-    }
-  }
 
   @Override
   public void replicate(ReplicationTask task) {
     long containerID = task.getContainerId();
-
+    if (scmId.get() == null) {
+      LOG.error("Replication task is called before first SCM call");
+      task.setStatus(Status.FAILED);
+    }
     List<DatanodeDetails> sourceDatanodes = task.getSources();
 
     LOG.info("Starting replication of container {} from {}", containerID,
         sourceDatanodes);
 
-    CompletableFuture<Path> tempTarFile = downloader
-        .getContainerDataFromReplicas(containerID,
-            sourceDatanodes);
-
     try {
-      //wait for the download. This thread pool is limiting the paralell
-      //downloads, so it's ok to block here and wait for the full download.
-      Path path = tempTarFile.get();
+
+      VolumeChoosingPolicy volumeChoosingPolicy = config.getClass(
+          HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy
+              .class, VolumeChoosingPolicy.class).newInstance();
+
+      long maxContainerSize = (long) config.getStorageSize(
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+      KeyValueContainerData containerData =
+          new KeyValueContainerData(containerID,
+              ChunkLayOutVersion.FILE_PER_BLOCK, maxContainerSize, "", "");
+
+      final KeyValueContainerData loadedContainerData =
+          downloader
+              .getContainerDataFromReplicas(containerData, sourceDatanodes);
+
+      final HddsVolume volume = volumeChoosingPolicy
+          .chooseVolume(volumeSet.getVolumesList(), maxContainerSize);
+      loadedContainerData.assignToVolume(scmId.get(), volume);
+
+      //write out container descriptor
+      KeyValueContainer keyValueContainer =
+          new KeyValueContainer(loadedContainerData, config);
+
+      //rewriting the yaml file with new checksum calculation.
+      keyValueContainer.update(loadedContainerData.getMetadata(), true);
+
+      //fill in memory stat counter (keycount, byte usage)
+      KeyValueContainerUtil.parseKVContainerData(containerData, config);
+
+      //load container
+      containerSet.addContainer(keyValueContainer);
+
       LOG.info("Container {} is downloaded, starting to import.",
           containerID);
-      importContainer(containerID, path);
+
       LOG.info("Container {} is replicated successfully", containerID);
       task.setStatus(Status.DONE);
     } catch (Exception e) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index 53dac9d..89e23fe 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -19,14 +19,10 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.UncheckedIOException;
-import java.nio.file.Files;
 import java.nio.file.Path;
 import java.security.cert.X509Certificate;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
@@ -35,8 +31,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServi
 import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
-import com.google.common.base.Preconditions;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
@@ -49,7 +45,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Client to read container data from gRPC.
  */
-public class GrpcReplicationClient implements AutoCloseable{
+public class GrpcReplicationClient implements AutoCloseable {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(GrpcReplicationClient.class);
@@ -91,23 +87,18 @@ public class GrpcReplicationClient implements AutoCloseable{
     workingDirectory = workingDir;
   }
 
-  public CompletableFuture<Path> download(long containerId) {
+  public void download(
+      KeyValueContainerData containerData,
+      OutputStream outputStream
+  ) {
     CopyContainerRequestProto request =
         CopyContainerRequestProto.newBuilder()
-            .setContainerID(containerId)
+            .setContainerID(containerData.getContainerID())
             .setLen(-1)
             .setReadOffset(0)
             .build();
 
-    CompletableFuture<Path> response = new CompletableFuture<>();
-
-    Path destinationPath =
-        getWorkingDirectory().resolve("container-" + containerId + ".tar.gz");
-
-    client.download(request,
-        new StreamDownloader(containerId, response, destinationPath));
-
-    return response;
+    client.download(request, new StreamDownloader(outputStream));
   }
 
   private Path getWorkingDirectory() {
@@ -134,74 +125,41 @@ public class GrpcReplicationClient implements AutoCloseable{
   public static class StreamDownloader
       implements StreamObserver<CopyContainerResponseProto> {
 
-    private final CompletableFuture<Path> response;
-    private final long containerId;
-    private final OutputStream stream;
-    private final Path outputPath;
+    private final OutputStream outputStream;
 
-    public StreamDownloader(long containerId, CompletableFuture<Path> response,
-        Path outputPath) {
-      this.response = response;
-      this.containerId = containerId;
-      this.outputPath = outputPath;
-      try {
-        Preconditions.checkNotNull(outputPath, "Output path cannot be null");
-        Path parentPath = Preconditions.checkNotNull(outputPath.getParent());
-        Files.createDirectories(parentPath);
-        stream = new FileOutputStream(outputPath.toFile());
-      } catch (IOException e) {
-        throw new UncheckedIOException(
-            "Output path can't be used: " + outputPath, e);
-      }
+    public StreamDownloader(
+        OutputStream output
+    ) {
+      this.outputStream = output;
     }
 
     @Override
     public void onNext(CopyContainerResponseProto chunk) {
       try {
-        chunk.getData().writeTo(stream);
+        chunk.getData().writeTo(outputStream);
       } catch (IOException e) {
-        response.completeExceptionally(e);
+        e.printStackTrace();
       }
     }
 
     @Override
     public void onError(Throwable throwable) {
       try {
-        LOG.error("Download of container {} was unsuccessful",
-            containerId, throwable);
-        stream.close();
-        deleteOutputOnFailure();
-        response.completeExceptionally(throwable);
+        outputStream.close();
       } catch (IOException e) {
-        LOG.error("Failed to close {} for container {}",
-            outputPath, containerId, e);
-        response.completeExceptionally(e);
+        e.printStackTrace();
       }
     }
 
     @Override
     public void onCompleted() {
       try {
-        stream.close();
-        LOG.info("Container {} is downloaded to {}", containerId, outputPath);
-        response.complete(outputPath);
+        outputStream.close();
       } catch (IOException e) {
-        LOG.error("Downloaded container {} OK, but failed to close {}",
-            containerId, outputPath, e);
-        response.completeExceptionally(e);
+        e.printStackTrace();
       }
 
     }
 
-    private void deleteOutputOnFailure() {
-      try {
-        Files.delete(outputPath);
-      } catch (IOException ex) {
-        LOG.error("Failed to delete temporary destination {} for " +
-                "unsuccessful download of container {}",
-            outputPath, containerId, ex);
-      }
-    }
   }
-
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
index 40d8e48..474333e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java
@@ -20,11 +20,11 @@ package org.apache.hadoop.ozone.container.replication;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 
 /**
  * A naive implementation of the replication source which creates a tar file
@@ -51,13 +51,13 @@ public class OnDemandContainerReplicationSource
   public void copyData(long containerId, OutputStream destination)
       throws IOException {
 
-    Container container = controller.getContainer(containerId);
+    KeyValueContainer container =
+        (KeyValueContainer) controller.getContainer(containerId);
 
     Preconditions.checkNotNull(
         container, "Container is not found " + containerId);
 
-    controller.exportContainer(
-        container.getContainerType(), containerId, destination, packer);
+    container.exportContainerData(destination, packer);
 
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 6becf62..8dcb646 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
 
@@ -45,6 +46,8 @@ public class ReplicationSupervisor {
   private final ContainerReplicator replicator;
   private final ExecutorService executor;
 
+  private ConfigurationSource config;
+
   private final AtomicLong requestCounter = new AtomicLong();
   private final AtomicLong successCounter = new AtomicLong();
   private final AtomicLong failureCounter = new AtomicLong();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
index 0967503..3f0e63a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.cert.X509Certificate;
@@ -32,6 +34,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.jetbrains.annotations.NotNull;
@@ -53,6 +58,7 @@ public class SimpleContainerDownloader implements ContainerDownloader {
   private final Path workingDirectory;
   private final SecurityConfig securityConfig;
   private final X509Certificate caCert;
+  private final TarContainerPacker packer = new TarContainerPacker();
 
   public SimpleContainerDownloader(
       ConfigurationSource conf,
@@ -73,40 +79,42 @@ public class SimpleContainerDownloader implements ContainerDownloader {
   }
 
+  /**
+   * Downloads the container from the first source datanode which can serve
+   * it. The datanodes are tried one by one, in the order returned by
+   * shuffleDatanodes.
+   *
+   * @param containerData descriptor of the container to download
+   * @param sourceDatanodes candidate replicas to download from
+   * @return the result of the first successful downloadContainer call
+   * @throws RuntimeException if none of the datanodes could serve the
+   *     container; the cause is the failure of the last attempt (if any)
+   */
   @Override
-  public CompletableFuture<Path> getContainerDataFromReplicas(
-      long containerId,
+  public KeyValueContainerData getContainerDataFromReplicas(
+      KeyValueContainerData containerData,
       List<DatanodeDetails> sourceDatanodes
   ) {
 
-    CompletableFuture<Path> result = null;
+    // Kept to be reported as the cause if all of the datanodes fail.
+    Exception lastFailure = null;
 
     final List<DatanodeDetails> shuffledDatanodes =
         shuffleDatanodes(sourceDatanodes);
 
     for (DatanodeDetails datanode : shuffledDatanodes) {
       try {
-        if (result == null) {
-          result = downloadContainer(containerId, datanode);
-        } else {
-
-          result = result.exceptionally(t -> {
-            LOG.error("Error on replicating container: " + containerId, t);
-            try {
-              return downloadContainer(containerId, datanode).join();
-            } catch (Exception e) {
-              LOG.error("Error on replicating container: " + containerId,
-                  e);
-              return null;
-            }
-          });
-        }
+        return downloadContainer(containerData, datanode);
       } catch (Exception ex) {
+        lastFailure = ex;
         LOG.error(String.format(
             "Container %s download from datanode %s was unsuccessful. "
-                + "Trying the next datanode", containerId, datanode), ex);
+                + "Trying the next datanode", containerData.getContainerID(),
+            datanode), ex);
       }
     }
-    return result;
+    throw new RuntimeException(
+        "Couldn't download container from any of the datanodes " + containerData
+            .getContainerID(), lastFailure);
 
   }
 
@@ -127,8 +135,20 @@ public class SimpleContainerDownloader implements ContainerDownloader {
   }
 
+  /**
+   * Downloads the container from one datanode: the stream received from the
+   * replication client is passed to the packer's unpackContainerData.
+   *
+   * @param containerData descriptor of the container to fill; its state,
+   *     DB type, schema version and layout version are overwritten with the
+   *     values read from the downloaded descriptor
+   * @param datanode the replica to download from
+   * @return the updated containerData instance
+   * @throws IOException declared by the signature; NOTE(review): presumably
+   *     raised when the transfer or the unpacking fails -- confirm
+   */
   @VisibleForTesting
-  protected CompletableFuture<Path> downloadContainer(
-      long containerId,
+  protected KeyValueContainerData downloadContainer(
+      KeyValueContainerData containerData,
       DatanodeDetails datanode
   ) throws IOException {
     CompletableFuture<Path> result;
@@ -136,17 +156,34 @@ public class SimpleContainerDownloader implements ContainerDownloader {
         new GrpcReplicationClient(datanode.getIpAddress(),
             datanode.getPort(Name.REPLICATION).getValue(),
             workingDirectory, securityConfig, caCert);
-    result = grpcReplicationClient.download(containerId)
-        .thenApply(r -> {
-          try {
-            grpcReplicationClient.close();
-          } catch (Exception e) {
-            LOG.error("Couldn't close Grpc replication client", e);
-          }
-          return r;
-        });
-
-    return result;
+
+    // The reading end must be connected before the download is started:
+    // writing to a PipedOutputStream which is not yet connected fails with
+    // an IOException.
+    PipedOutputStream outputStream = new PipedOutputStream();
+    PipedInputStream inputStream = new PipedInputStream(outputStream);
+
+    // NOTE(review): the previous implementation closed grpcReplicationClient
+    // when the download finished; confirm it is still released somewhere.
+    grpcReplicationClient.download(containerData, outputStream);
+    final byte[] descriptor =
+        packer.unpackContainerData(containerData, inputStream);
+
+    // Parse the descriptor which was extracted from the archive of the source
+    // datanode, and copy the original metadata to the local container data
+    // (the current file path fields are kept).
+    KeyValueContainerData originalContainerData =
+        (KeyValueContainerData) ContainerDataYaml
+            .readContainer(descriptor);
+
+    containerData.setState(originalContainerData.getState());
+    containerData
+        .setContainerDBType(originalContainerData.getContainerDBType());
+    containerData.setSchemaVersion(originalContainerData.getSchemaVersion());
+    containerData.setLayoutVersion(
+        originalContainerData.getLayOutVersion().getVersion());
+
+    return containerData;
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 4000e34..6ca5e44 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -18,63 +18,56 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume
-    .RoundRobinVolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
 
+import static org.apache.ratis.util.Preconditions.assertTrue;
 import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
-import org.rocksdb.ColumnFamilyOptions;
-
-import java.io.File;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.apache.ratis.util.Preconditions.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
+import org.mockito.Mockito;
 import static org.mockito.Mockito.mock;
+import org.rocksdb.ColumnFamilyOptions;
 
 /**
  * Class to test KeyValue Container operations.
@@ -141,74 +134,11 @@ public class TestKeyValueContainer {
     // Check whether container file and container db file exists or not.
     assertTrue(keyValueContainer.getContainerFile().exists(),
         ".Container File does not exist");
-    assertTrue(keyValueContainer.getContainerDBFile().exists(), "Container " +
-        "DB does not exist");
+//    assertTrue(keyValueContainer.getContainerDBFile().exists(), "Container " +
+//        "DB does not exist");
   }
 
-  @Test
-  public void testContainerImportExport() throws Exception {
-    long containerId = keyValueContainer.getContainerData().getContainerID();
-    createContainer();
-    long numberOfKeysToWrite = 12;
-    closeContainer();
-    populate(numberOfKeysToWrite);
-
-    //destination path
-    File folderToExport = folder.newFile("exported.tar.gz");
-
-    TarContainerPacker packer = new TarContainerPacker();
-
-    //export the container
-    try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
-      keyValueContainer
-          .exportContainerData(fos, packer);
-    }
-
-    //delete the original one
-    keyValueContainer.delete();
 
-    //create a new one
-    KeyValueContainerData containerData =
-        new KeyValueContainerData(containerId,
-            keyValueContainerData.getLayOutVersion(),
-            keyValueContainerData.getMaxSize(), UUID.randomUUID().toString(),
-            datanodeId.toString());
-    KeyValueContainer container = new KeyValueContainer(containerData, CONF);
-
-    HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
-        .getVolumesList(), 1);
-    String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
-
-    container.populatePathFields(scmId, containerVolume, hddsVolumeDir);
-    try (FileInputStream fis = new FileInputStream(folderToExport)) {
-      container.importContainerData(fis, packer);
-    }
-
-    assertEquals("value1", containerData.getMetadata().get("key1"));
-    assertEquals(keyValueContainerData.getContainerDBType(),
-        containerData.getContainerDBType());
-    assertEquals(keyValueContainerData.getState(),
-        containerData.getState());
-    assertEquals(numberOfKeysToWrite,
-        containerData.getKeyCount());
-    assertEquals(keyValueContainerData.getLayOutVersion(),
-        containerData.getLayOutVersion());
-    assertEquals(keyValueContainerData.getMaxSize(),
-        containerData.getMaxSize());
-    assertEquals(keyValueContainerData.getBytesUsed(),
-        containerData.getBytesUsed());
-
-    //Can't overwrite existing container
-    try {
-      try (FileInputStream fis = new FileInputStream(folderToExport)) {
-        container.importContainerData(fis, packer);
-      }
-      fail("Container is imported twice. Previous files are overwritten");
-    } catch (IOException ex) {
-      //all good
-    }
-
-  }
 
   /**
    * Create the container on disk.
@@ -327,8 +257,8 @@ public class TestKeyValueContainer {
 
     assertFalse("Container File still exists",
         keyValueContainer.getContainerFile().exists());
-    assertFalse("Container DB file still exists",
-        keyValueContainer.getContainerDBFile().exists());
+//    assertFalse("Container DB file still exists",
+//        keyValueContainer.getContainerDBFile().exists());
   }
 
   @Test
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
index d248ac1..15b2526 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -33,21 +33,22 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.compress.archivers.ArchiveOutputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
-import org.apache.commons.compress.compressors.CompressorOutputStream;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+import org.apache.hadoop.test.LambdaTestUtils;
 
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.compress.compressors.CompressorException;
 import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
 import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -55,8 +56,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.apache.commons.compress.compressors.CompressorStreamFactory.GZIP;
-
 /**
  * Test the tar/untar for a given container.
  */
@@ -73,7 +72,7 @@ public class TestTarContainerPacker {
 
   private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
 
-  private final ContainerPacker<KeyValueContainerData> packer
+  private final ContainerPacker packer
       = new TarContainerPacker();
 
   private static final Path SOURCE_CONTAINER_ROOT =
@@ -163,7 +162,7 @@ public class TestTarContainerPacker {
 
     //WHEN: pack it
     try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) {
-      packer.pack(sourceContainer, output);
+      packer.pack(sourceContainer.getContainerData(), output);
     }
 
     //THEN: check the result
@@ -193,15 +192,13 @@ public class TestTarContainerPacker {
     KeyValueContainerData destinationContainerData =
         createContainer(DEST_CONTAINER_ROOT);
 
-    KeyValueContainer destinationContainer =
-        new KeyValueContainer(destinationContainerData, conf);
-
     String descriptor;
 
     //unpackContainerData
     try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
       descriptor =
-          new String(packer.unpackContainerData(destinationContainer, input),
+          new String(
+              packer.unpackContainerData(destinationContainerData, input),
               StandardCharsets.UTF_8);
     }
 
@@ -214,7 +211,7 @@ public class TestTarContainerPacker {
     Assert.assertFalse(
         "Descriptor file should not have been extracted by the "
             + "unpackContainerData Call",
-        destinationContainer.getContainerFile().exists());
+        destinationContainerData.getContainerFile().exists());
     Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor);
   }
 
@@ -297,14 +294,14 @@ public class TestTarContainerPacker {
     try (FileInputStream input = new FileInputStream(containerFile)) {
       OzoneConfiguration conf = new OzoneConfiguration();
       KeyValueContainerData data = createContainer(DEST_CONTAINER_ROOT);
-      KeyValueContainer container = new KeyValueContainer(data, conf);
-      packer.unpackContainerData(container, input);
+      packer.unpackContainerData(data, input);
       return data;
     }
   }
 
   private void writeDescriptor(KeyValueContainer container) throws IOException {
-    try (FileWriter writer = new FileWriter(container.getContainerFile())) {
+    try (FileWriter writer = new FileWriter(
+        container.getContainerData().getContainerFile())) {
       IOUtils.write(TEST_DESCRIPTOR_FILE_CONTENT, writer);
     }
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index df9ffbc..85be438 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.replication;
 
+import javax.annotation.Nonnull;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
@@ -29,14 +30,17 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
+import org.apache.hadoop.hdds.conf.InMemoryConfiguration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-
 import org.apache.hadoop.test.GenericTestUtils;
+
+import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static java.util.Collections.emptyList;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -45,11 +49,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
-import javax.annotation.Nonnull;
-
-import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
-import static java.util.Collections.emptyList;
-
 /**
  * Test the replication supervisor.
  */
@@ -189,12 +188,10 @@ public class TestReplicationSupervisor {
         Mockito.mock(SimpleContainerDownloader.class);
     CompletableFuture<Path> res = new CompletableFuture<>();
     res.complete(Paths.get("file:/tmp/no-such-file"));
-    Mockito.when(
-        moc.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList()))
-        .thenReturn(res);
 
     ContainerReplicator replicatorFactory =
-        new DownloadAndImportReplicator(set, null, moc, null);
+        new DownloadAndImportReplicator(new InMemoryConfiguration(),
+            () -> "", set, moc, null);
 
     replicatorRef.set(replicatorFactory);
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
index 7070425..f73b022 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSimpleContainerDownloader.java
@@ -18,19 +18,17 @@
 
 package org.apache.hadoop.ozone.container.replication;
 
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -47,17 +45,19 @@ public class TestSimpleContainerDownloader {
 
     //GIVEN
     List<DatanodeDetails> datanodes = createDatanodes();
+    KeyValueContainerData kvc = new KeyValueContainerData(1L);
 
     SimpleContainerDownloader downloader =
         createDownloaderWithPredefinedFailures(true);
 
     //WHEN
-    final Path result =
-        downloader.getContainerDataFromReplicas(1L, datanodes)
-            .get(1L, TimeUnit.SECONDS);
+    final KeyValueContainerData data =
+        downloader.getContainerDataFromReplicas(kvc, datanodes);
 
     //THEN
-    Assert.assertEquals(datanodes.get(0).getUuidString(), result.toString());
+    //the stubbed download stores the id of the used datanode as the DB type
+    Assert.assertEquals(datanodes.get(0).getUuidString(),
+        data.getContainerDBType());
   }
 
   @Test
@@ -71,13 +71,14 @@ public class TestSimpleContainerDownloader {
         createDownloaderWithPredefinedFailures(true, datanodes.get(0));
 
     //WHEN
-    final Path result =
-        downloader.getContainerDataFromReplicas(1L, datanodes)
-            .get(1L, TimeUnit.SECONDS);
+
+    final KeyValueContainerData data = downloader
+        .getContainerDataFromReplicas(new KeyValueContainerData(1L), datanodes);
 
     //THEN
     //first datanode is failed, second worked
-    Assert.assertEquals(datanodes.get(1).getUuidString(), result.toString());
+    Assert.assertEquals(datanodes.get(1).getUuidString(),
+        data.getContainerDBType());
   }
 
   @Test
@@ -90,13 +91,13 @@ public class TestSimpleContainerDownloader {
         createDownloaderWithPredefinedFailures(false, datanodes.get(0));
 
     //WHEN
-    final Path result =
-        downloader.getContainerDataFromReplicas(1L, datanodes)
-            .get(1L, TimeUnit.SECONDS);
+    final KeyValueContainerData data = downloader
+        .getContainerDataFromReplicas(new KeyValueContainerData(1L), datanodes);
 
     //THEN
     //first datanode is failed, second worked
-    Assert.assertEquals(datanodes.get(1).getUuidString(), result.toString());
+    Assert.assertEquals(datanodes.get(1).getUuidString(),
+        data.getContainerDBType());
   }
 
   /**
@@ -113,21 +114,22 @@ public class TestSimpleContainerDownloader {
         new SimpleContainerDownloader(new OzoneConfiguration(), null) {
 
           @Override
-          protected CompletableFuture<Path> downloadContainer(
-              long containerId, DatanodeDetails datanode
-          ) {
-            //download is always successful.
-            return CompletableFuture
-                .completedFuture(Paths.get(datanode.getUuidString()));
+          protected KeyValueContainerData downloadContainer(
+              KeyValueContainerData containerData, DatanodeDetails datanode
+          ) throws IOException {
+            containerData.setContainerDBType(datanode.getUuidString());
+            return containerData;
           }
         };
 
     //WHEN executed, THEN at least once the second datanode should be
     //returned.
     for (int i = 0; i < 10000; i++) {
-      Path path =
-          downloader.getContainerDataFromReplicas(1L, datanodes).get();
-      if (path.toString().equals(datanodes.get(1).getUuidString())) {
+      final KeyValueContainerData containerData =
+          downloader.getContainerDataFromReplicas(new KeyValueContainerData(1L),
+              datanodes);
+      if (containerData.getContainerDBType()
+          .equals(datanodes.get(1).getUuidString())) {
         return;
       }
     }
@@ -166,8 +168,8 @@ public class TestSimpleContainerDownloader {
       }
 
       @Override
-      protected CompletableFuture<Path> downloadContainer(
-          long containerId,
+      protected KeyValueContainerData downloadContainer(
+          KeyValueContainerData containerData,
           DatanodeDetails datanode
       ) {
 
@@ -175,15 +177,13 @@ public class TestSimpleContainerDownloader {
           if (directException) {
             throw new RuntimeException("Unavailable datanode");
           } else {
-            return CompletableFuture.supplyAsync(() -> {
-              throw new RuntimeException("Unavailable datanode");
-            });
+            throw new RuntimeException("Unavailable datanode");
+
           }
         } else {
-
-          //path includes the dn id to make it possible to assert.
-          return CompletableFuture.completedFuture(
-              Paths.get(datanode.getUuidString()));
+          //DB type carries the dn id to make it possible to assert.
+          containerData.setContainerDBType(datanode.getUuidString());
+          return containerData;
         }
 
       }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
index ad2810a..7d7af35 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java
@@ -22,9 +22,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
@@ -33,18 +31,13 @@ import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
-import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.ContainerReplicator;
 import org.apache.hadoop.ozone.container.replication.DownloadAndImportReplicator;
 import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
@@ -168,36 +161,17 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements
     }
 
     ContainerSet containerSet = new ContainerSet();
-
-    ContainerMetrics metrics = ContainerMetrics.create(conf);
-
+    
     MutableVolumeSet volumeSet = new MutableVolumeSet(fakeDatanodeUuid, conf);
 
-    Map<ContainerType, Handler> handlers = new HashMap<>();
-
-    for (ContainerType containerType : ContainerType.values()) {
-      final Handler handler =
-          Handler.getHandlerForContainerType(
-              containerType,
-              conf,
-              fakeDatanodeUuid,
-              containerSet,
-              volumeSet,
-              metrics,
-              containerReplicaProto -> {
-              });
-      handler.setScmID(UUID.randomUUID().toString());
-      handlers.put(containerType, handler);
-    }
-
-    ContainerController controller =
-        new ContainerController(containerSet, handlers);
-
+    final UUID scmId = UUID.randomUUID();
     ContainerReplicator replicator =
-        new DownloadAndImportReplicator(containerSet,
-            controller,
+        new DownloadAndImportReplicator(
+            conf,
+            () -> scmId.toString(),
+            containerSet,
             new SimpleContainerDownloader(conf, null),
-            new TarContainerPacker());
+            volumeSet);
 
     supervisor = new ReplicationSupervisor(containerSet, replicator, 10);
   }
diff --git a/pom.xml b/pom.xml
index 9800005..c2db18e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -153,7 +153,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
     <protobuf.version>2.5.0</protobuf.version>
 
     <findbugs.version>3.0.0</findbugs.version>
-    <spotbugs.version>3.1.12</spotbugs.version>
+    <spotbugs.version>4.2.0</spotbugs.version>
     <dnsjava.version>2.1.7</dnsjava.version>
 
     <guava.version>28.2-jre</guava.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org