Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:33:24 UTC

[15/37] hadoop git commit: HDDS-160: Refactor KeyManager, ChunkManager. Contributed by Bharat Viswanadham

HDDS-160: Refactor KeyManager, ChunkManager. Contributed by Bharat Viswanadham


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ca192cb7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ca192cb7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ca192cb7

Branch: refs/heads/trunk
Commit: ca192cb7c9d76163391e88461716938fdb41c4d3
Parents: 998e285
Author: Bharat Viswanadham <bh...@apache.org>
Authored: Fri Jun 15 14:35:33 2018 -0700
Committer: Bharat Viswanadham <bh...@apache.org>
Committed: Fri Jun 15 14:35:33 2018 -0700

----------------------------------------------------------------------
 .../common/interfaces/ChunkManager.java         |   2 +-
 .../container/keyvalue/ChunkManagerImpl.java    | 240 +++++++++++++++
 .../container/keyvalue/KeyManagerImpl.java      | 188 ++++++++++++
 .../container/keyvalue/helpers/ChunkUtils.java  | 295 +++++++++++++++++++
 .../container/keyvalue/helpers/KeyUtils.java    |  37 ++-
 .../keyvalue/interfaces/ChunkManager.java       |  80 +++++
 .../keyvalue/interfaces/KeyManager.java         |  76 +++++
 .../keyvalue/TestChunkManagerImpl.java          | 237 +++++++++++++++
 .../container/keyvalue/TestKeyManagerImpl.java  | 179 +++++++++++
 9 files changed, 1331 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
index c58fb9d..9de84da 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
@@ -70,4 +70,4 @@ public interface ChunkManager {
    */
   void shutdown();
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java
new file mode 100644
index 0000000..6ee0fd3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.NO_SUCH_ALGORITHM;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
+
+/**
+ * This class is for performing chunk related operations.
+ */
+public class ChunkManagerImpl implements ChunkManager {
+  static final Logger LOG = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+  /**
+   * writes a given chunk.
+   *
+   * @param container - Container for the chunk
+   * @param blockID - ID of the block
+   * @param info - ChunkInfo
+   * @param data - data of the chunk
+   * @param stage - Stage of the Chunk operation
+   * @throws StorageContainerException
+   */
+  public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
+                         byte[] data, ContainerProtos.Stage stage)
+      throws StorageContainerException {
+
+    try {
+
+      KeyValueContainerData containerData = (KeyValueContainerData) container
+          .getContainerData();
+
+      File chunkFile = ChunkUtils.validateChunk(containerData, info);
+      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
+
+      LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk " +
+              "file:{}", info.getChunkName(), stage, chunkFile, tmpChunkFile);
+
+      switch (stage) {
+      case WRITE_DATA:
+        // Initially writes to temporary chunk file.
+        ChunkUtils.writeData(tmpChunkFile, info, data);
+        break;
+      case COMMIT_DATA:
+        // commit the data, means move chunk data from temporary chunk file
+        // to actual chunk file.
+        long sizeDiff = tmpChunkFile.length() - chunkFile.length();
+        commitChunk(tmpChunkFile, chunkFile);
+        containerData.incrBytesUsed(sizeDiff);
+        containerData.incrWriteCount();
+        containerData.incrWriteBytes(sizeDiff);
+        break;
+      case COMBINED:
+        // directly write to the chunk file
+        ChunkUtils.writeData(chunkFile, info, data);
+        containerData.incrBytesUsed(info.getLen());
+        containerData.incrWriteCount();
+        containerData.incrWriteBytes(info.getLen());
+        break;
+      default:
+        throw new IOException("Can not identify write operation.");
+      }
+    } catch (StorageContainerException ex) {
+      throw ex;
+    } catch (NoSuchAlgorithmException ex) {
+      LOG.error("write data failed.", ex);
+      throw new StorageContainerException("Internal error: ", ex,
+          NO_SUCH_ALGORITHM);
+    } catch (ExecutionException  | IOException ex) {
+      LOG.error("write data failed.", ex);
+      throw new StorageContainerException("Internal error: ", ex,
+          CONTAINER_INTERNAL_ERROR);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("write data failed.", e);
+      throw new StorageContainerException("Internal error: ", e,
+          CONTAINER_INTERNAL_ERROR);
+    }
+  }
+
+  /**
+   * reads the data defined by a chunk.
+   *
+   * @param container - Container for the chunk
+   * @param blockID - ID of the block.
+   * @param info - ChunkInfo.
+   * @return byte array
+   * @throws StorageContainerException
+   * TODO: Right now we do not support partial reads and writes of chunks.
+   * TODO: Explore if we need to do that for ozone.
+   */
+  public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info)
+      throws StorageContainerException {
+    try {
+      KeyValueContainerData containerData = (KeyValueContainerData) container
+          .getContainerData();
+      ByteBuffer data;
+
+      // Check the container's layout version and read the chunk file in
+      // that format. In version 1, we verify the checksum if it is
+      // available and return the data of the chunk file.
+      if (containerData.getLayOutVersion() == ChunkLayOutVersion
+          .getLatestVersion().getVersion()) {
+        File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+        data = ChunkUtils.readData(chunkFile, info);
+        containerData.incrReadCount();
+        containerData.incrReadBytes(chunkFile.length());
+        return data.array();
+      }
+    } catch(NoSuchAlgorithmException ex) {
+      LOG.error("read data failed.", ex);
+      throw new StorageContainerException("Internal error: ",
+          ex, NO_SUCH_ALGORITHM);
+    } catch (ExecutionException ex) {
+      LOG.error("read data failed.", ex);
+      throw new StorageContainerException("Internal error: ",
+          ex, CONTAINER_INTERNAL_ERROR);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.error("read data failed.", e);
+      throw new StorageContainerException("Internal error: ",
+          e, CONTAINER_INTERNAL_ERROR);
+    }
+    return null;
+  }
+
+  /**
+   * Deletes a given chunk.
+   *
+   * @param container - Container for the chunk
+   * @param blockID - ID of the block
+   * @param info - Chunk Info
+   * @throws StorageContainerException
+   */
+  public void deleteChunk(Container container, BlockID blockID, ChunkInfo info)
+      throws StorageContainerException {
+    Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
+    KeyValueContainerData containerData = (KeyValueContainerData) container
+        .getContainerData();
+    // Check the container's layout version and delete the chunk
+    // accordingly. In version 1, each chunk is stored in its own file.
+    if (containerData.getLayOutVersion() == ChunkLayOutVersion
+        .getLatestVersion().getVersion()) {
+      File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+      if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
+        FileUtil.fullyDelete(chunkFile);
+        containerData.decrBytesUsed(chunkFile.length());
+      } else {
+        LOG.error("Not Supported Operation. Trying to delete a " +
+            "chunk that is in a shared file. chunk info : " + info.toString());
+        throw new StorageContainerException("Not Supported Operation. " +
+            "Trying to delete a chunk that is in a shared file. chunk info : "
+            + info.toString(), UNSUPPORTED_REQUEST);
+      }
+    }
+  }
+
+  /**
+   * Shutdown the chunkManager.
+   *
+   * In the chunkManager we haven't acquired any resources, so nothing to do
+   * here.
+   */
+  public void shutdown() {
+    //TODO: need to revisit this during integration of container IO.
+  }
+
+  /**
+   * Returns the temporary chunkFile path.
+   * @param chunkFile - actual chunk file
+   * @param info - chunk info
+   * @return temporary chunkFile path
+   * @throws StorageContainerException
+   */
+  private File getTmpChunkFile(File chunkFile, ChunkInfo info)
+      throws StorageContainerException {
+    return new File(chunkFile.getParent(),
+        chunkFile.getName() +
+            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
+            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
+  }
+
+  /**
+   * Commit the chunk by renaming the temporary chunk file to chunk file.
+   * @param tmpChunkFile - temporary chunk file
+   * @param chunkFile - actual chunk file
+   * @throws IOException
+   */
+  private void commitChunk(File tmpChunkFile, File chunkFile) throws
+      IOException {
+    Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
+        StandardCopyOption.REPLACE_EXISTING);
+  }
+
+}
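
A minimal sketch of the two-stage write path above, mirroring the setup used
in TestChunkManagerImpl later in this patch (the container, volume and
configuration wiring are assumptions taken from that test, not part of this
class):

    KeyValueContainerData containerData = new KeyValueContainerData(
        ContainerProtos.ContainerType.KeyValueContainer, 1L);
    KeyValueContainer container = new KeyValueContainer(containerData, conf);
    container.create(volumeSet, volumeChoosingPolicy, scmId);

    byte[] data = "testing write chunks".getBytes();
    BlockID blockID = new BlockID(1L, 1L);
    ChunkInfo info = new ChunkInfo(
        String.format("%d.data.%d", blockID.getLocalID(), 0), 0, data.length);

    ChunkManager chunkManager = new ChunkManagerImpl();
    // WRITE_DATA writes the bytes to a temporary chunk file ...
    chunkManager.writeChunk(container, blockID, info, data,
        ContainerProtos.Stage.WRITE_DATA);
    // ... and COMMIT_DATA moves the temporary file to the final chunk file.
    chunkManager.writeChunk(container, blockID, info, data,
        ContainerProtos.Stage.COMMIT_DATA);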

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
new file mode 100644
index 0000000..87565ce
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_KEY;
+
+/**
+ * This class is for performing key related operations on the KeyValue
+ * Container.
+ */
+public class KeyManagerImpl implements KeyManager {
+
+  static final Logger LOG = LoggerFactory.getLogger(KeyManagerImpl.class);
+
+  private Configuration config;
+
+  /**
+   * Constructs a key Manager.
+   *
+   * @param conf - Ozone configuration
+   */
+  public KeyManagerImpl(Configuration conf) {
+    Preconditions.checkNotNull(conf, "Config cannot be null");
+    this.config = conf;
+  }
+
+  /**
+   * Puts or overwrites a key.
+   *
+   * @param container - Container to which the key needs to be added.
+   * @param data     - Key Data.
+   * @throws IOException
+   */
+  public void putKey(Container container, KeyData data) throws IOException {
+    Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
+        "operation.");
+    Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
+        "cannot be negative");
+    // We are not locking the key manager since LevelDb serializes all actions
+    // against a single DB. We rely on DB level locking to avoid conflicts.
+    MetadataStore db = KeyUtils.getDB((KeyValueContainerData) container
+        .getContainerData(), config);
+
+    // This is a post condition that acts as a hint to the user.
+    // Should never fail.
+    Preconditions.checkNotNull(db, "DB cannot be null here");
+    db.put(Longs.toByteArray(data.getLocalID()), data.getProtoBufMessage()
+        .toByteArray());
+  }
+
+  /**
+   * Gets an existing key.
+   *
+   * @param container - Container from which the key needs to be fetched.
+   * @param data - Key Data.
+   * @return Key Data.
+   * @throws IOException
+   */
+  public KeyData getKey(Container container, KeyData data) throws IOException {
+    Preconditions.checkNotNull(data, "Key data cannot be null");
+    Preconditions.checkState(data.getContainerID() >= 0,
+        "Container ID cannot be negative.");
+    KeyValueContainerData containerData = (KeyValueContainerData) container
+        .getContainerData();
+    MetadataStore db = KeyUtils.getDB(containerData, config);
+    // This is a post condition that acts as a hint to the user.
+    // Should never fail.
+    Preconditions.checkNotNull(db, "DB cannot be null here");
+    byte[] kData = db.get(Longs.toByteArray(data.getLocalID()));
+    if (kData == null) {
+      throw new StorageContainerException("Unable to find the key.",
+          NO_SUCH_KEY);
+    }
+    ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData);
+    return KeyData.getFromProtoBuf(keyData);
+  }
+
+  /**
+   * Deletes an existing Key.
+   *
+   * @param container - Container from which the key needs to be deleted.
+   * @param blockID - ID of the block.
+   * @throws StorageContainerException
+   */
+  public void deleteKey(Container container, BlockID blockID) throws
+      IOException {
+    Preconditions.checkNotNull(blockID, "block ID cannot be null.");
+    Preconditions.checkState(blockID.getContainerID() >= 0,
+        "Container ID cannot be negative.");
+    Preconditions.checkState(blockID.getLocalID() >= 0,
+        "Local ID cannot be negative.");
+
+    KeyValueContainerData cData = (KeyValueContainerData) container
+        .getContainerData();
+    MetadataStore db = KeyUtils.getDB(cData, config);
+    // This is a post condition that acts as a hint to the user.
+    // Should never fail.
+    Preconditions.checkNotNull(db, "DB cannot be null here");
+    // Note : There is a race condition here, since get and delete
+    // are not atomic. Leaving it here since the impact is refusing
+    // to delete a key which might have just gotten inserted after
+    // the get check.
+    byte[] kKey = Longs.toByteArray(blockID.getLocalID());
+    byte[] kData = db.get(kKey);
+    if (kData == null) {
+      throw new StorageContainerException("Unable to find the key.",
+          NO_SUCH_KEY);
+    }
+    db.delete(kKey);
+  }
+
+  /**
+   * List keys in a container.
+   *
+   * @param container - Container from which keys need to be listed.
+   * @param startLocalID  - Key to start from, 0 to begin.
+   * @param count    - Number of keys to return.
+   * @return List of Keys that match the criteria.
+   */
+  public List<KeyData> listKey(Container container, long startLocalID, int
+      count) throws IOException {
+    Preconditions.checkNotNull(container, "container cannot be null");
+    Preconditions.checkState(startLocalID >= 0,
+        "startLocalID cannot be negative.");
+    Preconditions.checkArgument(count > 0,
+        "Count must be a positive number.");
+    container.readLock();
+    try {
+      List<KeyData> result = new ArrayList<>();
+      KeyValueContainerData cData = (KeyValueContainerData) container
+          .getContainerData();
+      MetadataStore db = KeyUtils.getDB(cData, config);
+      byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
+      List<Map.Entry<byte[], byte[]>> range = db.getSequentialRangeKVs(
+          startKeyInBytes, count, null);
+      for (Map.Entry<byte[], byte[]> entry : range) {
+        KeyData value = KeyUtils.getKeyData(entry.getValue());
+        KeyData data = new KeyData(value.getBlockID());
+        result.add(data);
+      }
+      return result;
+    } finally {
+      // Always release the container read lock.
+      container.readUnlock();
+    }
+  }
+
+  /**
+   * Shutdown the KeyManager.
+   */
+  public void shutdown() {
+    KeyUtils.shutdownCache(ContainerCache.getInstance(config));
+  }
+
+}
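
For reference, a sketch of the key round trip this class implements, again
following TestKeyManagerImpl below (conf, container, blockID and chunkList
are assumed to be set up as in that test):

    KeyManager keyManager = new KeyManagerImpl(conf);

    KeyData keyData = new KeyData(blockID);
    keyData.addMetadata("VOLUME", "ozone");
    keyData.setChunks(chunkList);          // list of protobuf ChunkInfo

    // putKey serializes the KeyData protobuf into the container's DB,
    // keyed by the block's local ID.
    keyManager.putKey(container, keyData);
    KeyData stored = keyManager.getKey(container, keyData);
    List<KeyData> keys = keyManager.listKey(container, 0, 10);
    keyManager.deleteKey(container, blockID);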

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
new file mode 100644
index 0000000..c837ccc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
+
+/**
+ * Utility methods for chunk operations for KeyValue container.
+ */
+public final class ChunkUtils {
+
+  /** Never constructed. **/
+  private ChunkUtils() {
+
+  }
+
+  /**
+   * Writes the data in chunk Info to the specified location in the chunkfile.
+   *
+   * @param chunkFile - File to write data to.
+   * @param chunkInfo - Chunk info describing the write.
+   * @param data - The data buffer.
+   * @throws StorageContainerException
+   */
+  public static void writeData(File chunkFile, ChunkInfo chunkInfo,
+                               byte[] data) throws
+      StorageContainerException, ExecutionException, InterruptedException,
+      NoSuchAlgorithmException {
+
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+    if (data.length != chunkInfo.getLen()) {
+      String err = String.format("data array does not match the length " +
+              "specified. DataLen: %d Byte Array: %d",
+          chunkInfo.getLen(), data.length);
+      log.error(err);
+      throw new StorageContainerException(err, INVALID_WRITE_SIZE);
+    }
+
+    AsynchronousFileChannel file = null;
+    FileLock lock = null;
+
+    try {
+      file =
+          AsynchronousFileChannel.open(chunkFile.toPath(),
+              StandardOpenOption.CREATE,
+              StandardOpenOption.WRITE,
+              StandardOpenOption.SPARSE,
+              StandardOpenOption.SYNC);
+      lock = file.lock().get();
+      if (chunkInfo.getChecksum() != null &&
+          !chunkInfo.getChecksum().isEmpty()) {
+        verifyChecksum(chunkInfo, data, log);
+      }
+      int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
+      if (size != data.length) {
+        log.error("Invalid write size found. Size:{}  Expected: {} ", size,
+            data.length);
+        throw new StorageContainerException("Invalid write size found. " +
+            "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
+      }
+    } catch (StorageContainerException ex) {
+      throw ex;
+    } catch(IOException e) {
+      throw new StorageContainerException(e, IO_EXCEPTION);
+
+    } finally {
+      if (lock != null) {
+        try {
+          lock.release();
+        } catch (IOException e) {
+          log.error("Unable to release lock on chunk file.", e);
+          throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
+
+        }
+      }
+      if (file != null) {
+        try {
+          file.close();
+        } catch (IOException e) {
+          throw new StorageContainerException("Error closing chunk file",
+              e, CONTAINER_INTERNAL_ERROR);
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads data from an existing chunk file.
+   *
+   * @param chunkFile - file where data lives.
+   * @param data - chunk info describing the region to read.
+   * @return ByteBuffer
+   * @throws StorageContainerException
+   * @throws ExecutionException
+   * @throws InterruptedException
+   */
+  public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
+      StorageContainerException, ExecutionException, InterruptedException,
+      NoSuchAlgorithmException {
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+    if (!chunkFile.exists()) {
+      log.error("Unable to find the chunk file. chunk info : {}",
+          data.toString());
+      throw new StorageContainerException("Unable to find the chunk file. " +
+          "chunk info " +
+          data.toString(), UNABLE_TO_FIND_CHUNK);
+    }
+
+    AsynchronousFileChannel file = null;
+    FileLock lock = null;
+    try {
+      file =
+          AsynchronousFileChannel.open(chunkFile.toPath(),
+              StandardOpenOption.READ);
+      lock = file.lock(data.getOffset(), data.getLen(), true).get();
+
+      ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
+      file.read(buf, data.getOffset()).get();
+
+      if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
+        verifyChecksum(data, buf.array(), log);
+      }
+
+      return buf;
+    } catch (IOException e) {
+      throw new StorageContainerException(e, IO_EXCEPTION);
+    } finally {
+      if (lock != null) {
+        try {
+          lock.release();
+        } catch (IOException e) {
+          log.error("I/O error in lock release.", e);
+        }
+      }
+      if (file != null) {
+        IOUtils.closeStream(file);
+      }
+    }
+  }
+
+  /**
+   * Verifies the checksum of a chunk against the data buffer.
+   *
+   * @param chunkInfo - Chunk Info.
+   * @param data - data buffer
+   * @param log - log
+   * @throws NoSuchAlgorithmException
+   * @throws StorageContainerException
+   */
+  private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
+      log) throws NoSuchAlgorithmException, StorageContainerException {
+    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha.update(data);
+    // MessageDigest.digest() resets the digest, so compute it only once and
+    // reuse it for both the comparison and the error messages.
+    String computedChecksum = Hex.encodeHexString(sha.digest());
+    if (!computedChecksum.equals(chunkInfo.getChecksum())) {
+      log.error("Checksum mismatch. Provided: {} , computed: {}",
+          chunkInfo.getChecksum(), computedChecksum);
+      throw new StorageContainerException("Checksum mismatch. Provided: " +
+          chunkInfo.getChecksum() + " , computed: " + computedChecksum,
+          CHECKSUM_MISMATCH);
+    }
+  }
+
+  /**
+   * Validates chunk data and returns a file object to Chunk File that we are
+   * expected to write data to.
+   *
+   * @param data - container data.
+   * @param info - chunk info.
+   * @return File
+   * @throws StorageContainerException
+   */
+  public static File validateChunk(KeyValueContainerData data, ChunkInfo info)
+      throws StorageContainerException {
+
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+    File chunkFile = getChunkFile(data, info);
+    if (isOverWriteRequested(chunkFile, info)) {
+      if (!isOverWritePermitted(info)) {
+        log.error("Rejecting write chunk request. Chunk overwrite " +
+            "without explicit request. {}", info.toString());
+        throw new StorageContainerException("Rejecting write chunk request. " +
+            "OverWrite flag required." + info.toString(),
+            OVERWRITE_FLAG_REQUIRED);
+      }
+    }
+    return chunkFile;
+  }
+
+  /**
+   * Validates that the chunks directory exists and resolves the chunk file.
+   *
+   * @param containerData - Container Data
+   * @param info - Chunk info
+   * @return - File.
+   * @throws StorageContainerException
+   */
+  public static File getChunkFile(KeyValueContainerData containerData,
+                                  ChunkInfo info) throws
+      StorageContainerException {
+
+    Preconditions.checkNotNull(containerData, "Container data can't be null");
+    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+    String chunksPath = containerData.getChunksPath();
+    if (chunksPath == null) {
+      log.error("Chunks path is null in the container data");
+      throw new StorageContainerException("Unable to get Chunks directory.",
+          UNABLE_TO_FIND_DATA_DIR);
+    }
+    File chunksLoc = new File(chunksPath);
+    if (!chunksLoc.exists()) {
+      log.error("Chunks path does not exist");
+      throw new StorageContainerException("Unable to get Chunks directory.",
+          UNABLE_TO_FIND_DATA_DIR);
+    }
+
+    return chunksLoc.toPath().resolve(info.getChunkName()).toFile();
+  }
+
+  /**
+   * Checks if we are getting a request to overwrite an existing range of
+   * a chunk.
+   *
+   * @param chunkFile - File
+   * @param chunkInfo - Buffer to write
+   * @return bool
+   */
+  public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
+      chunkInfo) {
+
+    if (!chunkFile.exists()) {
+      return false;
+    }
+
+    long offset = chunkInfo.getOffset();
+    return offset < chunkFile.length();
+  }
+
+  /**
+   * Overwrite is permitted if and only if the user explicitly asks for it. We
+   * permit this iff the key/value pair contains a flag called
+   * [OverWriteRequested, true].
+   *
+   * @param chunkInfo - Chunk info
+   * @return true if the user asks for it.
+   */
+  public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
+    String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
+    return (overWrite != null) &&
+        (!overWrite.isEmpty()) &&
+        (Boolean.valueOf(overWrite));
+  }
+
+}
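
Since isOverWritePermitted() keys off chunk metadata, a client that really
intends to overwrite an existing range has to flag the request explicitly.
A sketch, assuming ChunkInfo exposes an addMetadata helper analogous to
KeyData's (the helper is not shown in this diff):

    ChunkInfo info = new ChunkInfo(chunkName, 0, data.length);
    // Without this flag, validateChunk() rejects the write with
    // OVERWRITE_FLAG_REQUIRED once the offset falls inside the existing file.
    info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
    chunkManager.writeChunk(container, blockID, info, data,
        ContainerProtos.Stage.COMBINED);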

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
index 7d9f0e6..d45f598 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
@@ -20,14 +20,19 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
 import org.apache.hadoop.utils.MetadataStore;
 
 import java.io.IOException;
 
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_READ_METADATA_DB;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.NO_SUCH_KEY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .Result.UNABLE_TO_READ_METADATA_DB;
 
 /**
  * Utils functions to help key functions.
@@ -79,4 +84,32 @@ public final class KeyUtils {
     cache.removeDB(container.getContainerId());
   }
 
-}
+  /**
+   * Shutdown all DB Handles.
+   *
+   * @param cache - Cache for DB Handles.
+   */
+  @SuppressWarnings("unchecked")
+  public static void shutdownCache(ContainerCache cache)  {
+    cache.shutdownCache();
+  }
+
+  /**
+   * Parses the {@link KeyData} from a byte array.
+   *
+   * @param bytes key data in bytes.
+   * @return key data.
+   * @throws IOException if the byte array is malformed or invalid.
+   */
+  public static KeyData getKeyData(byte[] bytes) throws IOException {
+    try {
+      ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(
+          bytes);
+      return KeyData.getFromProtoBuf(keyData);
+    } catch (IOException e) {
+      throw new StorageContainerException("Failed to parse key data from " +
+          "the byte array.", e, NO_SUCH_KEY);
+    }
+    }
+  }
+}
\ No newline at end of file
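
getKeyData() is the inverse of the serialization putKey() performs; a short
sketch of the round trip (keyData as constructed in TestKeyManagerImpl):

    byte[] bytes = keyData.getProtoBufMessage().toByteArray();
    // Throws a StorageContainerException with NO_SUCH_KEY if the bytes
    // cannot be parsed back into a KeyData protobuf.
    KeyData parsed = KeyUtils.getKeyData(bytes);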

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
new file mode 100644
index 0000000..7134be1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.interfaces;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+
+/**
+ * Chunk Manager allows read, write, delete and listing of chunks in
+ * a container.
+ */
+
+public interface ChunkManager {
+
+  /**
+   * writes a given chunk.
+   *
+   * @param container - Container for the chunk
+   * @param blockID - ID of the block.
+   * @param info - ChunkInfo.
+   * @param data - data of the chunk.
+   * @param stage - Stage of the chunk operation.
+   * @throws StorageContainerException
+   */
+  void writeChunk(Container container, BlockID blockID, ChunkInfo info,
+                  byte[] data, ContainerProtos.Stage stage)
+      throws StorageContainerException;
+
+  /**
+   * reads the data defined by a chunk.
+   *
+   * @param container - Container for the chunk
+   * @param blockID - ID of the block.
+   * @param info - ChunkInfo.
+   * @return  byte array
+   * @throws StorageContainerException
+   *
+   * TODO: Right now we do not support partial reads and writes of chunks.
+   * TODO: Explore if we need to do that for ozone.
+   */
+  byte[] readChunk(Container container, BlockID blockID, ChunkInfo info) throws
+      StorageContainerException;
+
+  /**
+   * Deletes a given chunk.
+   *
+   * @param container - Container for the chunk
+   * @param blockID - ID of the block.
+   * @param info  - Chunk Info
+   * @throws StorageContainerException
+   */
+  void deleteChunk(Container container, BlockID blockID, ChunkInfo info) throws
+      StorageContainerException;
+
+  // TODO : Support list operations.
+
+  /**
+   * Shutdown the chunkManager.
+   */
+  void shutdown();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
new file mode 100644
index 0000000..ebda97e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.interfaces;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * KeyManager is for performing key related operations on the container.
+ */
+public interface KeyManager {
+
+  /**
+   * Puts or overwrites a key.
+   *
+   * @param container - Container to which the key needs to be added.
+   * @param data     - Key Data.
+   * @throws IOException
+   */
+  void putKey(Container container, KeyData data) throws IOException;
+
+  /**
+   * Gets an existing key.
+   *
+   * @param container - Container from which the key needs to be fetched.
+   * @param data - Key Data.
+   * @return Key Data.
+   * @throws IOException
+   */
+  KeyData getKey(Container container, KeyData data) throws IOException;
+
+  /**
+   * Deletes an existing Key.
+   *
+   * @param container - Container from which the key needs to be deleted.
+   * @param blockID - ID of the block.
+   * @throws StorageContainerException
+   */
+  void deleteKey(Container container, BlockID blockID) throws IOException;
+
+  /**
+   * List keys in a container.
+   *
+   * @param container - Container from which keys need to be listed.
+   * @param startLocalID  - Key to start from, 0 to begin.
+   * @param count    - Number of keys to return.
+   * @return List of Keys that match the criteria.
+   */
+  List<KeyData> listKey(Container container, long startLocalID, int count) throws
+      IOException;
+
+  /**
+   * Shutdown the KeyManager.
+   */
+  void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
new file mode 100644
index 0000000..ca936c7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class is used to test ChunkManager operations.
+ */
+public class TestChunkManagerImpl {
+
+  private OzoneConfiguration config;
+  private String scmId = UUID.randomUUID().toString();
+  private VolumeSet volumeSet;
+  private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
+  private KeyValueContainerData keyValueContainerData;
+  private KeyValueContainer keyValueContainer;
+  private KeyData keyData;
+  private BlockID blockID;
+  private ChunkManagerImpl chunkManager;
+  private ChunkInfo chunkInfo;
+  private byte[] data;
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception {
+    config = new OzoneConfiguration();
+
+    HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
+        .getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID()
+        .toString()).build();
+
+    volumeSet = mock(VolumeSet.class);
+
+    volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
+    Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
+        .thenReturn(hddsVolume);
+
+    keyValueContainerData = new KeyValueContainerData(
+        ContainerProtos.ContainerType.KeyValueContainer, 1L);
+
+    keyValueContainer = new KeyValueContainer(
+        keyValueContainerData, config);
+
+    keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+
+    data = "testing write chunks".getBytes();
+    // Creating KeyData
+    blockID = new BlockID(1L, 1L);
+    keyData = new KeyData(blockID);
+    keyData.addMetadata("VOLUME", "ozone");
+    keyData.addMetadata("OWNER", "hdfs");
+    List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+    chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
+        .getLocalID(), 0), 0, data.length);
+    chunkList.add(chunkInfo.getProtoBufMessage());
+    keyData.setChunks(chunkList);
+
+    // Create a ChunkManager object.
+    chunkManager = new ChunkManagerImpl();
+
+  }
+
+  @Test
+  public void testWriteChunkStageWriteAndCommit() throws Exception {
+    // The container was created in setUp(), so these paths should exist.
+    assertNotNull(keyValueContainerData.getChunksPath());
+    File chunksPath = new File(keyValueContainerData.getChunksPath());
+    assertTrue(chunksPath.exists());
+    // Initially the chunks folder should be empty.
+    assertEquals(0, chunksPath.listFiles().length);
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+        ContainerProtos.Stage.WRITE_DATA);
+    // Now a chunk file is being written with Stage WRITE_DATA, so it should
+    // create a temporary chunk file.
+    assertEquals(1, chunksPath.listFiles().length);
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+        ContainerProtos.Stage.COMMIT_DATA);
+    // Old temp file should have been renamed to chunk file.
+    assertEquals(1, chunksPath.listFiles().length);
+
+  }
+
+  @Test
+  public void testWriteChunkIncorrectLength() throws Exception {
+    try {
+      long randomLength = 200L;
+      chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
+          .getLocalID(), 0), 0, randomLength);
+      List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+      chunkList.add(chunkInfo.getProtoBufMessage());
+      keyData.setChunks(chunkList);
+      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+          ContainerProtos.Stage.WRITE_DATA);
+      fail("testWriteChunkIncorrectLength failed");
+    } catch (StorageContainerException ex) {
+      GenericTestUtils.assertExceptionContains("data array does not match " +
+          "the length ", ex);
+      assertEquals(ContainerProtos.Result.INVALID_WRITE_SIZE, ex.getResult());
+    }
+  }
+
+  @Test
+  public void testWriteChunkStageCombinedData() throws Exception {
+    // The container was created in setUp(), so these paths should exist.
+    assertNotNull(keyValueContainerData.getChunksPath());
+    File chunksPath = new File(keyValueContainerData.getChunksPath());
+    assertTrue(chunksPath.exists());
+    // Initially the chunks folder should be empty.
+    assertEquals(0, chunksPath.listFiles().length);
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+        ContainerProtos.Stage.COMBINED);
+    // With Stage COMBINED the data is written directly to the chunk file,
+    // so exactly one chunk file should exist.
+    assertEquals(1, chunksPath.listFiles().length);
+  }
+
+  @Test
+  public void testReadChunk() throws Exception {
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+        ContainerProtos.Stage.COMBINED);
+    byte[] readData = chunkManager.readChunk(keyValueContainer, blockID,
+        chunkInfo);
+    assertEquals(data.length, readData.length);
+    assertTrue(Arrays.equals(data, readData));
+  }
+
+  @Test
+  public void testDeleteChunk() throws Exception {
+    File chunksPath = new File(keyValueContainerData.getChunksPath());
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+        ContainerProtos.Stage.COMBINED);
+    assertEquals(1, chunksPath.listFiles().length);
+    chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
+    assertEquals(0, chunksPath.listFiles().length);
+  }
+
+  @Test
+  public void testDeleteChunkUnsupportedRequest() throws Exception {
+    try {
+      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+          ContainerProtos.Stage.COMBINED);
+      long randomLength = 200L;
+      chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
+          .getLocalID(), 0), 0, randomLength);
+      List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+      chunkList.add(chunkInfo.getProtoBufMessage());
+      keyData.setChunks(chunkList);
+      chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
+      fail("testDeleteChunkUnsupportedRequest");
+    } catch (StorageContainerException ex) {
+      GenericTestUtils.assertExceptionContains("Not Supported Operation.", ex);
+      assertEquals(ContainerProtos.Result.UNSUPPORTED_REQUEST, ex.getResult());
+    }
+  }
+
+  @Test
+  public void testWriteChunkChecksumMismatch() throws Exception {
+    try {
+      chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
+          .getLocalID(), 0), 0, data.length);
+      //Setting checksum to some value.
+      chunkInfo.setChecksum("some garbage");
+      List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+      chunkList.add(chunkInfo.getProtoBufMessage());
+      keyData.setChunks(chunkList);
+      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+          ContainerProtos.Stage.COMBINED);
+      fail("testWriteChunkChecksumMismatch failed");
+    } catch (StorageContainerException ex) {
+      GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex);
+      assertEquals(ContainerProtos.Result.CHECKSUM_MISMATCH, ex.getResult());
+    }
+  }
+
+  @Test
+  public void testReadChunkFileNotExists() throws Exception {
+    try {
+      // Trying to read a chunk whose chunk file does not exist.
+      chunkManager.readChunk(keyValueContainer, blockID, chunkInfo);
+      fail("testReadChunkFileNotExists failed");
+    } catch (StorageContainerException ex) {
+      GenericTestUtils.assertExceptionContains("Unable to find the chunk " +
+          "file.", ex);
+      assertEquals(ContainerProtos.Result.UNABLE_TO_FIND_CHUNK, ex.getResult());
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
new file mode 100644
index 0000000..a6f50c4
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.volume
+    .RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class is used to test key related operations on the container.
+ */
+public class TestKeyManagerImpl {
+
+  private OzoneConfiguration config;
+  private String scmId = UUID.randomUUID().toString();
+  private VolumeSet volumeSet;
+  private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
+  private KeyValueContainerData keyValueContainerData;
+  private KeyValueContainer keyValueContainer;
+  private KeyData keyData;
+  private KeyManagerImpl keyValueContainerManager;
+  private BlockID blockID;
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+
+  @Before
+  public void setUp() throws Exception {
+    config = new OzoneConfiguration();
+
+    HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
+        .getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID()
+        .toString()).build();
+
+    volumeSet = mock(VolumeSet.class);
+
+    volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
+    Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
+        .thenReturn(hddsVolume);
+
+    keyValueContainerData = new KeyValueContainerData(
+        ContainerProtos.ContainerType.KeyValueContainer, 1L);
+
+    keyValueContainer = new KeyValueContainer(
+        keyValueContainerData, config);
+
+    keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+
+    // Creating KeyData
+    blockID = new BlockID(1L, 1L);
+    keyData = new KeyData(blockID);
+    keyData.addMetadata("VOLUME", "ozone");
+    keyData.addMetadata("OWNER", "hdfs");
+    List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+    ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
+        .getLocalID(), 0), 0, 1024);
+    chunkList.add(info.getProtoBufMessage());
+    keyData.setChunks(chunkList);
+
+    // Create KeyValueContainerManager
+    keyValueContainerManager = new KeyManagerImpl(config);
+
+  }
+
+  @Test
+  public void testPutAndGetKey() throws Exception {
+    //Put Key
+    keyValueContainerManager.putKey(keyValueContainer, keyData);
+
+    //Get Key
+    KeyData fromGetKeyData = keyValueContainerManager.getKey(keyValueContainer,
+        keyData);
+
+    assertEquals(keyData.getContainerID(), fromGetKeyData.getContainerID());
+    assertEquals(keyData.getLocalID(), fromGetKeyData.getLocalID());
+    assertEquals(keyData.getChunks().size(), fromGetKeyData.getChunks().size());
+    assertEquals(keyData.getMetadata().size(), fromGetKeyData.getMetadata()
+        .size());
+
+  }
+
+
+  @Test
+  public void testDeleteKey() throws Exception {
+    try {
+      //Put Key
+      keyValueContainerManager.putKey(keyValueContainer, keyData);
+      //Delete Key
+      keyValueContainerManager.deleteKey(keyValueContainer, blockID);
+    } catch (IOException ex) {
+      fail("testDeleteKey failed");
+    }
+  }
+
+  @Test
+  public void testListKey() throws Exception {
+    try {
+      keyValueContainerManager.putKey(keyValueContainer, keyData);
+      List<KeyData> listKeyData = keyValueContainerManager.listKey(
+          keyValueContainer, 1, 10);
+      assertNotNull(listKeyData);
+      assertEquals(1, listKeyData.size());
+
+      for (long i = 2; i <= 10; i++) {
+        blockID = new BlockID(1L, i);
+        keyData = new KeyData(blockID);
+        keyData.addMetadata("VOLUME", "ozone");
+        keyData.addMetadata("OWNER", "hdfs");
+        List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+        ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
+            .getLocalID(), 0), 0, 1024);
+        chunkList.add(info.getProtoBufMessage());
+        keyData.setChunks(chunkList);
+        keyValueContainerManager.putKey(keyValueContainer, keyData);
+      }
+
+      listKeyData = keyValueContainerManager.listKey(
+          keyValueContainer, 1, 10);
+      assertNotNull(listKeyData);
+      assertEquals(10, listKeyData.size());
+
+    } catch (IOException ex) {
+      fail("testListKey failed");
+    }
+  }
+
+  @Test
+  public void testGetNoSuchKey() throws Exception {
+    try {
+      keyData = new KeyData(new BlockID(1L, 2L));
+      keyValueContainerManager.getKey(keyValueContainer, keyData);
+      fail("testGetNoSuchKey failed");
+    } catch (StorageContainerException ex) {
+      GenericTestUtils.assertExceptionContains("Unable to find the key.", ex);
+      assertEquals(ContainerProtos.Result.NO_SUCH_KEY, ex.getResult());
+    }
+  }
+}

