You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:33:26 UTC

[17/37] hadoop git commit: HDDS-173. Refactor Dispatcher and implement Handler for new ContainerIO design.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
deleted file mode 100644
index 87565ce..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
-import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_KEY;
-
-/**
- * This class is for performing key related operations on the KeyValue
- * Container.
- */
-public class KeyManagerImpl implements KeyManager {
-
-  static final Logger LOG = LoggerFactory.getLogger(KeyManagerImpl.class);
-
-  private Configuration config;
-
-  /**
-   * Constructs a key Manager.
-   *
-   * @param conf - Ozone configuration
-   */
-  public KeyManagerImpl(Configuration conf) {
-    Preconditions.checkNotNull(conf, "Config cannot be null");
-    this.config = conf;
-  }
-
-  /**
-   * Puts or overwrites a key.
-   *
-   * @param container - Container for which key need to be added.
-   * @param data     - Key Data.
-   * @throws IOException
-   */
-  public void putKey(Container container, KeyData data) throws IOException {
-    Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
-        "operation.");
-    Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
-        "cannot be negative");
-    // We are not locking the key manager since LevelDb serializes all actions
-    // against a single DB. We rely on DB level locking to avoid conflicts.
-    MetadataStore db = KeyUtils.getDB((KeyValueContainerData) container
-        .getContainerData(), config);
-
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
-    db.put(Longs.toByteArray(data.getLocalID()), data.getProtoBufMessage()
-        .toByteArray());
-  }
-
-  /**
-   * Gets an existing key.
-   *
-   * @param container - Container from which key need to be get.
-   * @param data - Key Data.
-   * @return Key Data.
-   * @throws IOException
-   */
-  public KeyData getKey(Container container, KeyData data) throws IOException {
-    Preconditions.checkNotNull(data, "Key data cannot be null");
-    Preconditions.checkNotNull(data.getContainerID(), "Container name cannot" +
-        " be null");
-    KeyValueContainerData containerData = (KeyValueContainerData) container
-        .getContainerData();
-    MetadataStore db = KeyUtils.getDB(containerData, config);
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
-    byte[] kData = db.get(Longs.toByteArray(data.getLocalID()));
-    if (kData == null) {
-      throw new StorageContainerException("Unable to find the key.",
-          NO_SUCH_KEY);
-    }
-    ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData);
-    return KeyData.getFromProtoBuf(keyData);
-  }
-
-  /**
-   * Deletes an existing Key.
-   *
-   * @param container - Container from which key need to be deleted.
-   * @param blockID - ID of the block.
-   * @throws StorageContainerException
-   */
-  public void deleteKey(Container container, BlockID blockID) throws
-      IOException {
-    Preconditions.checkNotNull(blockID, "block ID cannot be null.");
-    Preconditions.checkState(blockID.getContainerID() >= 0,
-        "Container ID cannot be negative.");
-    Preconditions.checkState(blockID.getLocalID() >= 0,
-        "Local ID cannot be negative.");
-
-    KeyValueContainerData cData = (KeyValueContainerData) container
-        .getContainerData();
-    MetadataStore db = KeyUtils.getDB(cData, config);
-    // This is a post condition that acts as a hint to the user.
-    // Should never fail.
-    Preconditions.checkNotNull(db, "DB cannot be null here");
-    // Note : There is a race condition here, since get and delete
-    // are not atomic. Leaving it here since the impact is refusing
-    // to delete a key which might have just gotten inserted after
-    // the get check.
-    byte[] kKey = Longs.toByteArray(blockID.getLocalID());
-    byte[] kData = db.get(kKey);
-    if (kData == null) {
-      throw new StorageContainerException("Unable to find the key.",
-          NO_SUCH_KEY);
-    }
-    db.delete(kKey);
-  }
-
-  /**
-   * List keys in a container.
-   *
-   * @param container - Container from which keys need to be listed.
-   * @param startLocalID  - Key to start from, 0 to begin.
-   * @param count    - Number of keys to return.
-   * @return List of Keys that match the criteria.
-   */
-  public List<KeyData> listKey(Container container, long startLocalID, int
-      count) throws IOException {
-    Preconditions.checkNotNull(container, "container cannot be null");
-    Preconditions.checkState(startLocalID >= 0, "startLocal ID cannot be " +
-        "negative");
-    Preconditions.checkArgument(count > 0,
-        "Count must be a positive number.");
-    container.readLock();
-    List<KeyData> result = null;
-    KeyValueContainerData cData = (KeyValueContainerData) container
-        .getContainerData();
-    MetadataStore db = KeyUtils.getDB(cData, config);
-    result = new ArrayList<>();
-    byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
-    List<Map.Entry<byte[], byte[]>> range = db.getSequentialRangeKVs(
-        startKeyInBytes, count, null);
-    for (Map.Entry<byte[], byte[]> entry : range) {
-      KeyData value = KeyUtils.getKeyData(entry.getValue());
-      KeyData data = new KeyData(value.getBlockID());
-      result.add(data);
-    }
-    return result;
-  }
-
-  /**
-   * Shutdown KeyValueContainerManager.
-   */
-  public void shutdown() {
-    KeyUtils.shutdownCache(ContainerCache.getInstance(config));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 740967b..a1cbb4e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -18,29 +18,29 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
-
 import com.google.common.base.Preconditions;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerLifeCycleState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-
-
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueYaml;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers
+    .KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.utils.MetadataStore;
 import org.slf4j.Logger;
@@ -116,9 +116,9 @@ public class KeyValueContainer implements Container {
     Preconditions.checkNotNull(scmId, "scmId cannot be null");
 
     File containerMetaDataPath = null;
+    //acquiring volumeset lock and container lock
+    volumeSet.acquireLock();
     try {
-      //acquiring volumeset lock and container lock
-      volumeSet.acquireLock();
       HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
           .getVolumesList(), containerMaxSize);
       String containerBasePath = containerVolume.getHddsRootDir().toString();
@@ -404,10 +404,19 @@ public class KeyValueContainer implements Container {
   }
 
   @Override
-  public ContainerData getContainerData()  {
+  public KeyValueContainerData getContainerData()  {
     return containerData;
   }
 
+  @Override
+  public ContainerLifeCycleState getContainerState() {
+    return containerData.getState();
+  }
+
+  @Override
+  public ContainerProtos.ContainerType getContainerType() {
+    return ContainerProtos.ContainerType.KeyValueContainer;
+  }
 
   @Override
   public void update(Map<String, String> metadata, boolean forceUpdate)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
new file mode 100644
index 0000000..8da4084
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This class represents the KeyValueContainer metadata, which is the
+ * in-memory representation of container metadata and is represented on disk
+ * by the .container file.
+ */
+public class KeyValueContainerData extends ContainerData {
+
+  // Path to Container metadata Level DB/RocksDB Store and .container file.
+  private String metadataPath;
+
+  // Path to Physical file system where chunks are stored.
+  private String chunksPath;
+
+  //Type of DB used to store key to chunks mapping
+  private String containerDBType;
+
+  //Number of pending deletion blocks in container.
+  private int numPendingDeletionBlocks;
+
+  private File dbFile = null;
+
+  /**
+   * Constructs KeyValueContainerData object.
+   * @param type - containerType
+   * @param id - ContainerId
+   */
+  public KeyValueContainerData(ContainerProtos.ContainerType type, long id) {
+    super(type, id);
+    this.numPendingDeletionBlocks = 0;
+  }
+
+  /**
+   * Constructs KeyValueContainerData object.
+   * @param type - containerType
+   * @param id - ContainerId
+   * @param layOutVersion - layout version passed on to ContainerData
+   */
+  public KeyValueContainerData(ContainerProtos.ContainerType type, long id,
+                               int layOutVersion) {
+    super(type, id, layOutVersion);
+    this.numPendingDeletionBlocks = 0;
+  }
+
+
+  /**
+   * Sets Container dbFile. This should be called only during creation of
+   * KeyValue container.
+   * @param containerDbFile - DB file of the container
+   */
+  public void setDbFile(File containerDbFile) {
+    dbFile = containerDbFile;
+  }
+
+  /**
+   * Returns container DB file.
+   * @return dbFile
+   */
+  public File getDbFile() {
+    return dbFile;
+  }
+  /**
+   * Returns container metadata path.
+   *
+   * @return - path
+   */
+  public String getMetadataPath() {
+    return metadataPath;
+  }
+
+  /**
+   * Sets container metadata path.
+   *
+   * @param path - String.
+   */
+  public void setMetadataPath(String path) {
+    this.metadataPath = path;
+  }
+
+  /**
+   * Get chunks path.
+   * @return - Physical path where the chunks of this container are stored.
+   */
+  public String getChunksPath() {
+    return chunksPath;
+  }
+
+  /**
+   * Set chunks Path.
+   * @param chunkPath - File path.
+   */
+  public void setChunksPath(String chunkPath) {
+    this.chunksPath = chunkPath;
+  }
+
+  /**
+   * Returns the DBType used for the container.
+   * @return containerDBType
+   */
+  public String getContainerDBType() {
+    return containerDBType;
+  }
+
+  /**
+   * Sets the DBType used for the container.
+   * @param containerDBType - DB type used for the container
+   */
+  public void setContainerDBType(String containerDBType) {
+    this.containerDBType = containerDBType;
+  }
+
+  /**
+   * Returns the number of pending deletion blocks in container.
+   * @return numPendingDeletionBlocks
+   */
+  public int getNumPendingDeletionBlocks() {
+    return numPendingDeletionBlocks;
+  }
+
+
+  /**
+   * Increase the count of pending deletion blocks.
+   *
+   * @param numBlocks increment number
+   */
+  public void incrPendingDeletionBlocks(int numBlocks) {
+    this.numPendingDeletionBlocks += numBlocks;
+  }
+
+  /**
+   * Decrease the count of pending deletion blocks.
+   *
+   * @param numBlocks decrement number
+   */
+  public void decrPendingDeletionBlocks(int numBlocks) {
+    this.numPendingDeletionBlocks -= numBlocks;
+  }
+
+  /**
+   * Returns a ProtoBuf Message from ContainerData.
+   *
+   * @return Protocol Buffer Message
+   */
+  public ContainerProtos.ContainerData getProtoBufMessage() {
+    ContainerProtos.ContainerData.Builder builder = ContainerProtos
+        .ContainerData.newBuilder();
+    builder.setContainerID(this.getContainerId());
+    builder.setDbPath(this.getDbFile().getPath());
+    builder.setContainerPath(this.getMetadataPath());
+    builder.setState(this.getState());
+
+    for (Map.Entry<String, String> entry : getMetadata().entrySet()) {
+      ContainerProtos.KeyValue.Builder keyValBuilder =
+          ContainerProtos.KeyValue.newBuilder();
+      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
+          .setValue(entry.getValue()).build());
+    }
+
+    if (this.getBytesUsed() >= 0) {
+      builder.setBytesUsed(this.getBytesUsed());
+    }
+
+    if(this.getContainerType() != null) {
+      builder.setContainerType(ContainerProtos.ContainerType.KeyValueContainer);
+    }
+
+    if(this.getContainerDBType() != null) {
+      builder.setContainerDBType(containerDBType);
+    }
+
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerLocationUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerLocationUtil.java
deleted file mode 100644
index 2c15c94..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerLocationUtil.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.Storage;
-
-import java.io.File;
-
-/**
- * Class which provides utility methods for container locations.
- */
-public final class KeyValueContainerLocationUtil {
-
-  /* Never constructed. */
-  private KeyValueContainerLocationUtil() {
-
-  }
-  /**
-   * Returns Container Metadata Location.
-   * @param baseDir
-   * @param scmId
-   * @param containerId
-   * @return containerMetadata Path
-   */
-  public static File getContainerMetaDataPath(String baseDir, String scmId,
-                                              long containerId) {
-    String containerMetaDataPath = getBaseContainerLocation(baseDir, scmId,
-        containerId);
-    containerMetaDataPath = containerMetaDataPath + File.separator +
-        OzoneConsts.CONTAINER_META_PATH;
-    return new File(containerMetaDataPath);
-  }
-
-
-  /**
-   * Returns Container Chunks Location.
-   * @param baseDir
-   * @param scmId
-   * @param containerId
-   * @return chunksPath
-   */
-  public static File getChunksLocationPath(String baseDir, String scmId,
-                                           long containerId) {
-    String chunksPath = getBaseContainerLocation(baseDir, scmId, containerId)
-        + File.separator + OzoneConsts.STORAGE_DIR_CHUNKS;
-    return new File(chunksPath);
-  }
-
-  /**
-   * Returns base directory for specified container.
-   * @param baseDir
-   * @param scmId
-   * @param containerId
-   * @return base directory for container.
-   */
-  private static String getBaseContainerLocation(String baseDir, String scmId,
-                                        long containerId) {
-    Preconditions.checkNotNull(baseDir, "Base Directory cannot be null");
-    Preconditions.checkNotNull(scmId, "scmUuid cannot be null");
-    Preconditions.checkState(containerId >= 0,
-        "Container Id cannot be negative.");
-
-    String containerSubDirectory = getContainerSubDirectory(containerId);
-
-    String containerMetaDataPath = baseDir  + File.separator + scmId +
-        File.separator + Storage.STORAGE_DIR_CURRENT + File.separator +
-        containerSubDirectory + File.separator + containerId;
-
-    return containerMetaDataPath;
-  }
-
-  /**
-   * Returns subdirectory, where this container needs to be placed.
-   * @param containerId
-   * @return container sub directory
-   */
-  private static String getContainerSubDirectory(long containerId){
-    int directory = (int) ((containerId >> 9) & 0xFF);
-    return Storage.CONTAINER_DIR + directory;
-  }
-
-  /**
-   * Returns containerFile.
-   * @param containerMetaDataPath
-   * @param containerName
-   * @return .container File name
-   */
-  public static File getContainerFile(File containerMetaDataPath, String
-      containerName) {
-    Preconditions.checkNotNull(containerMetaDataPath);
-    Preconditions.checkNotNull(containerName);
-    return new File(containerMetaDataPath, containerName +
-        OzoneConsts.CONTAINER_EXTENSION);
-  }
-
-  /**
-   * Return containerDB File.
-   * @param containerMetaDataPath
-   * @param containerName
-   * @return containerDB File name
-   */
-  public static File getContainerDBFile(File containerMetaDataPath, String
-      containerName) {
-    Preconditions.checkNotNull(containerMetaDataPath);
-    Preconditions.checkNotNull(containerName);
-    return new File(containerMetaDataPath, containerName + OzoneConsts
-        .DN_CONTAINER_DB);
-  }
-
-  /**
-   * Returns container checksum file.
-   * @param containerMetaDataPath
-   * @param containerName
-   * @return container checksum file
-   */
-  public static File getContainerCheckSumFile(File containerMetaDataPath,
-                                              String containerName) {
-    Preconditions.checkNotNull(containerMetaDataPath);
-    Preconditions.checkNotNull(containerName);
-    return new File(containerMetaDataPath, containerName + OzoneConsts
-        .CONTAINER_FILE_CHECKSUM_EXTENSION);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerUtil.java
deleted file mode 100644
index 55e2ab0..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerUtil.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Class which defines utility methods for KeyValueContainer.
- */
-
-public final class KeyValueContainerUtil {
-
-  /* Never constructed. */
-  private KeyValueContainerUtil() {
-
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      KeyValueContainerUtil.class);
-
-
-  public static void verifyIsNewContainer(File containerFile) throws
-      FileAlreadyExistsException {
-    Preconditions.checkNotNull(containerFile, "containerFile Should not be " +
-        "null");
-    if (containerFile.getParentFile().exists()) {
-      LOG.error("container already exists on disk. File: {}", containerFile
-          .toPath());
-      throw new FileAlreadyExistsException("container already exists on " +
-            "disk.");
-    }
-  }
-
-  /**
-   * creates metadata path, chunks path and  metadata DB for the specified
-   * container.
-   *
-   * @param containerMetaDataPath
-   * @throws IOException
-   */
-  public static void createContainerMetaData(File containerMetaDataPath, File
-      chunksPath, File dbFile, String containerName, Configuration conf) throws
-      IOException {
-    Preconditions.checkNotNull(containerMetaDataPath);
-    Preconditions.checkNotNull(containerName);
-    Preconditions.checkNotNull(conf);
-
-    if (!containerMetaDataPath.mkdirs()) {
-      LOG.error("Unable to create directory for metadata storage. Path: {}",
-          containerMetaDataPath);
-      throw new IOException("Unable to create directory for metadata storage." +
-          " Path: " + containerMetaDataPath);
-    }
-    MetadataStore store = MetadataStoreBuilder.newBuilder().setConf(conf)
-        .setCreateIfMissing(true).setDbFile(dbFile).build();
-
-    // we close since the SCM pre-creates containers.
-    // we will open and put Db handle into a cache when keys are being created
-    // in a container.
-
-    store.close();
-
-    if (!chunksPath.mkdirs()) {
-      LOG.error("Unable to create chunks directory Container {}",
-          chunksPath);
-      //clean up container metadata path and metadata db
-      FileUtils.deleteDirectory(containerMetaDataPath);
-      FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
-      throw new IOException("Unable to create directory for data storage." +
-          " Path: " + chunksPath);
-    }
-  }
-
-  /**
-   * remove Container if it is empty.
-   * <p/>
-   * There are three things we need to delete.
-   * <p/>
-   * 1. Container file and metadata file. 2. The Level DB file 3. The path that
-   * we created on the data location.
-   *
-   * @param containerData - Data of the container to remove.
-   * @param conf - configuration of the cluster.
-   * @param forceDelete - whether this container should be deleted forcibly.
-   * @throws IOException
-   */
-  public static void removeContainer(KeyValueContainerData containerData,
-                                     Configuration conf, boolean forceDelete)
-      throws IOException {
-    Preconditions.checkNotNull(containerData);
-    File containerMetaDataPath = new File(containerData
-        .getMetadataPath());
-    File chunksPath = new File(containerData.getChunksPath());
-
-    MetadataStore db = KeyUtils.getDB(containerData, conf);
-
-    // If the container is not empty and cannot be deleted forcibly,
-    // then throw a SCE to stop deleting.
-    if(!forceDelete && !db.isEmpty()) {
-      throw new StorageContainerException(
-          "Container cannot be deleted because it is not empty.",
-          ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
-    }
-
-    // Close the DB connection and remove the DB handler from cache
-    KeyUtils.removeDB(containerData, conf);
-
-    // Delete the Container MetaData path.
-    FileUtils.deleteDirectory(containerMetaDataPath);
-
-    //Delete the Container Chunks Path.
-    FileUtils.deleteDirectory(chunksPath);
-
-    //Delete Container directory
-    FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
new file mode 100644
index 0000000..d9ee7fd
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .CreateContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .GetSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .KeyValue;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .PutSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Type;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume
+    .RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.*;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Stage;
+
+/**
+ * Handler for KeyValue Container type.
+ */
+@Singleton
+public class KeyValueHandler extends Handler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      KeyValueHandler.class);
+
+  private static volatile KeyValueHandler INSTANCE = null; // Singleton class
+
+  private final ContainerType containerType;
+  private final KeyManager keyManager;
+  private final ChunkManager chunkManager;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+
+  // TODO : Add metrics and populate it.
+
+  public static synchronized KeyValueHandler getInstance(Configuration config,
+      ContainerSet contSet, VolumeSet volSet, String scmID) {
+    if (INSTANCE == null) { // checked under the class lock: one handler only
+      INSTANCE = new KeyValueHandler(config, contSet, volSet, scmID);
+    }
+    return INSTANCE;
+  }
+
+  private KeyValueHandler(Configuration config, ContainerSet contSet,
+      VolumeSet volSet, String scmID) {
+    super(config, contSet, volSet, scmID);
+    containerType = ContainerType.KeyValueContainer;
+    keyManager = new KeyManagerImpl(config);
+    chunkManager = new ChunkManagerImpl();
+    // TODO: Add support for different volumeChoosingPolicies.
+    volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
+  }
+
+  @Override
+  public ContainerCommandResponseProto handle(
+      ContainerCommandRequestProto request, Container container) {
+
+    Type cmdType = request.getCmdType();
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    switch(cmdType) {
+    case CreateContainer:
+      return handleCreateContainer(request, kvContainer);
+    case ReadContainer:
+      return handleReadContainer(request, kvContainer);
+    case UpdateContainer:
+      return handleUpdateContainer(request, kvContainer);
+    case DeleteContainer:
+      return handleDeleteContainer(request, kvContainer);
+    case ListContainer:
+      return handleUnsupportedOp(request);
+    case CloseContainer:
+      return handleCloseContainer(request, kvContainer);
+    case PutKey:
+      return handlePutKey(request, kvContainer);
+    case GetKey:
+      return handleGetKey(request, kvContainer);
+    case DeleteKey:
+      return handleDeleteKey(request, kvContainer);
+    case ListKey:
+      return handleUnsupportedOp(request);
+    case ReadChunk:
+      return handleReadChunk(request, kvContainer);
+    case DeleteChunk:
+      return handleDeleteChunk(request, kvContainer);
+    case WriteChunk:
+      return handleWriteChunk(request, kvContainer);
+    case ListChunk:
+      return handleUnsupportedOp(request);
+    case CompactChunk:
+      return handleUnsupportedOp(request);
+    case PutSmallFile:
+      return handlePutSmallFile(request, kvContainer);
+    case GetSmallFile:
+      return handleGetSmallFile(request, kvContainer);
+    }
+
+    return null;
+  }
+
+  /**
+   * Handles Create Container Request. If successful, adds the container to
+   * ContainerSet.
+   */
+  ContainerCommandResponseProto handleCreateContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+    if (!request.hasCreateContainer()) {
+      LOG.debug("Malformed Create Container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+    // Create Container request should be passed a null container as the
+    // container would be created here.
+    Preconditions.checkArgument(kvContainer == null);
+
+    CreateContainerRequestProto createContainerReq =
+        request.getCreateContainer();
+    long containerID = createContainerReq.getContainerID();
+    if (createContainerReq.hasContainerType()) {
+      Preconditions.checkArgument(createContainerReq.getContainerType()
+          .equals(ContainerType.KeyValueContainer));
+    }
+
+    KeyValueContainerData newContainerData = new KeyValueContainerData(
+        containerType, containerID);
+    // TODO: Add support to add metadataList to ContainerData. Add metadata
+    // to container during creation.
+    KeyValueContainer newContainer = new KeyValueContainer(
+        newContainerData, conf);
+
+    try {
+      newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
+      containerSet.addContainer(newContainer);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    }
+
+    return ContainerUtils.getSuccessResponse(request);
+  }
+
+  /**
+   * Handles Read Container Request. Returns the ContainerData as response.
+   */
+  ContainerCommandResponseProto handleReadContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+    if (!request.hasReadContainer()) {
+      LOG.debug("Malformed Read Container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    KeyValueContainerData containerData = kvContainer.getContainerData();
+    return KeyValueContainerUtil.getReadContainerResponse(
+        request, containerData);
+  }
+
+
+  /**
+   * Handles Update Container Request. If successful, the container metadata
+   * is updated.
+   */
+  ContainerCommandResponseProto handleUpdateContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasUpdateContainer()) {
+      LOG.debug("Malformed Update Container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    boolean forceUpdate = request.getUpdateContainer().getForceUpdate();
+    List<KeyValue> keyValueList =
+        request.getUpdateContainer().getMetadataList();
+    Map<String, String> metadata = new HashMap<>();
+    for (KeyValue keyValue : keyValueList) {
+      metadata.put(keyValue.getKey(), keyValue.getValue());
+    }
+
+    try {
+      if (!metadata.isEmpty()) {
+        kvContainer.update(metadata, forceUpdate);
+      }
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    }
+    return ContainerUtils.getSuccessResponse(request);
+  }
+
+  /**
+   * Handles Delete Container Request.
+   * Open containers cannot be deleted.
+   * Holds the container's writeLock till the container is removed from
+   * the ContainerSet. On disk deletion of container files happens
+   * afterwards, without the lock.
+   */
+  ContainerCommandResponseProto handleDeleteContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasDeleteContainer()) {
+      LOG.debug("Malformed Delete container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    boolean forceDelete = request.getDeleteContainer().getForceDelete();
+    kvContainer.writeLock();
+
+    try {
+      // Check if container is open
+      if (kvContainer.getContainerData().isOpen()) {
+        kvContainer.writeUnlock();
+        throw new StorageContainerException(
+            "Deletion of Open Container is not allowed.",
+            DELETE_ON_OPEN_CONTAINER);
+      } else {
+        containerSet.removeContainer(
+            kvContainer.getContainerData().getContainerId());
+        // Release the lock first.
+        // Avoid holding write locks for disk operations
+        kvContainer.writeUnlock();
+
+        kvContainer.delete(forceDelete);
+      }
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } finally {
+      if (kvContainer.hasWriteLock()) {
+        kvContainer.writeUnlock();
+      }
+    }
+    return ContainerUtils.getSuccessResponse(request);
+  }
+
+  /**
+   * Handles Close Container Request. Only a container in OPEN state is closed.
+   */
+  ContainerCommandResponseProto handleCloseContainer(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasCloseContainer()) {
+      LOG.debug("Malformed Close Container request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      kvContainer.close();
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    }
+
+    return ContainerUtils.getSuccessResponse(request);
+  }
+
+  /**
+   * Handle Put Key operation. Calls KeyManager to process the request.
+   */
+  ContainerCommandResponseProto handlePutKey(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasPutKey()) {
+      LOG.debug("Malformed Put Key request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      KeyData keyData = KeyData.getFromProtoBuf(
+          request.getPutKey().getKeyData());
+      Preconditions.checkNotNull(keyData);
+
+      keyManager.putKey(kvContainer, keyData);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Put Key failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return KeyUtils.getKeyResponseSuccess(request);
+  }
+
+  /**
+   * Handle Get Key operation. Calls KeyManager to process the request.
+   */
+  ContainerCommandResponseProto handleGetKey(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasGetKey()) {
+      LOG.debug("Malformed Get Key request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    KeyData responseData;
+    try {
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getGetKey().getBlockID());
+      responseData = keyManager.getKey(kvContainer, blockID);
+
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Get Key failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return KeyUtils.getKeyDataResponse(request, responseData);
+  }
+
+  /**
+   * Handle Delete Key operation. Calls KeyManager to process the request.
+   */
+  ContainerCommandResponseProto handleDeleteKey(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasDeleteKey()) {
+      LOG.debug("Malformed Delete Key request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getDeleteKey().getBlockID());
+
+      keyManager.deleteKey(kvContainer, blockID);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Delete Key failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return KeyUtils.getKeyResponseSuccess(request);
+  }
+
+  /**
+   * Handle Read Chunk operation. Calls ChunkManager to process the request.
+   */
+  ContainerCommandResponseProto handleReadChunk(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasReadChunk()) {
+      LOG.debug("Malformed Read Chunk request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    ChunkInfo chunkInfo;
+    byte[] data;
+    try {
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getReadChunk().getBlockID());
+      chunkInfo = ChunkInfo.getFromProtoBuf(request.getReadChunk()
+          .getChunkData());
+      Preconditions.checkNotNull(chunkInfo);
+
+      data = chunkManager.readChunk(kvContainer, blockID, chunkInfo);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Read Chunk failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return ChunkUtils.getReadChunkResponse(request, data, chunkInfo);
+  }
+
+  /**
+   * Handle Delete Chunk operation. Calls ChunkManager to process the request.
+   */
+  ContainerCommandResponseProto handleDeleteChunk(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasDeleteChunk()) {
+      LOG.debug("Malformed Delete Chunk request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getDeleteChunk().getBlockID());
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getDeleteChunk()
+          .getChunkData());
+      Preconditions.checkNotNull(chunkInfo);
+
+      chunkManager.deleteChunk(kvContainer, blockID, chunkInfo);
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Delete Chunk failed", ex,
+              IO_EXCEPTION), request);
+    }
+
+    return ChunkUtils.getChunkResponseSuccess(request);
+  }
+
+  /**
+   * Handle Write Chunk operation. Calls ChunkManager to process the request.
+   */
+  ContainerCommandResponseProto handleWriteChunk(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasWriteChunk()) {
+      LOG.debug("Malformed Write Chunk request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      BlockID blockID = BlockID.getFromProtobuf(
+          request.getWriteChunk().getBlockID());
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getWriteChunk()
+          .getChunkData());
+      Preconditions.checkNotNull(chunkInfo);
+
+      byte[] data = null;
+      if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
+          request.getWriteChunk().getStage() == Stage.COMBINED) {
+        data = request.getWriteChunk().getData().toByteArray();
+      }
+
+      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
+          request.getWriteChunk().getStage());
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION),
+          request);
+    }
+
+    return ChunkUtils.getChunkResponseSuccess(request);
+  }
+
+  /**
+   * Handle Put Small File operation. Writes the chunk and associated key
+   * using a single RPC. Calls KeyManager and ChunkManager to process the
+   * request.
+   */
+  ContainerCommandResponseProto handlePutSmallFile(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasPutSmallFile()) {
+      LOG.debug("Malformed Put Small File request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+    PutSmallFileRequestProto putSmallFileReq =
+        request.getPutSmallFile();
+
+    try {
+      checkContainerOpen(kvContainer);
+
+      BlockID blockID = BlockID.getFromProtobuf(
+        putSmallFileReq.getKey().getKeyData().getBlockID());
+      KeyData keyData = KeyData.getFromProtoBuf(
+          putSmallFileReq.getKey().getKeyData());
+      Preconditions.checkNotNull(keyData);
+
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
+          putSmallFileReq.getChunkInfo());
+      Preconditions.checkNotNull(chunkInfo);
+
+      byte[] data = putSmallFileReq.getData().toByteArray();
+      chunkManager.writeChunk(
+          kvContainer, blockID, chunkInfo, data, Stage.COMBINED);
+
+      List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
+      chunks.add(chunkInfo.getProtoBufMessage());
+      keyData.setChunks(chunks);
+      keyManager.putKey(kvContainer, keyData);
+
+    } catch (StorageContainerException ex) {
+      return ContainerUtils.logAndReturnError(LOG, ex, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Put Small File failed", ex,
+              PUT_SMALL_FILE_ERROR), request);
+    }
+
+    return SmallFileUtils.getPutFileResponseSuccess(request);
+  }
+
+  /**
+   * Handle Get Small File operation. Gets a data stream using a key. This
+   * helps in reducing the RPC overhead for small files. Calls KeyManager and
+   * ChunkManager to process the request.
+   */
+  ContainerCommandResponseProto handleGetSmallFile(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    if (!request.hasGetSmallFile()) {
+      LOG.debug("Malformed Get Small File request. trace ID: {}",
+          request.getTraceID());
+      return ContainerUtils.malformedRequest(request);
+    }
+
+    GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile();
+
+    try {
+      BlockID blockID = BlockID.getFromProtobuf(
+        getSmallFileReq.getKey().getBlockID());
+      KeyData responseData = keyManager.getKey(kvContainer, blockID);
+
+      ContainerProtos.ChunkInfo chunkInfo = null;
+      ByteString dataBuf = ByteString.EMPTY;
+      for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
+        byte[] data = chunkManager.readChunk(kvContainer, blockID,
+            ChunkInfo.getFromProtoBuf(chunk));
+        ByteString current = ByteString.copyFrom(data);
+        dataBuf = dataBuf.concat(current);
+        chunkInfo = chunk;
+      }
+
+      return SmallFileUtils.getGetSmallFileResponseSuccess(request, dataBuf
+          .toByteArray(), ChunkInfo.getFromProtoBuf(chunkInfo));
+    } catch (StorageContainerException e) {
+      return ContainerUtils.logAndReturnError(LOG, e, request);
+    } catch (IOException ex) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Get Small File failed", ex,
+              GET_SMALL_FILE_ERROR), request);
+    }
+  }
+
+  /**
+   * Handle unsupported operation.
+   */
+  ContainerCommandResponseProto handleUnsupportedOp(
+      ContainerCommandRequestProto request) {
+    // TODO : remove all unsupported operations or handle them.
+    return ContainerUtils.unsupportedRequest(request);
+  }
+
+  /**
+   * Check if container is open. Throw exception otherwise.
+   * @param kvContainer
+   * @throws StorageContainerException
+   */
+  private void checkContainerOpen(KeyValueContainer kvContainer)
+      throws StorageContainerException {
+
+    ContainerProtos.ContainerLifeCycleState containerState =
+        kvContainer.getContainerState();
+
+    if (containerState == ContainerProtos.ContainerLifeCycleState.OPEN) {
+      return;
+    } else {
+      String msg = "Requested operation not allowed as ContainerState is " +
+          containerState;
+      ContainerProtos.Result result = null;
+      switch (containerState) {
+      case CLOSING:
+      case CLOSED:
+        result = CLOSED_CONTAINER_IO;
+        break;
+      case INVALID:
+        result = INVALID_CONTAINER_STATE;
+        break;
+      default:
+        result = CONTAINER_INTERNAL_ERROR;
+      }
+
+      throw new StorageContainerException(msg, result);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
new file mode 100644
index 0000000..64f7152
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.yaml.snakeyaml.Yaml;
+
+import java.beans.IntrospectionException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Writer;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.File;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Map;
+
+import org.yaml.snakeyaml.constructor.AbstractConstruct;
+import org.yaml.snakeyaml.constructor.Constructor;
+import org.yaml.snakeyaml.introspector.BeanAccess;
+import org.yaml.snakeyaml.introspector.Property;
+import org.yaml.snakeyaml.introspector.PropertyUtils;
+import org.yaml.snakeyaml.nodes.MappingNode;
+import org.yaml.snakeyaml.nodes.Node;
+import org.yaml.snakeyaml.nodes.ScalarNode;
+import org.yaml.snakeyaml.nodes.Tag;
+import org.yaml.snakeyaml.representer.Representer;
+
+/**
+ * Class for creating and reading .container files.
+ */
+
+public final class KeyValueYaml {
+
+  private KeyValueYaml() {
+
+  }
+  /**
+   * Creates a .container file in yaml format.
+   *
+   * @param containerFile file the yaml representation is written to
+   * @param containerData container data to serialize
+   * @throws IOException if opening the file or closing the writer fails
+   */
+  public static void createContainerFile(File containerFile, ContainerData
+      containerData) throws IOException {
+
+    Preconditions.checkNotNull(containerFile, "yamlFile cannot be null");
+    Preconditions.checkNotNull(containerData, "containerData cannot be null");
+
+    PropertyUtils propertyUtils = new PropertyUtils();
+    propertyUtils.setBeanAccess(BeanAccess.FIELD);
+    propertyUtils.setAllowReadOnlyProperties(true);
+
+    Representer representer = new KeyValueContainerDataRepresenter();
+    representer.setPropertyUtils(propertyUtils);
+    representer.addClassTag(
+        KeyValueContainerData.class, new Tag("KeyValueContainerData"));
+
+    Constructor keyValueDataConstructor = new KeyValueDataConstructor();
+
+    Yaml yaml = new Yaml(keyValueDataConstructor, representer);
+
+    try (Writer writer = new OutputStreamWriter(
+        new FileOutputStream(containerFile), "UTF-8")) {
+      yaml.dump(containerData, writer);
+    }
+  }
+
+  /**
+   * Read the yaml file, and return containerData.
+   *
+   * @param containerFile
+   * @throws IOException
+   */
+  public static KeyValueContainerData readContainerFile(File containerFile)
+      throws IOException {
+    Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
+
+    InputStream input = null;
+    KeyValueContainerData keyValueContainerData;
+    try {
+      PropertyUtils propertyUtils = new PropertyUtils();
+      propertyUtils.setBeanAccess(BeanAccess.FIELD);
+      propertyUtils.setAllowReadOnlyProperties(true);
+
+      Representer representer = new KeyValueContainerDataRepresenter();
+      representer.setPropertyUtils(propertyUtils);
+      representer.addClassTag(
+          KeyValueContainerData.class, new Tag("KeyValueContainerData"));
+
+      Constructor keyValueDataConstructor = new KeyValueDataConstructor();
+
+      Yaml yaml = new Yaml(keyValueDataConstructor, representer);
+      yaml.setBeanAccess(BeanAccess.FIELD);
+
+      input = new FileInputStream(containerFile);
+      keyValueContainerData = (KeyValueContainerData)
+          yaml.load(input);
+    } finally {
+      if (input!= null) {
+        input.close();
+      }
+    }
+    return keyValueContainerData;
+  }
+
+  /**
+   * Representer class to define which fields need to be stored in yaml file.
+   */
+  private static class KeyValueContainerDataRepresenter extends Representer {
+    @Override
+    protected Set<Property> getProperties(Class<? extends Object> type)
+        throws IntrospectionException {
+      Set<Property> set = super.getProperties(type);
+      Set<Property> filtered = new TreeSet<Property>();
+      if (type.equals(KeyValueContainerData.class)) {
+        // filter properties
+        for (Property prop : set) {
+          String name = prop.getName();
+          // When a new field needs to be added, it needs to be added here.
+          if (name.equals("containerType") || name.equals("containerId") ||
+              name.equals("layOutVersion") || name.equals("state") ||
+              name.equals("metadata") || name.equals("metadataPath") ||
+              name.equals("chunksPath") || name.equals(
+                  "containerDBType")) {
+            filtered.add(prop);
+          }
+        }
+      }
+      return filtered;
+    }
+  }
+
+  /**
+   * Constructor class for KeyValueData, which will be used by Yaml.
+   */
+  private static class KeyValueDataConstructor extends Constructor {
+    KeyValueDataConstructor() {
+      //Adding our own specific constructors for tags.
+      this.yamlConstructors.put(new Tag("KeyValueContainerData"),
+          new ConstructKeyValueContainerData());
+      this.yamlConstructors.put(Tag.INT, new ConstructLong());
+    }
+
+    private class ConstructKeyValueContainerData extends AbstractConstruct {
+      public Object construct(Node node) {
+        MappingNode mnode = (MappingNode) node;
+        Map<Object, Object> nodes = constructMapping(mnode);
+        String type = (String) nodes.get("containerType");
+
+        ContainerProtos.ContainerType containerType = ContainerProtos
+            .ContainerType.KeyValueContainer;
+        if (type.equals("KeyValueContainer")) {
+          containerType = ContainerProtos.ContainerType.KeyValueContainer;
+        }
+
+        //Needed this, as TAG.INT type is by default converted to Long.
+        long layOutVersion = (long) nodes.get("layOutVersion");
+        int lv = (int) layOutVersion;
+
+        //When a new field is added, it needs to be added here.
+        KeyValueContainerData kvData = new KeyValueContainerData(containerType,
+            (long) nodes.get("containerId"), lv);
+        kvData.setContainerDBType((String)nodes.get("containerDBType"));
+        kvData.setMetadataPath((String) nodes.get(
+            "metadataPath"));
+        kvData.setChunksPath((String) nodes.get("chunksPath"));
+        Map<String, String> meta = (Map) nodes.get("metadata");
+        meta.forEach((key, val) -> {
+          try {
+            kvData.addMetadata(key, val);
+          } catch (IOException e) {
+            throw new IllegalStateException("Unexpected " +
+                "Key Value Pair " + "(" + key + "," + val +")in the metadata " +
+                "for containerId " + (long) nodes.get("containerId"), e);
+          }
+        });
+        String state = (String) nodes.get("state");
+        switch (state) {
+        case "OPEN":
+          kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
+          break;
+        case "CLOSING":
+          kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
+          break;
+        case "CLOSED":
+          kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
+          break;
+        default:
+          throw new IllegalStateException("Unexpected " +
+              "ContainerLifeCycleState " + state + " for the containerId " +
+              (long) nodes.get("containerId"));
+        }
+        return kvData;
+      }
+    }
+
+    //Below code is taken from snake yaml, as snakeyaml tries to fit the
+    // number if it fits in integer, otherwise returns long. So, slightly
+    // modified the code to return long in all cases.
+    private class ConstructLong extends AbstractConstruct {
+      public Object construct(Node node) {
+        String value = constructScalar((ScalarNode) node).toString()
+            .replaceAll("_", "");
+        int sign = +1;
+        char first = value.charAt(0);
+        if (first == '-') {
+          sign = -1;
+          value = value.substring(1);
+        } else if (first == '+') {
+          value = value.substring(1);
+        }
+        int base = 10;
+        if ("0".equals(value)) {
+          return Long.valueOf(0);
+        } else if (value.startsWith("0b")) {
+          value = value.substring(2);
+          base = 2;
+        } else if (value.startsWith("0x")) {
+          value = value.substring(2);
+          base = 16;
+        } else if (value.startsWith("0")) {
+          value = value.substring(1);
+          base = 8;
+        } else if (value.indexOf(':') != -1) {
+          String[] digits = value.split(":");
+          long bes = 1; // long: an int multiplier overflows after 60^5
+          long val = 0; // long: "int += long" would silently narrow the sum
+          for (int i = 0, j = digits.length; i < j; i++) {
+            val += (Long.parseLong(digits[(j - i) - 1]) * bes);
+            bes *= 60;
+          }
+          return createNumber(sign, String.valueOf(val), 10);
+        } else {
+          return createNumber(sign, value, 10);
+        }
+        return createNumber(sign, value, base);
+      }
+    }
+
+    private Number createNumber(int sign, String number, int radix) {
+      Number result;
+      if (sign < 0) {
+        number = "-" + number;
+      }
+      result = Long.valueOf(number, radix);
+      return result;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index c837ccc..872d84d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -21,12 +21,21 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
 import com.google.common.base.Preconditions;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ReadChunkResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -292,4 +301,41 @@ public final class ChunkUtils {
         (Boolean.valueOf(overWrite));
   }
 
+  /**
+   * Returns a success response for the given request. Delegates to
+   * ContainerUtils.getSuccessResponse and sets no payload of its own.
+   *
+   * @param msg Request
+   * @return Response.
+   */
+  public static ContainerCommandResponseProto getChunkResponseSuccess(
+      ContainerCommandRequestProto msg) {
+    return ContainerUtils.getSuccessResponse(msg);
+  }
+
+  /**
+   * Gets a success response to the read chunk calls.
+   *
+   * @param msg - Read chunk request being answered; must not be null.
+   * @param data - Chunk bytes, copied into the response; must not be null.
+   * @param info - Chunk metadata for the response; must not be null.
+   * @return Response.
+   */
+  public static ContainerCommandResponseProto getReadChunkResponse(
+      ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
+    Preconditions.checkNotNull(msg);
+    Preconditions.checkNotNull(data, "Chunk data is null");
+    Preconditions.checkNotNull(info, "Chunk Info is null");
+
+    ReadChunkResponseProto.Builder response =
+        ReadChunkResponseProto.newBuilder();
+    response.setChunkData(info.getProtoBufMessage());
+    response.setData(ByteString.copyFrom(data));
+    response.setBlockID(msg.getReadChunk().getBlockID());
+
+    ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(msg);
+    builder.setReadChunk(response);
+    return builder.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
index d45f598..714f445 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
@@ -21,9 +21,17 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .GetKeyResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
 import org.apache.hadoop.utils.MetadataStore;
 
@@ -112,4 +120,27 @@ public final class KeyUtils {
           " bytes array.", NO_SUCH_KEY);
     }
   }
+
+  /**
+   * Returns a success response for the given request, with no key data set.
+   * @param msg - Request being answered.
+   * @return Response produced by ContainerUtils.getSuccessResponse.
+   */
+  public static ContainerCommandResponseProto getKeyResponseSuccess(
+      ContainerCommandRequestProto msg) {
+    return ContainerUtils.getSuccessResponse(msg);
+  }
+
+
+  /** Returns a success response that carries the given key data. */
+  public static ContainerCommandResponseProto getKeyDataResponse(
+      ContainerCommandRequestProto msg, KeyData data) {
+    GetKeyResponseProto.Builder keyResponse =
+        GetKeyResponseProto.newBuilder()
+            .setKeyData(data.getProtoBufMessage());
+
+    return ContainerUtils.getSuccessResponseBuilder(msg)
+        .setGetKey(keyResponse)
+        .build();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
new file mode 100644
index 0000000..4710c51
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.Storage;
+
+import java.io.File;
+
+/**
+ * Utility methods that compute the on-disk locations of a container.
+ */
+public final class KeyValueContainerLocationUtil {
+
+  /* Never constructed. */
+  private KeyValueContainerLocationUtil() {
+
+  }
+  /**
+   * Returns the metadata directory of the specified container.
+   * @param baseDir base directory, first component of the container path
+   * @param scmId SCM id, second component of the container path
+   * @param containerId id of the container, must not be negative
+   * @return containerMetadata Path
+   */
+  public static File getContainerMetaDataPath(String baseDir, String scmId,
+                                              long containerId) {
+    String containerRoot =
+        getBaseContainerLocation(baseDir, scmId, containerId);
+    String metadataDir =
+        containerRoot + File.separator + OzoneConsts.CONTAINER_META_PATH;
+    return new File(metadataDir);
+  }
+
+  /**
+   * Returns the chunks directory of the specified container.
+   * @param baseDir base directory, first component of the container path
+   * @param scmId SCM id, second component of the container path
+   * @param containerId id of the container, must not be negative
+   * @return chunksPath
+   */
+  public static File getChunksLocationPath(String baseDir, String scmId,
+                                           long containerId) {
+    String root = getBaseContainerLocation(baseDir, scmId, containerId);
+    return new File(root + File.separator + OzoneConsts.STORAGE_DIR_CHUNKS);
+  }
+
+  /**
+   * Returns base directory for specified container.
+   * @param baseDir base directory, must not be null
+   * @param scmId SCM id, must not be null
+   * @param containerId id of the container, must not be negative
+   * @return base directory for container.
+   */
+  private static String getBaseContainerLocation(String baseDir, String scmId,
+                                        long containerId) {
+    Preconditions.checkNotNull(baseDir, "Base Directory cannot be null");
+    Preconditions.checkNotNull(scmId, "scmUuid cannot be null");
+    Preconditions.checkState(containerId >= 0,
+        "Container Id cannot be negative.");
+
+    // Layout: baseDir/scmId/STORAGE_DIR_CURRENT/<sub directory>/containerId
+    return String.join(File.separator,
+        baseDir,
+        scmId,
+        Storage.STORAGE_DIR_CURRENT,
+        getContainerSubDirectory(containerId),
+        String.valueOf(containerId));
+  }
+
+  /**
+   * Returns subdirectory, where this container needs to be placed.
+   * @param containerId id of the container
+   * @return container sub directory
+   */
+  private static String getContainerSubDirectory(long containerId) {
+    // Bits 9 to 16 of the id pick one of 256 sub directories.
+    long bucket = (containerId >> 9) & 0xFF;
+    return Storage.CONTAINER_DIR + bucket;
+  }
+
+  /**
+   * Returns the .container file inside the metadata directory.
+   * @param containerMetaDataPath metadata directory of the container
+   * @param containerName name of the container, used as file name prefix
+   * @return .container File name
+   */
+  public static File getContainerFile(File containerMetaDataPath,
+                                      String containerName) {
+    Preconditions.checkNotNull(containerMetaDataPath);
+    Preconditions.checkNotNull(containerName);
+    String fileName = containerName + OzoneConsts.CONTAINER_EXTENSION;
+    return new File(containerMetaDataPath, fileName);
+  }
+
+  /**
+   * Returns the container DB file inside the metadata directory.
+   * @param containerMetaDataPath metadata directory of the container
+   * @param containerName name of the container, used as file name prefix
+   * @return containerDB File name
+   */
+  public static File getContainerDBFile(File containerMetaDataPath,
+                                        String containerName) {
+    Preconditions.checkNotNull(containerMetaDataPath);
+    Preconditions.checkNotNull(containerName);
+    String fileName = containerName + OzoneConsts.DN_CONTAINER_DB;
+    return new File(containerMetaDataPath, fileName);
+  }
+
+  /**
+   * Returns the container checksum file inside the metadata directory.
+   * @param containerMetaDataPath metadata directory of the container
+   * @param containerName name of the container, used as file name prefix
+   * @return container checksum file
+   */
+  public static File getContainerCheckSumFile(File containerMetaDataPath,
+                                              String containerName) {
+    Preconditions.checkNotNull(containerMetaDataPath);
+    Preconditions.checkNotNull(containerName);
+    String fileName =
+        containerName + OzoneConsts.CONTAINER_FILE_CHECKSUM_EXTENSION;
+    return new File(containerMetaDataPath, fileName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
new file mode 100644
index 0000000..b868f1d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Class which defines utility methods for KeyValueContainer.
+ */
+
+public final class KeyValueContainerUtil {
+
+  /* Never constructed. */
+  private KeyValueContainerUtil() {
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      KeyValueContainerUtil.class);
+
+
+  public static void verifyIsNewContainer(File containerFile) throws
+      FileAlreadyExistsException {
+    Preconditions.checkNotNull(containerFile, "containerFile Should not be " +
+        "null");
+    if (containerFile.getParentFile().exists()) {
+      LOG.error("container already exists on disk. File: {}", containerFile
+          .toPath());
+      throw new FileAlreadyExistsException("container already exists on " +
+            "disk.");
+    }
+  }
+
+  /**
+   * creates metadata path, chunks path and  metadata DB for the specified
+   * container.
+   *
+   * @param containerMetaDataPath
+   * @throws IOException
+   */
+  public static void createContainerMetaData(File containerMetaDataPath, File
+      chunksPath, File dbFile, String containerName, Configuration conf) throws
+      IOException {
+    Preconditions.checkNotNull(containerMetaDataPath);
+    Preconditions.checkNotNull(containerName);
+    Preconditions.checkNotNull(conf);
+
+    if (!containerMetaDataPath.mkdirs()) {
+      LOG.error("Unable to create directory for metadata storage. Path: {}",
+          containerMetaDataPath);
+      throw new IOException("Unable to create directory for metadata storage." +
+          " Path: " + containerMetaDataPath);
+    }
+    MetadataStore store = MetadataStoreBuilder.newBuilder().setConf(conf)
+        .setCreateIfMissing(true).setDbFile(dbFile).build();
+
+    // we close since the SCM pre-creates containers.
+    // we will open and put Db handle into a cache when keys are being created
+    // in a container.
+
+    store.close();
+
+    if (!chunksPath.mkdirs()) {
+      LOG.error("Unable to create chunks directory Container {}",
+          chunksPath);
+      //clean up container metadata path and metadata db
+      FileUtils.deleteDirectory(containerMetaDataPath);
+      FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
+      throw new IOException("Unable to create directory for data storage." +
+          " Path: " + chunksPath);
+    }
+  }
+
+  /**
+   * remove Container if it is empty.
+   * <p/>
+   * There are three things we need to delete.
+   * <p/>
+   * 1. Container file and metadata file. 2. The Level DB file 3. The path that
+   * we created on the data location.
+   *
+   * @param containerData - Data of the container to remove.
+   * @param conf - configuration of the cluster.
+   * @param forceDelete - whether this container should be deleted forcibly.
+   * @throws IOException
+   */
+  public static void removeContainer(KeyValueContainerData containerData,
+                                     Configuration conf, boolean forceDelete)
+      throws IOException {
+    Preconditions.checkNotNull(containerData);
+    File containerMetaDataPath = new File(containerData
+        .getMetadataPath());
+    File chunksPath = new File(containerData.getChunksPath());
+
+    MetadataStore db = KeyUtils.getDB(containerData, conf);
+
+    // If the container is not empty and cannot be deleted forcibly,
+    // then throw a SCE to stop deleting.
+    if(!forceDelete && !db.isEmpty()) {
+      throw new StorageContainerException(
+          "Container cannot be deleted because it is not empty.",
+          ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
+    }
+
+    // Close the DB connection and remove the DB handler from cache
+    KeyUtils.removeDB(containerData, conf);
+
+    // Delete the Container MetaData path.
+    FileUtils.deleteDirectory(containerMetaDataPath);
+
+    //Delete the Container Chunks Path.
+    FileUtils.deleteDirectory(chunksPath);
+
+    //Delete Container directory
+    FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
+  }
+
+  /**
+   * Returns a ReadContainer Response.
+   *
+   * @param request Request
+   * @param containerData - data
+   * @return Response.
+   */
+  public static ContainerCommandResponseProto getReadContainerResponse(
+      ContainerCommandRequestProto request,
+      KeyValueContainerData containerData) {
+    Preconditions.checkNotNull(containerData);
+
+    ContainerProtos.ReadContainerResponseProto.Builder response =
+        ContainerProtos.ReadContainerResponseProto.newBuilder();
+    response.setContainerData(containerData.getProtoBufMessage());
+
+    ContainerCommandResponseProto.Builder builder =
+        ContainerUtils.getSuccessResponseBuilder(request);
+    builder.setReadContainer(response);
+    return builder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
new file mode 100644
index 0000000..df60c60
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+
+/**
+ * Helper routines that build the responses of the putSmallFile and
+ * getSmallFile RPCs.
+ */
+public final class SmallFileUtils {
+  /** Never Constructed. */
+  private SmallFileUtils() {
+  }
+
+  /**
+   * Builds the success response for the putSmallFile RPC.
+   * @param msg - ContainerCommandRequestProto
+   * @return - ContainerCommandResponseProto
+   */
+  public static ContainerCommandResponseProto getPutFileResponseSuccess(
+      ContainerCommandRequestProto msg) {
+    // The put response is attached empty; no field is set on it.
+    ContainerProtos.PutSmallFileResponseProto.Builder putResponse =
+        ContainerProtos.PutSmallFileResponseProto.newBuilder();
+    return ContainerUtils.getSuccessResponseBuilder(msg)
+        .setCmdType(ContainerProtos.Type.PutSmallFile)
+        .setPutSmallFile(putResponse)
+        .build();
+  }
+
+  /**
+   * Builds the success response for the read small file call.
+   * @param msg - Request; only this argument is null-checked.
+   * @param data  - File bytes, copied into the response.
+   * @param info  - Chunk metadata placed in the response.
+   * @return    Response.
+   */
+  public static ContainerCommandResponseProto getGetSmallFileResponseSuccess(
+      ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
+    Preconditions.checkNotNull(msg);
+
+    // The file content travels inside a read chunk response.
+    ContainerProtos.ReadChunkResponseProto chunkResponse =
+        ContainerProtos.ReadChunkResponseProto.newBuilder()
+            .setChunkData(info.getProtoBufMessage())
+            .setData(ByteString.copyFrom(data))
+            .setBlockID(msg.getGetSmallFile().getKey().getBlockID())
+            .build();
+
+    ContainerProtos.GetSmallFileResponseProto.Builder smallFile =
+        ContainerProtos.GetSmallFileResponseProto.newBuilder()
+            .setData(chunkResponse);
+
+    return ContainerUtils.getSuccessResponseBuilder(msg)
+        .setCmdType(ContainerProtos.Type.GetSmallFile)
+        .setGetSmallFile(smallFile)
+        .build();
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org