You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/25 15:49:37 UTC

[20/50] [abbrv] hadoop git commit: HDDS-328. Support export and import of the KeyValueContainer. Contributed by Elek Marton.

HDDS-328. Support export and import of the KeyValueContainer. Contributed by Elek Marton.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ca29fb75
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ca29fb75
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ca29fb75

Branch: refs/heads/YARN-3409
Commit: ca29fb754e8a162edba380a5f1deb48699e14d8b
Parents: 585ebd8
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Thu Aug 23 11:30:28 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Thu Aug 23 11:30:28 2018 -0700

----------------------------------------------------------------------
 .../common/impl/ContainerDataYaml.java          |  94 ++++---
 .../container/common/interfaces/Container.java  |  33 ++-
 .../common/interfaces/ContainerPacker.java      |  58 +++++
 .../container/keyvalue/KeyValueContainer.java   | 128 ++++++++--
 .../container/keyvalue/KeyValueHandler.java     |  19 +-
 .../container/keyvalue/TarContainerPacker.java  | 249 +++++++++++++++++++
 .../keyvalue/helpers/KeyValueContainerUtil.java |  22 +-
 .../keyvalue/TestKeyValueContainer.java         |  95 ++++++-
 .../keyvalue/TestTarContainerPacker.java        | 231 +++++++++++++++++
 9 files changed, 849 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
index aed75d3..ec6d642 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
@@ -18,31 +18,34 @@
 
 package org.apache.hadoop.ozone.container.common.impl;
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerType;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-
 import java.beans.IntrospectionException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Writer;
+import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStreamWriter;
-import java.io.File;
+import java.io.Writer;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.Map;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.ozone.container.keyvalue
+    .KeyValueContainerData.KEYVALUE_YAML_TAG;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.AbstractConstruct;
 import org.yaml.snakeyaml.constructor.Constructor;
 import org.yaml.snakeyaml.introspector.BeanAccess;
@@ -54,9 +57,6 @@ import org.yaml.snakeyaml.nodes.ScalarNode;
 import org.yaml.snakeyaml.nodes.Tag;
 import org.yaml.snakeyaml.representer.Representer;
 
-import static org.apache.hadoop.ozone.container.keyvalue
-    .KeyValueContainerData.KEYVALUE_YAML_TAG;
-
 /**
  * Class for creating and reading .container files.
  */
