Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:33:29 UTC

[20/37] hadoop git commit: HDDS-183: Integrate VolumeSet, ContainerSet and HddsDispatcher. Contributed by Bharat Viswanadham

HDDS-183: Integrate VolumeSet, ContainerSet and HddsDispatcher. Contributed by Bharat Viswanadham
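
In short, HddsDispatcher no longer takes an scmID at construction; it is built from a ContainerSet and a VolumeSet, and the scmID arrives later through a one-shot setter once the datanode completes the SCM version handshake. A minimal wiring sketch (ContainerSet and VolumeSet construction here is assumed, not taken verbatim from this patch):

    Configuration conf = new OzoneConfiguration();
    ContainerSet containerSet = new ContainerSet();             // assumed ctor
    VolumeSet volumeSet = new VolumeSet(datanodeDetails, conf); // assumed ctor
    HddsDispatcher dispatcher =
        new HddsDispatcher(conf, containerSet, volumeSet);
    // scmId is supplied later, by the GETVERSION endpoint task:
    dispatcher.setScmId(scmId);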


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52d1d960
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52d1d960
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52d1d960

Branch: refs/heads/trunk
Commit: 52d1d9603ecc03dbe3ef5fafa60377ef461ecca3
Parents: 13579f9
Author: Bharat Viswanadham <bh...@apache.org>
Authored: Thu Jun 28 14:07:52 2018 -0700
Committer: Bharat Viswanadham <bh...@apache.org>
Committed: Thu Jun 28 14:07:52 2018 -0700

----------------------------------------------------------------------
 .../main/proto/DatanodeContainerProtocol.proto  |   1 +
 .../container/common/impl/ContainerData.java    |   1 +
 .../common/impl/ContainerDataYaml.java          | 276 +++++++++++++
 .../ozone/container/common/impl/Dispatcher.java |  11 +
 .../container/common/impl/HddsDispatcher.java   |  28 +-
 .../common/interfaces/ContainerDispatcher.java  |  14 +
 .../container/common/interfaces/Handler.java    |  15 +-
 .../statemachine/DatanodeStateMachine.java      |   4 +-
 .../background/BlockDeletingService.java        |   1 +
 .../DeleteBlocksCommandHandler.java             |  56 ++-
 .../states/datanode/RunningDatanodeState.java   |   3 +-
 .../states/endpoint/VersionEndpointTask.java    |  34 +-
 .../container/common/volume/HddsVolume.java     |   6 +
 .../container/common/volume/VolumeSet.java      |  48 +++
 .../container/keyvalue/KeyValueContainer.java   |  52 +--
 .../keyvalue/KeyValueContainerData.java         |  22 +-
 .../container/keyvalue/KeyValueHandler.java     |  22 +-
 .../ozone/container/keyvalue/KeyValueYaml.java  | 272 -------------
 .../keyvalue/helpers/KeyValueContainerUtil.java | 134 +++++++
 .../container/ozoneimpl/ContainerReader.java    | 157 ++++++++
 .../container/ozoneimpl/OzoneContainer.java     | 396 ++++++++-----------
 .../hadoop/ozone/protocol/VersionResponse.java  |   4 +
 .../ozone/container/common/SCMTestUtils.java    |  13 +-
 .../ozone/container/common/ScmTestMock.java     |   4 +
 .../common/TestKeyValueContainerData.java       |   3 +-
 .../common/impl/TestContainerDataYaml.java      | 163 ++++++++
 .../container/common/impl/TestContainerSet.java |   6 +-
 .../container/common/impl/TestKeyValueYaml.java | 160 --------
 .../common/interfaces/TestHandler.java          |   8 +-
 .../keyvalue/TestChunkManagerImpl.java          |   3 +-
 .../container/keyvalue/TestKeyManagerImpl.java  |   3 +-
 .../keyvalue/TestKeyValueContainer.java         |  10 +-
 .../container/keyvalue/TestKeyValueHandler.java |   8 +-
 .../container/ozoneimpl/TestOzoneContainer.java | 108 +++++
 .../hadoop/hdds/scm/node/SCMNodeManager.java    |   4 +
 .../ozone/container/common/TestEndPoint.java    |  20 +-
 .../TestStorageContainerManagerHelper.java      |  11 +-
 .../TestCloseContainerByPipeline.java           |   6 +-
 .../TestCloseContainerHandler.java              |   6 +-
 .../container/ozoneimpl/TestOzoneContainer.java |  13 +-
 .../container/server/TestContainerServer.java   |  10 +
 .../ozone/ksm/TestContainerReportWithKeys.java  |  14 +-
 .../hadoop/ozone/web/client/TestKeys.java       |  22 +-
 43 files changed, 1329 insertions(+), 823 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index d29e479..ff1582e 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -137,6 +137,7 @@ enum Result {
   CONTAINER_METADATA_ERROR = 31;
   CONTAINER_FILES_CREATE_ERROR = 32;
   CONTAINER_CHECKSUM_ERROR = 33;
+  UNKNOWN_CONTAINER_TYPE = 34;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 0bd7795..b11b66c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.container.common.impl;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
     ContainerType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
new file mode 100644
index 0000000..6b8e6ee
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.yaml.snakeyaml.Yaml;
+
+import java.beans.IntrospectionException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Writer;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.File;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Map;
+
+import org.yaml.snakeyaml.constructor.AbstractConstruct;
+import org.yaml.snakeyaml.constructor.Constructor;
+import org.yaml.snakeyaml.introspector.BeanAccess;
+import org.yaml.snakeyaml.introspector.Property;
+import org.yaml.snakeyaml.introspector.PropertyUtils;
+import org.yaml.snakeyaml.nodes.MappingNode;
+import org.yaml.snakeyaml.nodes.Node;
+import org.yaml.snakeyaml.nodes.ScalarNode;
+import org.yaml.snakeyaml.nodes.Tag;
+import org.yaml.snakeyaml.representer.Representer;
+
+import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData.YAML_FIELDS;
+import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData.YAML_TAG;
+
+/**
+ * Class for creating and reading .container files.
+ */
+
+public final class ContainerDataYaml {
+
+  private ContainerDataYaml() {
+
+  }
+  /**
+   * Creates a .container file in yaml format.
+   *
+   * @param containerType the container type (only KeyValueContainer for now)
+   * @param containerFile the .container file to write
+   * @param containerData the ContainerData to serialize
+   * @throws IOException on write failure
+   */
+  public static void createContainerFile(ContainerProtos.ContainerType
+                                             containerType, File containerFile,
+                                         ContainerData containerData) throws
+      IOException {
+
+    Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
+    Preconditions.checkNotNull(containerData, "containerData cannot be null");
+    Preconditions.checkNotNull(containerType, "containerType cannot be null");
+
+    PropertyUtils propertyUtils = new PropertyUtils();
+    propertyUtils.setBeanAccess(BeanAccess.FIELD);
+    propertyUtils.setAllowReadOnlyProperties(true);
+
+    switch(containerType) {
+    case KeyValueContainer:
+      Representer representer = new ContainerDataRepresenter();
+      representer.setPropertyUtils(propertyUtils);
+      representer.addClassTag(KeyValueContainerData.class,
+          KeyValueContainerData.YAML_TAG);
+
+      Constructor keyValueDataConstructor = new ContainerDataConstructor();
+
+      Yaml yaml = new Yaml(keyValueDataConstructor, representer);
+      Writer writer = new OutputStreamWriter(new FileOutputStream(
+          containerFile), "UTF-8");
+      yaml.dump(containerData, writer);
+      writer.close();
+      break;
+    default:
+      throw new StorageContainerException("Unrecognized container Type " +
+          "format " + containerType, ContainerProtos.Result
+          .UNKNOWN_CONTAINER_TYPE);
+    }
+  }
+
+  /**
+   * Read the yaml file, and return containerData.
+   *
+   * @param containerFile the .container file to read
+   * @return the deserialized ContainerData
+   * @throws IOException on read failure
+   */
+  public static ContainerData readContainerFile(File containerFile)
+      throws IOException {
+    Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
+
+    InputStream input = null;
+    ContainerData containerData;
+    try {
+      PropertyUtils propertyUtils = new PropertyUtils();
+      propertyUtils.setBeanAccess(BeanAccess.FIELD);
+      propertyUtils.setAllowReadOnlyProperties(true);
+
+      Representer representer = new ContainerDataRepresenter();
+      representer.setPropertyUtils(propertyUtils);
+
+      Constructor containerDataConstructor = new ContainerDataConstructor();
+
+      Yaml yaml = new Yaml(containerDataConstructor, representer);
+      yaml.setBeanAccess(BeanAccess.FIELD);
+
+      input = new FileInputStream(containerFile);
+      containerData = (ContainerData)
+          yaml.load(input);
+    } finally {
+      if (input != null) {
+        input.close();
+      }
+    }
+    return containerData;
+  }
+
+  /**
+   * Representer class to define which fields need to be stored in yaml file.
+   */
+  private static class ContainerDataRepresenter extends Representer {
+    @Override
+    protected Set<Property> getProperties(Class<? extends Object> type)
+        throws IntrospectionException {
+      Set<Property> set = super.getProperties(type);
+      Set<Property> filtered = new TreeSet<Property>();
+
+      // When a new Container type is added, we need to add what fields need
+      // to be filtered here
+      if (type.equals(KeyValueContainerData.class)) {
+        // filter properties
+        for (Property prop : set) {
+          String name = prop.getName();
+          if (YAML_FIELDS.contains(name)) {
+            filtered.add(prop);
+          }
+        }
+      }
+      return filtered;
+    }
+  }
+
+  /**
+   * Constructor class for ContainerData, which will be used by Yaml.
+   */
+  private static class ContainerDataConstructor extends Constructor {
+    ContainerDataConstructor() {
+      // Register our own constructors for tags.
+      // When a new Container type is added, a yamlConstructor entry
+      // must be added for it here.
+      this.yamlConstructors.put(YAML_TAG, new ConstructKeyValueContainerData());
+      this.yamlConstructors.put(Tag.INT, new ConstructLong());
+    }
+
+    private class ConstructKeyValueContainerData extends AbstractConstruct {
+      public Object construct(Node node) {
+        MappingNode mnode = (MappingNode) node;
+        Map<Object, Object> nodes = constructMapping(mnode);
+
+        // Needed because Tag.INT values are constructed as Long by default.
+        long layOutVersion = (long) nodes.get("layOutVersion");
+        int lv = (int) layOutVersion;
+
+        //When a new field is added, it needs to be added here.
+        KeyValueContainerData kvData = new KeyValueContainerData((long) nodes
+            .get("containerId"), lv);
+        kvData.setContainerDBType((String)nodes.get("containerDBType"));
+        kvData.setMetadataPath((String) nodes.get(
+            "metadataPath"));
+        kvData.setChunksPath((String) nodes.get("chunksPath"));
+        Map<String, String> meta = (Map) nodes.get("metadata");
+        meta.forEach((key, val) -> {
+          try {
+            kvData.addMetadata(key, val);
+          } catch (IOException e) {
+            throw new IllegalStateException("Unexpected " +
+                "Key Value Pair " + "(" + key + "," + val +")in the metadata " +
+                "for containerId " + (long) nodes.get("containerId"));
+          }
+        });
+        String state = (String) nodes.get("state");
+        switch (state) {
+        case "OPEN":
+          kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
+          break;
+        case "CLOSING":
+          kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
+          break;
+        case "CLOSED":
+          kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
+          break;
+        default:
+          throw new IllegalStateException("Unexpected " +
+              "ContainerLifeCycleState " + state + " for the containerId " +
+              (long) nodes.get("containerId"));
+        }
+        return kvData;
+      }
+    }
+
+    // The code below is adapted from SnakeYAML, which returns an Integer
+    // when the number fits and a Long otherwise. It is slightly modified
+    // here to return a Long in all cases.
+    private class ConstructLong extends AbstractConstruct {
+      public Object construct(Node node) {
+        String value = constructScalar((ScalarNode) node).toString()
+            .replaceAll("_", "");
+        int sign = +1;
+        char first = value.charAt(0);
+        if (first == '-') {
+          sign = -1;
+          value = value.substring(1);
+        } else if (first == '+') {
+          value = value.substring(1);
+        }
+        int base = 10;
+        if ("0".equals(value)) {
+          return Long.valueOf(0);
+        } else if (value.startsWith("0b")) {
+          value = value.substring(2);
+          base = 2;
+        } else if (value.startsWith("0x")) {
+          value = value.substring(2);
+          base = 16;
+        } else if (value.startsWith("0")) {
+          value = value.substring(1);
+          base = 8;
+        } else if (value.indexOf(':') != -1) {
+          String[] digits = value.split(":");
+          int bes = 1;
+          int val = 0;
+          for (int i = 0, j = digits.length; i < j; i++) {
+            val += (Long.parseLong(digits[(j - i) - 1]) * bes);
+            bes *= 60;
+          }
+          return createNumber(sign, String.valueOf(val), 10);
+        } else {
+          return createNumber(sign, value, 10);
+        }
+        return createNumber(sign, value, base);
+      }
+    }
+
+    private Number createNumber(int sign, String number, int radix) {
+      Number result;
+      if (sign < 0) {
+        number = "-" + number;
+      }
+      result = Long.valueOf(number, radix);
+      return result;
+    }
+  }
+
+}

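A hedged usage sketch of the new ContainerDataYaml API above; the container id and paths are hypothetical:

    KeyValueContainerData data = new KeyValueContainerData(1L);
    data.setMetadataPath("/hdds/vol1/metadata");   // hypothetical path
    data.setChunksPath("/hdds/vol1/chunks");       // hypothetical path
    File file = new File("/hdds/vol1/metadata/1.container");
    ContainerDataYaml.createContainerFile(
        ContainerProtos.ContainerType.KeyValueContainer, file, data);
    ContainerData roundTripped = ContainerDataYaml.readContainerFile(file);
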
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
index b5fb08d..c485caf 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.container.common.helpers
@@ -91,6 +92,16 @@ public class Dispatcher implements ContainerDispatcher {
   }
 
   @Override
+  public Handler getHandler(ContainerProtos.ContainerType containerType) {
+    return null;
+  }
+
+  @Override
+  public void setScmId(String scmId) {
+    // Do nothing; this will be removed during cleanup.
+  }
+
+  @Override
   public ContainerCommandResponseProto dispatch(
       ContainerCommandRequestProto msg) {
     LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index e73b761..cbb48ec 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -52,24 +52,23 @@ public class HddsDispatcher implements ContainerDispatcher {
   private final Configuration conf;
   private final ContainerSet containerSet;
   private final VolumeSet volumeSet;
-  private final String scmID;
+  private String scmID;
 
   /**
    * Constructs an OzoneContainer that receives calls from
    * XceiverServerHandler.
    */
   public HddsDispatcher(Configuration config, ContainerSet contSet,
-      VolumeSet volumes, String scmId) {
-    // TODO: Pass ContainerSet, VolumeSet and scmID, intialize metrics
+      VolumeSet volumes) {
+    //TODO: initialize metrics
     this.conf = config;
     this.containerSet = contSet;
     this.volumeSet = volumes;
-    this.scmID = scmId;
     this.handlers = Maps.newHashMap();
     for (ContainerType containerType : ContainerType.values()) {
       handlers.put(containerType,
           Handler.getHandlerForContainerType(
-              containerType, conf, containerSet, volumeSet, scmID));
+              containerType, conf, containerSet, volumeSet));
     }
   }
 
@@ -103,7 +102,7 @@ public class HddsDispatcher implements ContainerDispatcher {
       return ContainerUtils.logAndReturnError(LOG, ex, msg);
     }
 
-    Handler handler = getHandlerForContainerType(containerType);
+    Handler handler = getHandler(containerType);
     if (handler == null) {
       StorageContainerException ex = new StorageContainerException("Invalid " +
           "ContainerType " + containerType,
@@ -113,9 +112,20 @@ public class HddsDispatcher implements ContainerDispatcher {
     return handler.handle(msg, container);
   }
 
-  @VisibleForTesting
-  public Handler getHandlerForContainerType(ContainerType type) {
-    return handlers.get(type);
+  @Override
+  public Handler getHandler(ContainerProtos.ContainerType containerType) {
+    return handlers.get(containerType);
+  }
+
+  @Override
+  public void setScmId(String scmId) {
+    Preconditions.checkNotNull(scmId, "scmId Cannot be null");
+    if (this.scmID == null) {
+      this.scmID = scmId;
+      for (Map.Entry<ContainerType, Handler> handlerEntry : handlers.entrySet()) {
+        handlerEntry.getValue().setScmID(scmID);
+      }
+    }
   }
 
   private long getContainerID(ContainerCommandRequestProto request)

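Note the one-shot semantics of the new setter: the first non-null scmId is stored and propagated to every registered Handler, while subsequent calls are ignored. For illustration:

    dispatcher.setScmId("scm-uuid-1");  // stored and pushed to all handlers
    dispatcher.setScmId("scm-uuid-2");  // no-op: scmID is already set
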
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index 7e12614..18644bb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.interfaces;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -48,4 +49,17 @@ public interface ContainerDispatcher {
    * Shutdown Dispatcher services.
    */
   void shutdown();
+
+  /**
+   * Returns the handler for the specified containerType.
+   * @param containerType the ContainerType whose handler is requested
+   * @return the Handler registered for containerType
+   */
+  Handler getHandler(ContainerProtos.ContainerType containerType);
+
+  /**
+   * If scmId is not set, this will set scmId, otherwise it is a no-op.
+   * @param scmId the SCM identifier received from the version handshake
+   */
+  void setScmId(String scmId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index d08ad74..8069d71 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 
-import java.io.IOException;
 
 /**
  * Dispatcher sends ContainerCommandRequests to Handler. Each Container Type
@@ -42,22 +41,20 @@ public class Handler {
   protected final Configuration conf;
   protected final ContainerSet containerSet;
   protected final VolumeSet volumeSet;
-  protected final String scmID;
+  protected String scmID;
 
   protected Handler(Configuration config, ContainerSet contSet,
-      VolumeSet volumeSet, String scmID) {
+      VolumeSet volumeSet) {
     conf = config;
     containerSet = contSet;
     this.volumeSet = volumeSet;
-    this.scmID = scmID;
   }
 
   public static Handler getHandlerForContainerType(ContainerType containerType,
-      Configuration config, ContainerSet contSet, VolumeSet volumeSet,
-      String scmID) {
+      Configuration config, ContainerSet contSet, VolumeSet volumeSet) {
     switch (containerType) {
     case KeyValueContainer:
-      return KeyValueHandler.getInstance(config, contSet, volumeSet, scmID);
+      return KeyValueHandler.getInstance(config, contSet, volumeSet);
     default:
       throw new IllegalArgumentException("Handler for ContainerType: " +
         containerType + "doesn't exist.");
@@ -68,4 +65,8 @@ public class Handler {
       ContainerCommandRequestProto msg, Container container) {
     return null;
   }
+
+  public void setScmID(String scmId) {
+    this.scmID = scmId;
+  }
 }

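Correspondingly, handlers are now constructed without an scmID and receive it through setScmID(). A hedged sketch of the new factory usage:

    Handler handler = Handler.getHandlerForContainerType(
        ContainerType.KeyValueContainer, conf, containerSet, volumeSet);
    handler.setScmID(scmId);  // supplied post-construction, after the handshake
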
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index dc4e673..b6a9bb9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -93,8 +93,8 @@ public class DatanodeStateMachine implements Closeable {
      // trick.
     commandDispatcher = CommandDispatcher.newBuilder()
         .addHandler(new CloseContainerCommandHandler())
-        .addHandler(new DeleteBlocksCommandHandler(
-            container.getContainerManager(), conf))
+        .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
+            conf))
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
index 63f57b4..50dea0a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
@@ -97,6 +97,7 @@ public class BlockDeletingService extends BackgroundService{
         OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
   }
 
+
   @Override
   public BackgroundTaskQueue getTasks() {
     BackgroundTaskQueue queue = new BackgroundTaskQueue();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index f954d98..4fc1cd9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -18,8 +18,10 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
@@ -29,11 +31,13 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers
     .DeletedContainerBlocksSummary;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine
@@ -51,6 +55,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+
 /**
  * Handle block deletion commands.
  */
@@ -59,14 +65,14 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
 
-  private ContainerManager containerManager;
-  private Configuration conf;
+  private final ContainerSet containerSet;
+  private final Configuration conf;
   private int invocationCount;
   private long totalTime;
 
-  public DeleteBlocksCommandHandler(ContainerManager containerManager,
+  public DeleteBlocksCommandHandler(ContainerSet cset,
       Configuration conf) {
-    this.containerManager = containerManager;
+    this.containerSet = cset;
     this.conf = conf;
   }
 
@@ -105,8 +111,24 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
           DeleteBlockTransactionResult.newBuilder();
       txResultBuilder.setTxID(entry.getTxID());
       try {
-        deleteContainerBlocks(entry, conf);
-        txResultBuilder.setSuccess(true);
+        long containerId = entry.getContainerID();
+        Container cont = containerSet.getContainer(containerId);
+        if (cont == null) {
+          throw new StorageContainerException("Unable to find the container "
+              + containerId, CONTAINER_NOT_FOUND);
+        }
+        ContainerProtos.ContainerType containerType = cont.getContainerType();
+        switch (containerType) {
+        case KeyValueContainer:
+          KeyValueContainerData containerData = (KeyValueContainerData)
+              cont.getContainerData();
+          deleteKeyValueContainerBlocks(containerData, entry);
+          txResultBuilder.setSuccess(true);
+          break;
+        default:
+          LOG.error("Delete Blocks Command Handler is not implemented for " +
+              "containerType {}", containerType);
+        }
       } catch (IOException e) {
         LOG.warn("Failed to delete blocks for container={}, TXID={}",
             entry.getContainerID(), entry.getTxID(), e);
@@ -145,21 +167,21 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
    * Move a bunch of blocks from a container to deleting state.
    * This is a meta update, the actual deletes happen in async mode.
    *
+   * @param containerData - KeyValueContainerData
    * @param delTX a block deletion transaction.
-   * @param config configuration.
    * @throws IOException if I/O error occurs.
    */
-  private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
-      Configuration config) throws IOException {
+  private void deleteKeyValueContainerBlocks(
+      KeyValueContainerData containerData, DeletedBlocksTransaction delTX)
+      throws IOException {
     long containerId = delTX.getContainerID();
-    ContainerData containerInfo = containerManager.readContainer(containerId);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing Container : {}, DB path : {}", containerId,
-          containerInfo.getDBPath());
+          containerData.getMetadataPath());
     }
 
     int newDeletionBlocks = 0;
-    MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
+    MetadataStore containerDB = KeyUtils.getDB(containerData, conf);
     for (Long blk : delTX.getLocalIDList()) {
       BatchOperation batch = new BatchOperation();
       byte[] blkBytes = Longs.toByteArray(blk);
@@ -187,12 +209,12 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
                 + " container {}, skip deleting it.", blk, containerId);
       }
       containerDB.put(DFSUtil.string2Bytes(
-          OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + delTX.getContainerID()),
+          OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + containerId),
           Longs.toByteArray(delTX.getTxID()));
     }
 
     // update pending deletion blocks count in in-memory container status
-    containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId);
+    containerData.incrPendingDeletionBlocks(newDeletionBlocks);
   }
 
   @Override

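Matching the DatanodeStateMachine change earlier in this patch, the handler is now constructed from the ContainerSet rather than the legacy ContainerManager. A hedged construction sketch:

    DeleteBlocksCommandHandler handler = new DeleteBlocksCommandHandler(
        ozoneContainer.getContainerSet(), conf);
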
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
index 3e11d12..1758c03 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java
@@ -95,7 +95,8 @@ public class RunningDatanodeState implements DatanodeState {
       getEndPointTask(EndpointStateMachine endpoint) {
     switch (endpoint.getState()) {
     case GETVERSION:
-      return new VersionEndpointTask(endpoint, conf);
+      return new VersionEndpointTask(endpoint, conf, context.getParent()
+          .getContainer());
     case REGISTER:
       return  RegisterEndpointTask.newBuilder()
           .setConfig(conf)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
index b048ee5..e4cb4d5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java
@@ -16,14 +16,22 @@
  */
 package org.apache.hadoop.ozone.container.common.states.endpoint;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.VersionResponse;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 
 /**
@@ -33,11 +41,13 @@ public class VersionEndpointTask implements
     Callable<EndpointStateMachine.EndPointStates> {
   private final EndpointStateMachine rpcEndPoint;
   private final Configuration configuration;
+  private final OzoneContainer ozoneContainer;
 
   public VersionEndpointTask(EndpointStateMachine rpcEndPoint,
-      Configuration conf) {
+                             Configuration conf, OzoneContainer container) {
     this.rpcEndPoint = rpcEndPoint;
     this.configuration = conf;
+    this.ozoneContainer = container;
   }
 
   /**
@@ -52,7 +62,27 @@ public class VersionEndpointTask implements
     try{
       SCMVersionResponseProto versionResponse =
           rpcEndPoint.getEndPoint().getVersion(null);
-      rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse));
+      VersionResponse response = VersionResponse.getFromProtobuf(
+          versionResponse);
+      rpcEndPoint.setVersion(response);
+      VolumeSet volumeSet = ozoneContainer.getVolumeSet();
+      Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
+      List<HddsProtos.KeyValue> keyValues =  versionResponse.getKeysList();
+
+      String scmId = response.getValue(OzoneConsts.SCM_ID);
+      String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
+
+      Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
+          "null");
+      Preconditions.checkNotNull(scmId, "Reply from SCM: clusterId cannot be" +
+          " null");
+
+      // If the version file does not exist, create it; also set the scmId.
+      for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
+        HddsVolume hddsVolume = entry.getValue();
+        hddsVolume.format(clusterId);
+        ozoneContainer.getDispatcher().setScmId(scmId);
+      }
 
       EndpointStateMachine.EndPointStates nextState =
           rpcEndPoint.getState().getNextState();

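Condensed, the GETVERSION flow this task now implements looks like the following (error handling elided; the loop shape is simplified from the patch):

    SCMVersionResponseProto proto = rpcEndPoint.getEndPoint().getVersion(null);
    VersionResponse response = VersionResponse.getFromProtobuf(proto);
    String scmId = response.getValue(OzoneConsts.SCM_ID);
    String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
    for (HddsVolume volume :
        ozoneContainer.getVolumeSet().getVolumeMap().values()) {
      volume.format(clusterId);   // writes the VERSION file if absent
    }
    ozoneContainer.getDispatcher().setScmId(scmId);  // one-shot, see above
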
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index 788e2cf..9006133 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -130,6 +130,10 @@ public final class HddsVolume {
     initialize();
   }
 
+  public VolumeInfo getVolumeInfo() {
+    return volumeInfo;
+  }
+
   /**
    * Initializes the volume.
    * Creates the Version file if not present,
@@ -327,4 +331,6 @@ public final class HddsVolume {
   public void setScmUsageForTesting(GetSpaceUsed scmUsageForTest) {
     volumeInfo.setScmUsageForTesting(scmUsageForTest);
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
index 9e052b0..e35becd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -27,8 +27,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
 import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
@@ -309,4 +314,47 @@ public class VolumeSet {
   public Map<StorageType, List<HddsVolume>> getVolumeStateMap() {
     return ImmutableMap.copyOf(volumeStateMap);
   }
+
+  public StorageContainerDatanodeProtocolProtos.NodeReportProto getNodeReport()
+      throws IOException {
+    boolean failed;
+    StorageLocationReport[] reports =
+        new StorageLocationReport[volumeMap.size()];
+    int counter = 0;
+    for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
+      HddsVolume hddsVolume = entry.getValue();
+      VolumeInfo volumeInfo = hddsVolume.getVolumeInfo();
+      long scmUsed = 0;
+      long remaining = 0;
+      failed = false;
+      try {
+        scmUsed = volumeInfo.getScmUsed();
+        remaining = volumeInfo.getAvailable();
+      } catch (IOException ex) {
+        LOG.warn("Failed to get scmUsed and remaining for container " +
+            "storage location {}", volumeInfo.getRootDir());
+        // reset scmUsed and remaining if df/du failed.
+        scmUsed = 0;
+        remaining = 0;
+        failed = true;
+      }
+
+      StorageLocationReport.Builder builder =
+          StorageLocationReport.newBuilder();
+      builder.setStorageLocation(volumeInfo.getRootDir())
+          .setId(hddsVolume.getStorageID())
+          .setFailed(failed)
+          .setCapacity(hddsVolume.getCapacity())
+          .setRemaining(remaining)
+          .setScmUsed(scmUsed)
+          .setStorageType(hddsVolume.getStorageType());
+      StorageLocationReport r = builder.build();
+      reports[counter++] = r;
+    }
+    NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
+    for (int i = 0; i < reports.length; i++) {
+      nrb.addStorageReport(reports[i].getProtoBufMessage());
+    }
+    return nrb.build();
+  }
 }
\ No newline at end of file

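A hedged usage sketch of the new report path (assuming the standard protobuf accessors for the repeated storage report field): getNodeReport() emits one storage report per volume, marking a volume failed with zeroed usage when the df/du lookup throws rather than failing the whole report:

    NodeReportProto report = volumeSet.getNodeReport();
    // one storage report entry per HddsVolume in the volume map
    assert report.getStorageReportCount() == volumeSet.getVolumeMap().size();
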
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index a1cbb4e..553e3f5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.ozone.container.keyvalue;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileUtil;
@@ -33,6 +32,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -47,22 +47,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_ALREADY_EXISTS;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_CHECKSUM_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_METADATA_ERROR;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.CONTAINER_INTERNAL_ERROR;
@@ -75,8 +70,6 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.INVALID_CONTAINER_STATE;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.NO_SUCH_ALGORITHM;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .Result.UNSUPPORTED_REQUEST;
 
 /**
@@ -198,10 +191,12 @@ public class KeyValueContainer implements Container {
     try {
       tempContainerFile = createTempFile(containerFile);
       tempCheckSumFile = createTempFile(containerCheckSumFile);
-      KeyValueYaml.createContainerFile(tempContainerFile, containerData);
+      ContainerDataYaml.createContainerFile(ContainerProtos.ContainerType
+              .KeyValueContainer, tempContainerFile, containerData);
 
       //Compute Checksum for container file
-      String checksum = computeCheckSum(tempContainerFile);
+      String checksum = KeyValueContainerUtil.computeCheckSum(containerId,
+          tempContainerFile);
       containerCheckSumStream = new FileOutputStream(tempCheckSumFile);
       writer = new OutputStreamWriter(containerCheckSumStream, "UTF-8");
       writer.write(checksum);
@@ -308,43 +303,6 @@ public class KeyValueContainer implements Container {
   }
 
 
-  /**
-   * Compute checksum of the .container file.
-   * @param containerFile
-   * @throws StorageContainerException
-   */
-  private String computeCheckSum(File containerFile) throws
-      StorageContainerException {
-
-    MessageDigest sha;
-    FileInputStream containerFileStream = null;
-    try {
-      sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    } catch (NoSuchAlgorithmException e) {
-      throw new StorageContainerException("Unable to create Message Digest,"
-          + " usually this is a java configuration issue.",
-          NO_SUCH_ALGORITHM);
-    }
-
-    try {
-      containerFileStream = new FileInputStream(containerFile);
-      byte[] byteArray = new byte[1024];
-      int bytesCount = 0;
-
-      while ((bytesCount = containerFileStream.read(byteArray)) != -1) {
-        sha.update(byteArray, 0, bytesCount);
-      }
-      String checksum = DigestUtils.sha256Hex(sha.digest());
-      return checksum;
-    } catch (IOException ex) {
-      throw new StorageContainerException("Error during update of " +
-          "check sum file. Container Name: " + containerData.getContainerId(),
-          ex, CONTAINER_CHECKSUM_ERROR);
-    } finally {
-      IOUtils.closeStream(containerFileStream);
-    }
-  }
-
   @Override
   public void delete(boolean forceDelete)
       throws StorageContainerException {

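The checksum logic removed here moves into KeyValueContainerUtil (see the diffstat above), which now also takes the container id for error reporting:

    String checksum = KeyValueContainerUtil.computeCheckSum(
        containerId, containerFile);
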
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 8da4084..3b24468 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.yaml.snakeyaml.nodes.Tag;
 
 
 import java.io.File;
-import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,6 +35,14 @@ import java.util.Map;
  */
 public class KeyValueContainerData extends ContainerData {
 
+  // Yaml Tag used for KeyValueContainerData.
+  public static final Tag YAML_TAG = new Tag("KeyValueContainerData");
+
+  // Fields need to be stored in .container file.
+  public static final List<String> YAML_FIELDS = Lists.newArrayList(
+      "containerType", "containerId", "layOutVersion", "state", "metadata",
+      "metadataPath", "chunksPath", "containerDBType");
+
   // Path to Container metadata Level DB/RocksDB Store and .container file.
   private String metadataPath;
 
@@ -49,23 +59,21 @@ public class KeyValueContainerData extends ContainerData {
 
   /**
    * Constructs KeyValueContainerData object.
-   * @param type - containerType
    * @param id - ContainerId
    */
-  public KeyValueContainerData(ContainerProtos.ContainerType type, long id) {
-    super(type, id);
+  public KeyValueContainerData(long id) {
+    super(ContainerProtos.ContainerType.KeyValueContainer, id);
     this.numPendingDeletionBlocks = 0;
   }
 
   /**
    * Constructs KeyValueContainerData object.
-   * @param type - containerType
    * @param id - ContainerId
    * @param layOutVersion
    */
-  public KeyValueContainerData(ContainerProtos.ContainerType type, long id,
+  public KeyValueContainerData(long id,
                                int layOutVersion) {
-    super(type, id, layOutVersion);
+    super(ContainerProtos.ContainerType.KeyValueContainer, id, layOutVersion);
     this.numPendingDeletionBlocks = 0;
   }
 

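For reference, a hypothetical .container file written with these YAML_FIELDS and the KeyValueContainerData tag could look like the following (values and field order are illustrative only; the exact rendering depends on SnakeYAML):

    !<KeyValueContainerData>
    containerDBType: RocksDB
    containerId: 1
    containerType: KeyValueContainer
    chunksPath: /hdds/vol1/chunks
    layOutVersion: 1
    metadata: {}
    metadataPath: /hdds/vol1/metadata
    state: OPEN
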
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index d9ee7fd..ffe0f21 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.sun.jersey.spi.resource.Singleton;
@@ -93,16 +94,16 @@ public class KeyValueHandler extends Handler {
   // TODO : Add metrics and populate it.
 
   public static KeyValueHandler getInstance(Configuration config,
-      ContainerSet contSet, VolumeSet volSet, String scmID) {
+      ContainerSet contSet, VolumeSet volSet) {
     if (INSTANCE == null) {
-      INSTANCE = new KeyValueHandler(config, contSet, volSet, scmID);
+      INSTANCE = new KeyValueHandler(config, contSet, volSet);
     }
     return INSTANCE;
   }
 
   private KeyValueHandler(Configuration config, ContainerSet contSet,
-      VolumeSet volSet, String scmID) {
-    super(config, contSet, volSet, scmID);
+      VolumeSet volSet) {
+    super(config, contSet, volSet);
     containerType = ContainerType.KeyValueContainer;
     keyManager = new KeyManagerImpl(config);
     chunkManager = new ChunkManagerImpl();
@@ -156,6 +157,16 @@ public class KeyValueHandler extends Handler {
     return null;
   }
 
+  @VisibleForTesting
+  public ChunkManager getChunkManager() {
+    return this.chunkManager;
+  }
+
+  @VisibleForTesting
+  public KeyManager getKeyManager() {
+    return this.keyManager;
+  }
+
   /**
    * Handles Create Container Request. If successful, adds the container to
    * ContainerSet.
@@ -180,7 +191,7 @@ public class KeyValueHandler extends Handler {
     }
 
     KeyValueContainerData newContainerData = new KeyValueContainerData(
-        containerType, containerID);
+        containerID);
     // TODO: Add support to add metadataList to ContainerData. Add metadata
     // to container during creation.
     KeyValueContainer newContainer = new KeyValueContainer(
@@ -262,7 +273,6 @@ public class KeyValueHandler extends Handler {
 
     boolean forceDelete = request.getDeleteContainer().getForceDelete();
     kvContainer.writeLock();
-
     try {
       // Check if container is open
       if (kvContainer.getContainerData().isOpen()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
deleted file mode 100644
index 64f7152..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.yaml.snakeyaml.Yaml;
-
-import java.beans.IntrospectionException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Writer;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.File;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.Map;
-
-import org.yaml.snakeyaml.constructor.AbstractConstruct;
-import org.yaml.snakeyaml.constructor.Constructor;
-import org.yaml.snakeyaml.introspector.BeanAccess;
-import org.yaml.snakeyaml.introspector.Property;
-import org.yaml.snakeyaml.introspector.PropertyUtils;
-import org.yaml.snakeyaml.nodes.MappingNode;
-import org.yaml.snakeyaml.nodes.Node;
-import org.yaml.snakeyaml.nodes.ScalarNode;
-import org.yaml.snakeyaml.nodes.Tag;
-import org.yaml.snakeyaml.representer.Representer;
-
-/**
- * Class for creating and reading .container files.
- */
-
-public final class KeyValueYaml {
-
-  private KeyValueYaml() {
-
-  }
-  /**
-   * Creates a .container file in yaml format.
-   *
-   * @param containerFile
-   * @param containerData
-   * @throws IOException
-   */
-  public static void createContainerFile(File containerFile, ContainerData
-      containerData) throws IOException {
-
-    Preconditions.checkNotNull(containerFile, "yamlFile cannot be null");
-    Preconditions.checkNotNull(containerData, "containerData cannot be null");
-
-    PropertyUtils propertyUtils = new PropertyUtils();
-    propertyUtils.setBeanAccess(BeanAccess.FIELD);
-    propertyUtils.setAllowReadOnlyProperties(true);
-
-    Representer representer = new KeyValueContainerDataRepresenter();
-    representer.setPropertyUtils(propertyUtils);
-    representer.addClassTag(
-        KeyValueContainerData.class, new Tag("KeyValueContainerData"));
-
-    Constructor keyValueDataConstructor = new KeyValueDataConstructor();
-
-    Yaml yaml = new Yaml(keyValueDataConstructor, representer);
-
-    Writer writer = new OutputStreamWriter(new FileOutputStream(containerFile),
-        "UTF-8");
-    yaml.dump(containerData, writer);
-    writer.close();
-  }
-
-  /**
-   * Read the yaml file, and return containerData.
-   *
-   * @param containerFile
-   * @throws IOException
-   */
-  public static KeyValueContainerData readContainerFile(File containerFile)
-      throws IOException {
-    Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
-
-    InputStream input = null;
-    KeyValueContainerData keyValueContainerData;
-    try {
-      PropertyUtils propertyUtils = new PropertyUtils();
-      propertyUtils.setBeanAccess(BeanAccess.FIELD);
-      propertyUtils.setAllowReadOnlyProperties(true);
-
-      Representer representer = new KeyValueContainerDataRepresenter();
-      representer.setPropertyUtils(propertyUtils);
-      representer.addClassTag(
-          KeyValueContainerData.class, new Tag("KeyValueContainerData"));
-
-      Constructor keyValueDataConstructor = new KeyValueDataConstructor();
-
-      Yaml yaml = new Yaml(keyValueDataConstructor, representer);
-      yaml.setBeanAccess(BeanAccess.FIELD);
-
-      input = new FileInputStream(containerFile);
-      keyValueContainerData = (KeyValueContainerData)
-          yaml.load(input);
-    } finally {
-      if (input!= null) {
-        input.close();
-      }
-    }
-    return keyValueContainerData;
-  }
-
-  /**
-   * Representer class to define which fields need to be stored in yaml file.
-   */
-  private static class KeyValueContainerDataRepresenter extends Representer {
-    @Override
-    protected Set<Property> getProperties(Class<? extends Object> type)
-        throws IntrospectionException {
-      Set<Property> set = super.getProperties(type);
-      Set<Property> filtered = new TreeSet<Property>();
-      if (type.equals(KeyValueContainerData.class)) {
-        // filter properties
-        for (Property prop : set) {
-          String name = prop.getName();
-          // When a new field needs to be added, it needs to be added here.
-          if (name.equals("containerType") || name.equals("containerId") ||
-              name.equals("layOutVersion") || name.equals("state") ||
-              name.equals("metadata") || name.equals("metadataPath") ||
-              name.equals("chunksPath") || name.equals(
-                  "containerDBType")) {
-            filtered.add(prop);
-          }
-        }
-      }
-      return filtered;
-    }
-  }
-
-  /**
-   * Constructor class for KeyValueData, which will be used by Yaml.
-   */
-  private static class KeyValueDataConstructor extends Constructor {
-    KeyValueDataConstructor() {
-      //Adding our own specific constructors for tags.
-      this.yamlConstructors.put(new Tag("KeyValueContainerData"),
-          new ConstructKeyValueContainerData());
-      this.yamlConstructors.put(Tag.INT, new ConstructLong());
-    }
-
-    private class ConstructKeyValueContainerData extends AbstractConstruct {
-      public Object construct(Node node) {
-        MappingNode mnode = (MappingNode) node;
-        Map<Object, Object> nodes = constructMapping(mnode);
-        String type = (String) nodes.get("containerType");
-
-        ContainerProtos.ContainerType containerType = ContainerProtos
-            .ContainerType.KeyValueContainer;
-        if (type.equals("KeyValueContainer")) {
-          containerType = ContainerProtos.ContainerType.KeyValueContainer;
-        }
-
-        //Needed this, as TAG.INT type is by default converted to Long.
-        long layOutVersion = (long) nodes.get("layOutVersion");
-        int lv = (int) layOutVersion;
-
-        //When a new field is added, it needs to be added here.
-        KeyValueContainerData kvData = new KeyValueContainerData(containerType,
-            (long) nodes.get("containerId"), lv);
-        kvData.setContainerDBType((String)nodes.get("containerDBType"));
-        kvData.setMetadataPath((String) nodes.get(
-            "metadataPath"));
-        kvData.setChunksPath((String) nodes.get("chunksPath"));
-        Map<String, String> meta = (Map) nodes.get("metadata");
-        meta.forEach((key, val) -> {
-          try {
-            kvData.addMetadata(key, val);
-          } catch (IOException e) {
-            throw new IllegalStateException("Unexpected " +
-                "Key Value Pair " + "(" + key + "," + val +")in the metadata " +
-                "for containerId " + (long) nodes.get("containerId"));
-          }
-        });
-        String state = (String) nodes.get("state");
-        switch (state) {
-        case "OPEN":
-          kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
-          break;
-        case "CLOSING":
-          kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
-          break;
-        case "CLOSED":
-          kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
-          break;
-        default:
-          throw new IllegalStateException("Unexpected " +
-              "ContainerLifeCycleState " + state + " for the containerId " +
-              (long) nodes.get("containerId"));
-        }
-        return kvData;
-      }
-    }
-
-    //Below code is taken from snake yaml, as snakeyaml tries to fit the
-    // number if it fits in integer, otherwise returns long. So, slightly
-    // modified the code to return long in all cases.
-    private class ConstructLong extends AbstractConstruct {
-      public Object construct(Node node) {
-        String value = constructScalar((ScalarNode) node).toString()
-            .replaceAll("_", "");
-        int sign = +1;
-        char first = value.charAt(0);
-        if (first == '-') {
-          sign = -1;
-          value = value.substring(1);
-        } else if (first == '+') {
-          value = value.substring(1);
-        }
-        int base = 10;
-        if ("0".equals(value)) {
-          return Long.valueOf(0);
-        } else if (value.startsWith("0b")) {
-          value = value.substring(2);
-          base = 2;
-        } else if (value.startsWith("0x")) {
-          value = value.substring(2);
-          base = 16;
-        } else if (value.startsWith("0")) {
-          value = value.substring(1);
-          base = 8;
-        } else if (value.indexOf(':') != -1) {
-          String[] digits = value.split(":");
-          int bes = 1;
-          int val = 0;
-          for (int i = 0, j = digits.length; i < j; i++) {
-            val += (Long.parseLong(digits[(j - i) - 1]) * bes);
-            bes *= 60;
-          }
-          return createNumber(sign, String.valueOf(val), 10);
-        } else {
-          return createNumber(sign, value, 10);
-        }
-        return createNumber(sign, value, base);
-      }
-    }
-
-    private Number createNumber(int sign, String number, int radix) {
-      Number result;
-      if (sign < 0) {
-        number = "-" + number;
-      }
-      result = Long.valueOf(number, radix);
-      return result;
-    }
-  }
-
-}
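
KeyValueYaml is superseded by the new ContainerDataYaml class added in this
patch (the ContainerReader below imports and uses it). A minimal read-side
sketch, limited to the readContainerFile method that is visible in this
patch; the file path is hypothetical:

    File containerFile = new File("/tmp/100.container");  // hypothetical path
    ContainerData data = ContainerDataYaml.readContainerFile(containerFile);
    long id = data.getContainerId();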

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index b868f1d..029e94d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -28,15 +30,29 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
 
 /**
  * Class which defines utility methods for KeyValueContainer.
@@ -170,4 +186,122 @@ public final class KeyValueContainerUtil {
     builder.setReadContainer(response);
     return builder.build();
   }
+
+  /**
+   * Computes the checksum of the .container file.
+   * @param containerId id of the container being checksummed
+   * @param containerFile the .container file
+   * @return checksum as a hex-encoded string
+   * @throws StorageContainerException
+   */
+  public static String computeCheckSum(long containerId, File
+      containerFile) throws StorageContainerException {
+    Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
+    MessageDigest sha;
+    FileInputStream containerFileStream = null;
+    try {
+      sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    } catch (NoSuchAlgorithmException e) {
+      throw new StorageContainerException("Unable to create Message Digest," +
+          " usually this is a Java configuration issue.", NO_SUCH_ALGORITHM);
+    }
+    try {
+      containerFileStream = new FileInputStream(containerFile);
+      byte[] byteArray = new byte[1024];
+      int bytesCount = 0;
+      while ((bytesCount = containerFileStream.read(byteArray)) != -1) {
+        sha.update(byteArray, 0, bytesCount);
+      }
+      // DigestUtils.sha256Hex hashes its input, so this hex-encodes a second
+      // SHA-256 round over the file digest; create and verify both use this
+      // method, so the scheme stays self-consistent.
+      return DigestUtils.sha256Hex(sha.digest());
+    } catch (IOException ex) {
+      throw new StorageContainerException("Error during computing checksum: " +
+          "for container " + containerId, ex, CONTAINER_CHECKSUM_ERROR);
+    } finally {
+      IOUtils.closeStream(containerFileStream);
+    }
+  }
+
+  /**
+   * Verify checksum of the container.
+   * @param containerId
+   * @param checksumFile
+   * @param checksum
+   * @throws StorageContainerException
+   */
+  public static void verifyCheckSum(long containerId, File checksumFile,
+                                    String checksum)
+      throws StorageContainerException {
+    try {
+      Preconditions.checkNotNull(checksum);
+      Preconditions.checkNotNull(checksumFile);
+      Path path = Paths.get(checksumFile.getAbsolutePath());
+      List<String> fileCheckSum = Files.readAllLines(path);
+      Preconditions.checkState(fileCheckSum.size() == 1, "checksum file " +
+          "should contain a single line");
+      if (!checksum.equals(fileCheckSum.get(0))) {
+        LOG.error("Checksum mismatch for the container {}", containerId);
+        throw new StorageContainerException("Checksum mismatch for " +
+            "the container " + containerId, CHECKSUM_MISMATCH);
+      }
+    } catch (StorageContainerException ex) {
+      throw ex;
+    } catch (IOException ex) {
+      LOG.error("Error while verifying checksum for container {}",
+          containerId, ex);
+      throw new StorageContainerException("Error while verifying checksum" +
+          " for container " + containerId, ex, IO_EXCEPTION);
+    }
+  }
+
+  /**
+   * Parse KeyValueContainerData and verify checksum.
+   * @param containerData
+   * @param containerFile
+   * @param checksumFile
+   * @param dbFile
+   * @param config
+   * @throws IOException
+   */
+  public static void parseKeyValueContainerData(
+      KeyValueContainerData containerData, File containerFile, File
+      checksumFile, File dbFile, OzoneConfiguration config) throws IOException {
+
+    Preconditions.checkNotNull(containerData, "containerData cannot be null");
+    Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
+    Preconditions.checkNotNull(checksumFile, "checksumFile cannot be null");
+    Preconditions.checkNotNull(dbFile, "dbFile cannot be null");
+    Preconditions.checkNotNull(config, "ozone config cannot be null");
+
+    long containerId = containerData.getContainerId();
+
+    // Verify Checksum
+    String checksum = KeyValueContainerUtil.computeCheckSum(
+        containerData.getContainerId(), containerFile);
+    KeyValueContainerUtil.verifyCheckSum(containerId, checksumFile, checksum);
+
+    containerData.setDbFile(dbFile);
+
+    MetadataStore metadata = KeyUtils.getDB(containerData, config);
+    List<Map.Entry<byte[], byte[]>> liveKeys = metadata
+        .getRangeKVs(null, Integer.MAX_VALUE,
+            MetadataKeyFilters.getNormalKeyFilter());
+    long bytesUsed = liveKeys.parallelStream().mapToLong(e -> {
+      KeyData keyData;
+      try {
+        keyData = KeyUtils.getKeyData(e.getValue());
+        return keyData.getSize();
+      } catch (IOException ex) {
+        return 0L;
+      }
+    }).sum();
+    containerData.setBytesUsed(bytesUsed);
+  }
+
 }
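
A short sketch of how the new checksum helpers compose: computeCheckSum
returns the hex digest of a .container file, and verifyCheckSum compares it
against the single line stored in the matching checksum file. The container
id and file paths here are hypothetical:

    long containerId = 100L;
    File containerFile = new File("/tmp/100.container");
    File checksumFile = new File("/tmp/100.checksum");
    String checksum =
        KeyValueContainerUtil.computeCheckSum(containerId, containerFile);
    KeyValueContainerUtil.verifyCheckSum(containerId, checksumFile, checksum);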

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52d1d960/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
new file mode 100644
index 0000000..68823bc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerReader.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.ozoneimpl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.Storage;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+
+/**
+ * Class used to read .container files from a volume and build the
+ * container map.
+ */
+public class ContainerReader implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ContainerReader.class);
+  private File hddsVolumeDir;
+  private final ContainerSet containerSet;
+  private final OzoneConfiguration config;
+
+  ContainerReader(File volumeRoot, ContainerSet cset, OzoneConfiguration conf) {
+    Preconditions.checkNotNull(volumeRoot);
+    this.hddsVolumeDir = volumeRoot;
+    this.containerSet = cset;
+    this.config = conf;
+  }
+
+  @Override
+  public void run() {
+    try {
+      readVolume(hddsVolumeDir);
+    } catch (RuntimeException ex) {
+      LOG.info("Caught an Run time exception during reading container files" +
+          " from Volume {}", hddsVolumeDir);
+    }
+  }
+
+  public void readVolume(File hddsVolumeRootDir) {
+    Preconditions.checkNotNull(hddsVolumeRootDir, "hddsVolumeRootDir" +
+        " cannot be null");
+
+    /**
+     * Layout of the container directory on disk:
+     * /hdds/<<scmUuid>>/current/<<containerdir>>/<<containerID>>/metadata
+     * /<<containerID>>.container
+     * /hdds/<<scmUuid>>/current/<<containerdir>>/<<containerID>>/metadata
+     * /<<containerID>>.checksum
+     * /hdds/<<scmUuid>>/current/<<containerdir>>/<<containerID>>/metadata
+     * /<<containerID>>.db
+     * /hdds/<<scmUuid>>/current/<<containerdir>>/<<containerID>>/chunks
+     * /<<chunkFile>>
+     **/
+
+    //filtering scm directory
+    File[] scmDir = hddsVolumeRootDir.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File pathname) {
+        return pathname.isDirectory();
+      }
+    });
+    if (scmDir == null) {
+      // listFiles returns null on an I/O error or if the path is not a
+      // directory; skip the volume rather than NPE below.
+      LOG.error("Volume {} is not readable, skipping container load",
+          hddsVolumeRootDir);
+      return;
+    }
+
+    for (File scmLoc : scmDir) {
+      File currentDir = new File(scmLoc, Storage.STORAGE_DIR_CURRENT);
+      File[] containerTopDirs = currentDir.listFiles();
+      if (containerTopDirs != null) {
+        for (File containerTopDir : containerTopDirs) {
+          if (containerTopDir.isDirectory()) {
+            File[] containerDirs = containerTopDir.listFiles();
+            if (containerDirs == null) {
+              continue;
+            }
+            for (File containerDir : containerDirs) {
+              File metadataPath = new File(containerDir,
+                  OzoneConsts.CONTAINER_META_PATH);
+              String containerName = containerDir.getName();
+              if (metadataPath.exists()) {
+                File containerFile = KeyValueContainerLocationUtil
+                    .getContainerFile(metadataPath, containerName);
+                File checksumFile = KeyValueContainerLocationUtil
+                    .getContainerCheckSumFile(metadataPath, containerName);
+                File dbFile = KeyValueContainerLocationUtil
+                    .getContainerDBFile(metadataPath, containerName);
+                if (containerFile.exists() && checksumFile.exists() &&
+                    dbFile.exists()) {
+                  verifyContainerFile(containerFile, checksumFile, dbFile);
+                } else {
+                  LOG.error("Missing container metadata files for Container: " +
+                      "{}", containerName);
+                }
+              } else {
+                LOG.error("Missing container metadata directory for " +
+                    "Container: {}", containerName);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void verifyContainerFile(File containerFile, File checksumFile,
+                                   File dbFile) {
+    try {
+      ContainerData containerData =  ContainerDataYaml.readContainerFile(
+          containerFile);
+
+      switch (containerData.getContainerType()) {
+      case KeyValueContainer:
+        KeyValueContainerData keyValueContainerData = (KeyValueContainerData)
+            containerData;
+        KeyValueContainerUtil.parseKeyValueContainerData(keyValueContainerData,
+            containerFile, checksumFile, dbFile, config);
+        KeyValueContainer keyValueContainer = new KeyValueContainer(
+            keyValueContainerData, config);
+        containerSet.addContainer(keyValueContainer);
+        break;
+      default:
+        LOG.error("Unrecognized ContainerType {} format during verify " +
+            "ContainerFile", containerData.getContainerType());
+      }
+    } catch (IOException ex) {
+      LOG.error("Error while reading container file {}", containerFile, ex);
+    }
+  }
+
+}
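
For context, a sketch of how OzoneContainer is expected to fan out one
ContainerReader per HDDS volume at startup to build the ContainerSet. The
VolumeSet and HddsVolume accessor names used here are assumptions based on
this patch rather than verified signatures:

    for (HddsVolume volume : volumeSet.getVolumesList()) {
      Thread thread = new Thread(new ContainerReader(
          volume.getHddsRootDir(), containerSet, config));
      thread.start();  // each volume is scanned in parallel
    }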


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org