@@ -106,36 +106,52 @@ public final class ContainerDataYaml {
   /**
    * Read the yaml file, and return containerData.
    *
-   * @param containerFile
    * @throws IOException
    */
   public static ContainerData readContainerFile(File containerFile)
       throws IOException {
     Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
+    try (FileInputStream inputFileStream = new FileInputStream(containerFile)) {
+      return readContainer(inputFileStream);
+    }
+
+  }
+
+  /**
+   * Read the yaml file content, and return containerData.
+   *
+   * @throws IOException
+   */
+  public static ContainerData readContainer(byte[] containerFileContent)
+      throws IOException {
+    return readContainer(
+        new ByteArrayInputStream(containerFileContent));
+  }
+
+  /**
+   * Read the yaml content, and return containerData.
+   *
+   * @throws IOException
+   */
+  public static ContainerData readContainer(InputStream input)
+      throws IOException {
 
-    InputStream input = null;
     ContainerData containerData;
-    try {
-      PropertyUtils propertyUtils = new PropertyUtils();
-      propertyUtils.setBeanAccess(BeanAccess.FIELD);
-      propertyUtils.setAllowReadOnlyProperties(true);
+    PropertyUtils propertyUtils = new PropertyUtils();
+    propertyUtils.setBeanAccess(BeanAccess.FIELD);
+    propertyUtils.setAllowReadOnlyProperties(true);
 
-      Representer representer = new ContainerDataRepresenter();
-      representer.setPropertyUtils(propertyUtils);
+    Representer representer = new ContainerDataRepresenter();
+    representer.setPropertyUtils(propertyUtils);
 
-      Constructor containerDataConstructor = new ContainerDataConstructor();
+    Constructor containerDataConstructor = new ContainerDataConstructor();
 
-      Yaml yaml = new Yaml(containerDataConstructor, representer);
-      yaml.setBeanAccess(BeanAccess.FIELD);
+    Yaml yaml = new Yaml(containerDataConstructor, representer);
+    yaml.setBeanAccess(BeanAccess.FIELD);
+
+    containerData = (ContainerData)
+        yaml.load(input);
 
-      input = new FileInputStream(containerFile);
-      containerData = (ContainerData)
-          yaml.load(input);
-    } finally {
-      if (input!= null) {
-        input.close();
-      }
-    }
     return containerData;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 7f706b5..9380f0c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -18,26 +18,27 @@
 
 package org.apache.hadoop.ozone.container.common.interfaces;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerLifeCycleState;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.
-    StorageContainerException;
 
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-
-
 /**
  * Interface for Container Operations.
  */
-public interface Container extends RwLock {
+public interface Container<CONTAINERDATA extends ContainerData> extends RwLock {
 
   /**
    * Creates a container.
@@ -71,7 +72,7 @@ public interface Container extends RwLock {
    * @return ContainerData - Container Data.
    * @throws StorageContainerException
    */
-  ContainerData getContainerData();
+  CONTAINERDATA getContainerData();
 
   /**
    * Get the Container Lifecycle state.
@@ -113,6 +114,20 @@ public interface Container extends RwLock {
   BlockIterator blockIterator() throws IOException;
 
   /**
+   * Import the container from an external archive.
+   */
+  void importContainerData(InputStream stream,
+      ContainerPacker<CONTAINERDATA> packer) throws IOException;
+
+  /**
+   * Export all the data of the container to one output archive with the help
+   * of the packer.
+   *
+   */
+  void exportContainerData(OutputStream stream,
+      ContainerPacker<CONTAINERDATA> packer) throws IOException;
+
+  /**
    * Returns containerReport for the container.
    */
   StorageContainerDatanodeProtocolProtos.ContainerInfo getContainerReport()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
new file mode 100644
index 0000000..8308c23
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerPacker.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+
+/**
+ * Service to pack/unpack ContainerData container data to/from a single byte
+ * stream.
+ */
+public interface ContainerPacker<CONTAINERDATA extends ContainerData> {
+
+  /**
+   * Extract the container data to the path defined by the container.
+   * <p>
+   * This doesn't contain the extraction of the container descriptor file.
+   *
+   * @return the byte content of the descriptor (which won't be written to a
+   * file but returned).
+   */
+  byte[] unpackContainerData(Container<CONTAINERDATA> container,
+      InputStream inputStream)
+      throws IOException;
+
+  /**
+   * Compress all the container data (chunk data, metadata db AND container
+   * descriptor) to one single archive.
+   */
+  void pack(Container<CONTAINERDATA> container, OutputStream destination)
+      throws IOException;
+
+  /**
+   * Read the descriptor from the finished archive to get the data before
+   * importing the container.
+   */
+  byte[] unpackContainerDescriptor(InputStream inputStream)
+      throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 0ea748a..8108a11 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -18,9 +18,15 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileUtil;
@@ -37,32 +43,27 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
-import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
 import org.apache.hadoop.ozone.container.keyvalue.helpers
     .KeyValueContainerLocationUtil;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_ALREADY_EXISTS;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_FILES_CREATE_ERROR;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.DISK_OUT_OF_SPACE;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.ERROR_IN_COMPACT_DB;
@@ -70,11 +71,13 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.INVALID_CONTAINER_STATE;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.UNSUPPORTED_REQUEST;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class to perform KeyValue Container operations.
  */
-public class KeyValueContainer implements Container {
+public class KeyValueContainer implements Container<KeyValueContainerData> {
 
   private static final Logger LOG = LoggerFactory.getLogger(Container.class);
 
@@ -167,6 +170,34 @@ public class KeyValueContainer implements Container {
   }
 
   /**
+   * Set all of the path realted container data fields based on the name
+   * conventions.
+   *
+   * @param scmId
+   * @param containerVolume
+   * @param hddsVolumeDir
+   */
+  public void populatePathFields(String scmId,
+      HddsVolume containerVolume, String hddsVolumeDir) {
+
+    long containerId = containerData.getContainerID();
+
+    File containerMetaDataPath = KeyValueContainerLocationUtil
+        .getContainerMetaDataPath(hddsVolumeDir, scmId, containerId);
+
+    File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
+        hddsVolumeDir, scmId, containerId);
+    File dbFile = KeyValueContainerLocationUtil.getContainerDBFile(
+        containerMetaDataPath, containerId);
+
+    //Set containerData for the KeyValueContainer.
+    containerData.setMetadataPath(containerMetaDataPath.getPath());
+    containerData.setChunksPath(chunksPath.getPath());
+    containerData.setDbFile(dbFile);
+    containerData.setVolume(containerVolume);
+  }
+
+  /**
    * Writes to .container file.
    *
    * @param containerFile container file name
@@ -334,6 +365,75 @@ public class KeyValueContainer implements Container {
         containerData.getContainerPath()));
   }
 
+  @Override
+  public void importContainerData(InputStream input,
+      ContainerPacker<KeyValueContainerData> packer) throws IOException {
+    writeLock();
+    try {
+      if (getContainerFile().exists()) {
+        String errorMessage = String.format(
+            "Can't import container (cid=%d) data to a specific location"
+                + " as the container descriptor (%s) has already been exist.",
+            getContainerData().getContainerID(),
+            getContainerFile().getAbsolutePath());
+        throw new IOException(errorMessage);
+      }
+      //copy the values from the input stream to the final destination
+      // directory.
+      byte[] descriptorContent = packer.unpackContainerData(this, input);
+
+      Preconditions.checkNotNull(descriptorContent,
+          "Container descriptor is missing from the container archive: "
+              + getContainerData().getContainerID());
+
+      //now, we have extracted the container descriptor from the previous
+      //datanode. We can load it and upload it with the current data
+      // (original metadata + current filepath fields)
+      KeyValueContainerData originalContainerData =
+          (KeyValueContainerData) ContainerDataYaml
+              .readContainer(descriptorContent);
+
+
+      containerData.setState(originalContainerData.getState());
+      containerData
+          .setContainerDBType(originalContainerData.getContainerDBType());
+      containerData.setBytesUsed(originalContainerData.getBytesUsed());
+
+      //rewriting the yaml file with new checksum calculation.
+      update(originalContainerData.getMetadata(), true);
+
+      //fill in memory stat counter (keycount, byte usage)
+      KeyValueContainerUtil.parseKVContainerData(containerData, config);
+
+    } catch (Exception ex) {
+      //delete all the temporary data in case of any exception.
+      try {
+        FileUtils.deleteDirectory(new File(containerData.getMetadataPath()));
+        FileUtils.deleteDirectory(new File(containerData.getChunksPath()));
+        FileUtils.deleteDirectory(getContainerFile());
+      } catch (Exception deleteex) {
+        LOG.error(
+            "Can not cleanup destination directories after a container import"
+                + " error (cid" +
+                containerData.getContainerID() + ")", deleteex);
+      }
+      throw ex;
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  @Override
+  public void exportContainerData(OutputStream destination,
+      ContainerPacker<KeyValueContainerData> packer) throws IOException {
+    if (getContainerData().getState() != ContainerLifeCycleState.CLOSED) {
+      throw new IllegalStateException(
+          "Only closed containers could be exported: ContainerId="
+              + getContainerData().getContainerID());
+    }
+    packer.pack(this, destination);
+  }
+
   /**
    * Acquire read lock.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 9ddb474..29c359e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.OpenContainerBlockMap;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
@@ -162,7 +163,8 @@ public class KeyValueHandler extends Handler {
     return volumeChoosingPolicy;
   }
   /**
-   * Returns OpenContainerBlockMap instance
+   * Returns OpenContainerBlockMap instance.
+   *
    * @return OpenContainerBlockMap
    */
   public OpenContainerBlockMap getOpenContainerBlockMap() {
@@ -269,6 +271,19 @@ public class KeyValueHandler extends Handler {
     return ContainerUtils.getSuccessResponse(request);
   }
 
+  public void populateContainerPathFields(KeyValueContainer container,
+      long maxSize) throws IOException {
+    volumeSet.acquireLock();
+    try {
+      HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
+          .getVolumesList(), maxSize);
+      String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
+      container.populatePathFields(scmID, containerVolume, hddsVolumeDir);
+    } finally {
+      volumeSet.releaseLock();
+    }
+  }
+
   /**
    * Handles Read Container Request. Returns the ContainerData as response.
    */
@@ -322,7 +337,7 @@ public class KeyValueHandler extends Handler {
    * Open containers cannot be deleted.
    * Holds writeLock on ContainerSet till the container is removed from
    * containerMap. On disk deletion of container files will happen
-   * asynchornously without the lock.
+   * asynchronously without the lock.
    */
   ContainerCommandResponseProto handleDeleteContainer(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
new file mode 100644
index 0000000..13689a7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.ArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Compress/uncompress KeyValueContainer data to a tar.gz archive.
+ */
+public class TarContainerPacker
+    implements ContainerPacker<KeyValueContainerData> {
+
+  private static final String CHUNKS_DIR_NAME = OzoneConsts.STORAGE_DIR_CHUNKS;
+
+  private static final String DB_DIR_NAME = "db";
+
+  private static final String CONTAINER_FILE_NAME = "container.yaml";
+
+
+
+  /**
+   * Given an input stream (tar file) extract the data to the specified
+   * directories.
+   *
+   * @param container container which defines the destination structure.
+   * @param inputStream the input stream.
+   * @throws IOException
+   */
+  @Override
+  public byte[] unpackContainerData(Container<KeyValueContainerData> container,
+      InputStream inputStream)
+      throws IOException {
+    byte[] descriptorFileContent = null;
+    try {
+      KeyValueContainerData containerData = container.getContainerData();
+      CompressorInputStream compressorInputStream =
+          new CompressorStreamFactory()
+              .createCompressorInputStream(CompressorStreamFactory.GZIP,
+                  inputStream);
+
+      TarArchiveInputStream tarInput =
+          new TarArchiveInputStream(compressorInputStream);
+
+      TarArchiveEntry entry = tarInput.getNextTarEntry();
+      while (entry != null) {
+        String name = entry.getName();
+        if (name.startsWith(DB_DIR_NAME + "/")) {
+          Path destinationPath = containerData.getDbFile().toPath()
+              .resolve(name.substring(DB_DIR_NAME.length() + 1));
+          extractEntry(tarInput, entry.getSize(), destinationPath);
+        } else if (name.startsWith(CHUNKS_DIR_NAME + "/")) {
+          Path destinationPath = Paths.get(containerData.getChunksPath())
+              .resolve(name.substring(CHUNKS_DIR_NAME.length() + 1));
+          extractEntry(tarInput, entry.getSize(), destinationPath);
+        } else if (name.equals(CONTAINER_FILE_NAME)) {
+          //Don't do anything. Container file should be unpacked in a
+          //separated step by unpackContainerDescriptor call.
+          descriptorFileContent = readEntry(tarInput, entry);
+        } else {
+          throw new IllegalArgumentException(
+              "Unknown entry in the tar file: " + "" + name);
+        }
+        entry = tarInput.getNextTarEntry();
+      }
+      return descriptorFileContent;
+
+    } catch (CompressorException e) {
+      throw new IOException(
+          "Can't uncompress the given container: " + container
+              .getContainerData().getContainerID(),
+          e);
+    }
+  }
+
+  private void extractEntry(TarArchiveInputStream tarInput, long size,
+      Path path) throws IOException {
+    Preconditions.checkNotNull(path, "Path element should not be null");
+    Path parent = Preconditions.checkNotNull(path.getParent(),
+        "Path element should have a parent directory");
+    Files.createDirectories(parent);
+    try (BufferedOutputStream bos = new BufferedOutputStream(
+        new FileOutputStream(path.toAbsolutePath().toString()))) {
+      int bufferSize = 1024;
+      byte[] buffer = new byte[bufferSize + 1];
+      long remaining = size;
+      while (remaining > 0) {
+        int read =
+            tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize));
+        if (read >= 0) {
+          remaining -= read;
+          bos.write(buffer, 0, read);
+        } else {
+          remaining = 0;
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Given a containerData include all the required container data/metadata
+   * in a tar file.
+   *
+   * @param container Container to archive (data + metadata).
+   * @param destination   Destination tar file/stream.
+   * @throws IOException
+   */
+  @Override
+  public void pack(Container<KeyValueContainerData> container,
+      OutputStream destination)
+      throws IOException {
+
+    KeyValueContainerData containerData = container.getContainerData();
+
+    try (CompressorOutputStream gzippedOut = new CompressorStreamFactory()
+          .createCompressorOutputStream(CompressorStreamFactory.GZIP,
+              destination)) {
+
+      try (ArchiveOutputStream archiveOutputStream = new TarArchiveOutputStream(
+          gzippedOut)) {
+
+        includePath(containerData.getDbFile().toString(), DB_DIR_NAME,
+            archiveOutputStream);
+
+        includePath(containerData.getChunksPath(), CHUNKS_DIR_NAME,
+            archiveOutputStream);
+
+        includeFile(container.getContainerFile(),
+            CONTAINER_FILE_NAME,
+            archiveOutputStream);
+      }
+    } catch (CompressorException e) {
+      throw new IOException(
+          "Can't compress the container: " + containerData.getContainerID(),
+          e);
+    }
+
+  }
+
+  @Override
+  public byte[] unpackContainerDescriptor(InputStream inputStream)
+      throws IOException {
+    try {
+      CompressorInputStream compressorInputStream =
+          new CompressorStreamFactory()
+              .createCompressorInputStream(CompressorStreamFactory.GZIP,
+                  inputStream);
+
+      TarArchiveInputStream tarInput =
+          new TarArchiveInputStream(compressorInputStream);
+
+      TarArchiveEntry entry = tarInput.getNextTarEntry();
+      while (entry != null) {
+        String name = entry.getName();
+        if (name.equals(CONTAINER_FILE_NAME)) {
+          return readEntry(tarInput, entry);
+        }
+        entry = tarInput.getNextTarEntry();
+      }
+
+    } catch (CompressorException e) {
+      throw new IOException(
+          "Can't read the container descriptor from the container archive",
+          e);
+    }
+    throw new IOException(
+        "Container descriptor is missing from the container archive.");
+  }
+
+  private byte[] readEntry(TarArchiveInputStream tarInput,
+      TarArchiveEntry entry) throws IOException {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    int bufferSize = 1024;
+    byte[] buffer = new byte[bufferSize + 1];
+    long remaining = entry.getSize();
+    while (remaining > 0) {
+      int read =
+          tarInput.read(buffer, 0, (int) Math.min(remaining, bufferSize));
+      remaining -= read;
+      bos.write(buffer, 0, read);
+    }
+    return bos.toByteArray();
+  }
+
+  private void includePath(String containerPath, String subdir,
+      ArchiveOutputStream archiveOutputStream) throws IOException {
+
+    for (Path path : Files.list(Paths.get(containerPath))
+        .collect(Collectors.toList())) {
+
+      includeFile(path.toFile(), subdir + "/" + path.getFileName(),
+          archiveOutputStream);
+    }
+  }
+
+  private void includeFile(File file, String entryName,
+      ArchiveOutputStream archiveOutputStream) throws IOException {
+    ArchiveEntry archiveEntry =
+        archiveOutputStream.createArchiveEntry(file, entryName);
+    archiveOutputStream.putArchiveEntry(archiveEntry);
+    try (FileInputStream fis = new FileInputStream(file)) {
+      IOUtils.copy(fis, archiveOutputStream);
+    }
+    archiveOutputStream.closeArchiveEntry();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 2352cf6..ed4536f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -32,16 +36,12 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Class which defines utility methods for KeyValueContainer.
  */
@@ -157,7 +157,7 @@ public final class KeyValueContainerUtil {
    * @throws IOException
    */
   public static void parseKVContainerData(KeyValueContainerData kvContainerData,
-      OzoneConfiguration config) throws IOException {
+      Configuration config) throws IOException {
 
     long containerID = kvContainerData.getContainerID();
     File metadataPath = new File(kvContainerData.getMetadataPath());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
index 6ff2eca..7359868 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java
@@ -23,7 +23,8 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 
-
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerLifeCycleState;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -37,6 +38,8 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.utils.MetadataStore;
+
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -46,6 +49,8 @@ import org.mockito.Mockito;
 
 import java.io.File;
 
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -74,7 +79,6 @@ public class TestKeyValueContainer {
   private String scmId = UUID.randomUUID().toString();
   private VolumeSet volumeSet;
   private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
-  private long containerID = 1L;
   private KeyValueContainerData keyValueContainerData;
   private KeyValueContainer keyValueContainer;
 
@@ -141,13 +145,14 @@ public class TestKeyValueContainer {
 
   }
 
+  @SuppressWarnings("RedundantCast")
   @Test
   public void testCreateContainer() throws Exception {
 
     // Create Container.
     keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
 
-    keyValueContainerData = (KeyValueContainerData) keyValueContainer
+    keyValueContainerData = keyValueContainer
         .getContainerData();
 
     String containerMetaDataPath = keyValueContainerData
@@ -167,6 +172,86 @@ public class TestKeyValueContainer {
   }
 
   @Test
+  public void testContainerImportExport() throws Exception {
+
+    long containerId = keyValueContainer.getContainerData().getContainerID();
+    // Create Container.
+    keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+
+
+    keyValueContainerData = keyValueContainer
+        .getContainerData();
+
+    keyValueContainerData.setState(ContainerLifeCycleState.CLOSED);
+
+    int numberOfKeysToWrite = 12;
+    //write one few keys to check the key count after import
+    MetadataStore metadataStore = KeyUtils.getDB(keyValueContainerData, conf);
+    for (int i = 0; i < numberOfKeysToWrite; i++) {
+      metadataStore.put(("test" + i).getBytes(), "test".getBytes());
+    }
+    metadataStore.close();
+
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put("key1", "value1");
+    keyValueContainer.update(metadata, true);
+
+    //destination path
+    File folderToExport = folder.newFile("exported.tar.gz");
+
+    TarContainerPacker packer = new TarContainerPacker();
+
+    //export the container
+    try (FileOutputStream fos = new FileOutputStream(folderToExport)) {
+      keyValueContainer
+          .exportContainerData(fos, packer);
+    }
+
+    //delete the original one
+    keyValueContainer.delete(true);
+
+    //create a new one
+    KeyValueContainerData containerData =
+        new KeyValueContainerData(containerId, 1,
+            keyValueContainerData.getMaxSizeGB());
+    KeyValueContainer container = new KeyValueContainer(containerData, conf);
+
+    HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
+        .getVolumesList(), 1);
+    String hddsVolumeDir = containerVolume.getHddsRootDir().toString();
+
+    container.populatePathFields(scmId, containerVolume, hddsVolumeDir);
+    try (FileInputStream fis = new FileInputStream(folderToExport)) {
+      container.importContainerData(fis, packer);
+    }
+
+    Assert.assertEquals("value1", containerData.getMetadata().get("key1"));
+    Assert.assertEquals(keyValueContainerData.getContainerDBType(),
+        containerData.getContainerDBType());
+    Assert.assertEquals(keyValueContainerData.getState(),
+        containerData.getState());
+    Assert.assertEquals(numberOfKeysToWrite,
+        containerData.getKeyCount());
+    Assert.assertEquals(keyValueContainerData.getLayOutVersion(),
+        containerData.getLayOutVersion());
+    Assert.assertEquals(keyValueContainerData.getMaxSizeGB(),
+        containerData.getMaxSizeGB());
+    Assert.assertEquals(keyValueContainerData.getBytesUsed(),
+        containerData.getBytesUsed());
+
+    //Can't overwrite existing container
+    try {
+      try (FileInputStream fis = new FileInputStream(folderToExport)) {
+        container.importContainerData(fis, packer);
+      }
+      fail("Container is imported twice. Previous files are overwritten");
+    } catch (Exception ex) {
+      //all good
+    }
+
+  }
+
+  @Test
   public void testDuplicateContainer() throws Exception {
     try {
       // Create Container.
@@ -224,7 +309,7 @@ public class TestKeyValueContainer {
     keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
     keyValueContainer.close();
 
-    keyValueContainerData = (KeyValueContainerData) keyValueContainer
+    keyValueContainerData = keyValueContainer
         .getContainerData();
 
     assertEquals(ContainerProtos.ContainerLifeCycleState.CLOSED,
@@ -249,7 +334,7 @@ public class TestKeyValueContainer {
     metadata.put("OWNER", "hdfs");
     keyValueContainer.update(metadata, true);
 
-    keyValueContainerData = (KeyValueContainerData) keyValueContainer
+    keyValueContainerData = keyValueContainer
         .getContainerData();
 
     assertEquals(2, keyValueContainerData.getMetadata().size());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca29fb75/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
new file mode 100644
index 0000000..a599f72
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestTarContainerPacker.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the tar/untar for a given container.
+ */
+public class TestTarContainerPacker {
+
+  private static final String TEST_DB_FILE_NAME = "test1";
+
+  private static final String TEST_DB_FILE_CONTENT = "test1";
+
+  private static final String TEST_CHUNK_FILE_NAME = "chunk1";
+
+  private static final String TEST_CHUNK_FILE_CONTENT = "This is a chunk";
+
+  private static final String TEST_DESCRIPTOR_FILE_CONTENT = "descriptor";
+
+  private ContainerPacker packer = new TarContainerPacker();
+
+  private static final Path SOURCE_CONTAINER_ROOT =
+      Paths.get("target/test/data/packer-source-dir");
+
+  private static final Path DEST_CONTAINER_ROOT =
+      Paths.get("target/test/data/packer-dest-dir");
+
+  @BeforeClass
+  public static void init() throws IOException {
+    initDir(SOURCE_CONTAINER_ROOT);
+    initDir(DEST_CONTAINER_ROOT);
+  }
+
+  private static void initDir(Path path) throws IOException {
+    if (path.toFile().exists()) {
+      FileUtils.deleteDirectory(path.toFile());
+    }
+    path.toFile().mkdirs();
+  }
+
+  private KeyValueContainerData createContainer(long id, Path dir,
+      OzoneConfiguration conf) throws IOException {
+
+    Path containerDir = dir.resolve("container" + id);
+    Path dbDir = containerDir.resolve("db");
+    Path dataDir = containerDir.resolve("data");
+    Files.createDirectories(dbDir);
+    Files.createDirectories(dataDir);
+
+    KeyValueContainerData containerData = new KeyValueContainerData(id, -1);
+    containerData.setChunksPath(dataDir.toString());
+    containerData.setMetadataPath(dbDir.getParent().toString());
+    containerData.setDbFile(dbDir.toFile());
+
+
+    return containerData;
+  }
+
+  @Test
+  public void pack() throws IOException, CompressorException {
+
+    //GIVEN
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    KeyValueContainerData sourceContainerData =
+        createContainer(1L, SOURCE_CONTAINER_ROOT, conf);
+
+    KeyValueContainer sourceContainer =
+        new KeyValueContainer(sourceContainerData, conf);
+
+    //sample db file in the metadata directory
+    try (FileWriter writer = new FileWriter(
+        sourceContainerData.getDbFile().toPath()
+            .resolve(TEST_DB_FILE_NAME)
+            .toFile())) {
+      IOUtils.write(TEST_DB_FILE_CONTENT, writer);
+    }
+
+    //sample chunk file in the chunk directory
+    try (FileWriter writer = new FileWriter(
+        Paths.get(sourceContainerData.getChunksPath())
+            .resolve(TEST_CHUNK_FILE_NAME)
+            .toFile())) {
+      IOUtils.write(TEST_CHUNK_FILE_CONTENT, writer);
+    }
+
+    //sample container descriptor file
+    try (FileWriter writer = new FileWriter(
+        sourceContainer.getContainerFile())) {
+      IOUtils.write(TEST_DESCRIPTOR_FILE_CONTENT, writer);
+    }
+
+    Path targetFile =
+        SOURCE_CONTAINER_ROOT.getParent().resolve("container.tar.gz");
+
+    //WHEN: pack it
+    try (FileOutputStream output = new FileOutputStream(targetFile.toFile())) {
+      packer.pack(sourceContainer, output);
+    }
+
+    //THEN: check the result
+    try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
+      CompressorInputStream uncompressed = new CompressorStreamFactory()
+          .createCompressorInputStream(CompressorStreamFactory.GZIP, input);
+      TarArchiveInputStream tarStream = new TarArchiveInputStream(uncompressed);
+
+      TarArchiveEntry entry;
+      Map<String, TarArchiveEntry> entries = new HashMap<>();
+      while ((entry = tarStream.getNextTarEntry()) != null) {
+        entries.put(entry.getName(), entry);
+      }
+
+      Assert.assertTrue(
+          entries.containsKey("container.yaml"));
+
+    }
+
+    //read the container descriptor only
+    try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
+      String containerYaml = new String(packer.unpackContainerDescriptor(input),
+          Charset.forName(StandardCharsets.UTF_8.name()));
+      Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, containerYaml);
+    }
+
+    KeyValueContainerData destinationContainerData =
+        createContainer(2L, DEST_CONTAINER_ROOT, conf);
+
+    KeyValueContainer destinationContainer =
+        new KeyValueContainer(destinationContainerData, conf);
+
+    String descriptor = "";
+
+    //unpackContainerData
+    try (FileInputStream input = new FileInputStream(targetFile.toFile())) {
+      descriptor =
+          new String(packer.unpackContainerData(destinationContainer, input),
+              Charset.forName(StandardCharsets.UTF_8.name()));
+    }
+
+    assertExampleMetadataDbIsGood(
+        destinationContainerData.getDbFile().toPath());
+    assertExampleChunkFileIsGood(
+        Paths.get(destinationContainerData.getChunksPath()));
+    Assert.assertFalse(
+        "Descriptor file should not been exctarcted by the "
+            + "unpackContainerData Call",
+        destinationContainer.getContainerFile().exists());
+    Assert.assertEquals(TEST_DESCRIPTOR_FILE_CONTENT, descriptor);
+
+  }
+
+
+  private void assertExampleMetadataDbIsGood(Path dbPath)
+      throws IOException {
+
+    Path dbFile = dbPath.resolve(TEST_DB_FILE_NAME);
+
+    Assert.assertTrue(
+        "example DB file is missing after pack/unpackContainerData: " + dbFile,
+        Files.exists(dbFile));
+
+    try (FileInputStream testFile = new FileInputStream(dbFile.toFile())) {
+      List<String> strings = IOUtils
+          .readLines(testFile, Charset.forName(StandardCharsets.UTF_8.name()));
+      Assert.assertEquals(1, strings.size());
+      Assert.assertEquals(TEST_DB_FILE_CONTENT, strings.get(0));
+    }
+  }
+
+  private void assertExampleChunkFileIsGood(Path chunkDirPath)
+      throws IOException {
+
+    Path chunkFile = chunkDirPath.resolve(TEST_CHUNK_FILE_NAME);
+
+    Assert.assertTrue(
+        "example chunk file is missing after pack/unpackContainerData: "
+            + chunkFile,
+        Files.exists(chunkFile));
+
+    try (FileInputStream testFile = new FileInputStream(chunkFile.toFile())) {
+      List<String> strings = IOUtils
+          .readLines(testFile, Charset.forName(StandardCharsets.UTF_8.name()));
+      Assert.assertEquals(1, strings.size());
+      Assert.assertEquals(TEST_CHUNK_FILE_CONTENT, strings.get(0));
+    }
+  }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org