Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:33:39 UTC

[30/37] hadoop git commit: HDDS-182: CleanUp Reimplemented classes. Contributed by Hanisha Koneru

HDDS-182: CleanUp Reimplemented classes. Contributed by Hanisha Koneru


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2c2351e8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2c2351e8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2c2351e8

Branch: refs/heads/trunk
Commit: 2c2351e87b60d3e8b50b94e9ca5ab78d7afae783
Parents: a404164
Author: Bharat Viswanadham <bh...@apache.org>
Authored: Thu Jul 5 15:23:39 2018 -0700
Committer: Bharat Viswanadham <bh...@apache.org>
Committed: Thu Jul 5 15:23:39 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   10 +
 .../container/common/helpers/ChunkUtils.java    |  343 ------
 .../container/common/helpers/ContainerData.java |  512 --------
 .../common/helpers/ContainerUtils.java          |  179 +--
 .../container/common/helpers/KeyUtils.java      |  148 ---
 .../container/common/impl/ChunkManagerImpl.java |  233 ----
 .../container/common/impl/ContainerData.java    |  131 +-
 .../common/impl/ContainerDataYaml.java          |   29 +-
 .../impl/ContainerLocationManagerImpl.java      |  158 ---
 .../common/impl/ContainerManagerImpl.java       | 1115 ------------------
 .../container/common/impl/ContainerSet.java     |   19 +-
 .../common/impl/ContainerStorageLocation.java   |  212 ----
 .../ozone/container/common/impl/Dispatcher.java |  695 -----------
 .../container/common/impl/KeyManagerImpl.java   |  204 ----
 .../RandomContainerDeletionChoosingPolicy.java  |    1 -
 ...NOrderedContainerDeletionChoosingPolicy.java |    1 -
 .../common/interfaces/ChunkManager.java         |   73 --
 .../container/common/interfaces/Container.java  |    1 -
 .../ContainerDeletionChoosingPolicy.java        |    3 +-
 .../common/interfaces/ContainerManager.java     |  267 -----
 .../container/common/interfaces/KeyManager.java |   73 --
 .../background/BlockDeletingService.java        |  247 ----
 .../statemachine/background/package-info.java   |   18 -
 .../container/common/volume/VolumeSet.java      |   13 +-
 .../container/keyvalue/KeyValueContainer.java   |   36 +-
 .../keyvalue/KeyValueContainerData.java         |   80 +-
 .../container/keyvalue/KeyValueHandler.java     |    2 +-
 .../container/keyvalue/helpers/ChunkUtils.java  |   18 +-
 .../container/keyvalue/helpers/KeyUtils.java    |   19 +-
 .../keyvalue/helpers/KeyValueContainerUtil.java |   48 +-
 .../keyvalue/impl/ChunkManagerImpl.java         |   10 +-
 .../background/BlockDeletingService.java        |  248 ++++
 .../statemachine/background/package-info.java   |   18 +
 .../container/ozoneimpl/OzoneContainer.java     |    2 +-
 .../common/TestKeyValueContainerData.java       |    2 +-
 .../common/impl/TestContainerDataYaml.java      |    6 +-
 .../container/common/impl/TestContainerSet.java |   10 +-
 .../TestRoundRobinVolumeChoosingPolicy.java     |    6 +-
 .../container/common/volume/TestVolumeSet.java  |    6 +-
 .../keyvalue/TestKeyValueContainer.java         |   20 +-
 .../container/keyvalue/TestKeyValueHandler.java |   11 +-
 .../container/ozoneimpl/TestOzoneContainer.java |    2 +-
 .../testutils/BlockDeletingServiceTestImpl.java |    9 +-
 .../test/resources/additionalfields.container   |    2 +-
 .../src/test/resources/incorrect.container      |    2 +-
 .../ozone/container/ContainerTestHelper.java    |    2 +
 .../common/TestBlockDeletingService.java        |  118 +-
 .../TestContainerDeletionChoosingPolicy.java    |   63 +-
 .../common/impl/TestContainerPersistence.java   |  478 ++++----
 .../container/metrics/TestContainerMetrics.java |    3 +-
 .../container/server/TestContainerServer.java   |   14 +-
 .../org/apache/hadoop/ozone/scm/TestSCMCli.java |   44 +-
 .../genesis/BenchMarkDatanodeDispatcher.java    |   24 +-
 53 files changed, 943 insertions(+), 5045 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 27aa6ee..82d67b7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -183,4 +183,14 @@ public final class OzoneConsts {
   private OzoneConsts() {
     // Never Constructed
   }
+
+  // YAML fields for .container files
+  public static final String CONTAINER_ID = "containerID";
+  public static final String CONTAINER_TYPE = "containerType";
+  public static final String STATE = "state";
+  public static final String METADATA = "metadata";
+  public static final String MAX_SIZE_GB = "maxSizeGB";
+  public static final String METADATA_PATH = "metadataPath";
+  public static final String CHUNKS_PATH = "chunksPath";
+  public static final String CONTAINER_DB_TYPE = "containerDBType";
 }
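
These constants centralize the field names that ContainerDataYaml (also modified in this changeset) reads and writes in .container descriptor files. As a minimal sketch of how such a descriptor could be produced, assuming SnakeYAML on the classpath and purely hypothetical field values and paths:

    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.hadoop.ozone.OzoneConsts;
    import org.yaml.snakeyaml.Yaml;

    public class ContainerYamlSketch {
      public static void main(String[] args) {
        // Key the fields by the shared constants so every reader and
        // writer of .container files agrees on the YAML field names.
        Map<String, Object> fields = new TreeMap<>();
        fields.put(OzoneConsts.CONTAINER_ID, 1L);
        fields.put(OzoneConsts.CONTAINER_TYPE, "KeyValueContainer"); // hypothetical value
        fields.put(OzoneConsts.STATE, "OPEN");
        fields.put(OzoneConsts.MAX_SIZE_GB, 5);
        fields.put(OzoneConsts.METADATA_PATH, "/data/meta");   // hypothetical path
        fields.put(OzoneConsts.CHUNKS_PATH, "/data/chunks");   // hypothetical path
        System.out.println(new Yaml().dump(fields));
      }
    }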

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
deleted file mode 100644
index e0bf213..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
-import java.nio.channels.FileLock;
-import java.nio.file.StandardOpenOption;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CHECKSUM_MISMATCH;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_NOT_FOUND;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.INVALID_WRITE_SIZE;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.IO_EXCEPTION;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.OVERWRITE_FLAG_REQUIRED;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNABLE_TO_FIND_CHUNK;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNABLE_TO_FIND_DATA_DIR;
-
-/**
- * Set of utility functions used by the chunk manager.
- */
-public final class ChunkUtils {
-
-  /* Never constructed. */
-  private ChunkUtils() {
-  }
-
-  /**
-   * Checks if we are getting a request to overwrite an existing range of
-   * chunk.
-   *
-   * @param chunkFile - File
-   * @param chunkInfo - Buffer to write
-   * @return bool
-   */
-  public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
-      chunkInfo) {
-
-    if (!chunkFile.exists()) {
-      return false;
-    }
-
-    long offset = chunkInfo.getOffset();
-    return offset < chunkFile.length();
-  }
-
-  /**
-   * Overwrite is permitted if and only if the user explicitly asks for it. We
-   * permit this iff the key/value pair contains a flag called
-   * [OverWriteRequested, true].
-   *
-   * @param chunkInfo - Chunk info
-   * @return true if the user asks for it.
-   */
-  public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
-    String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
-    return (overWrite != null) &&
-        (!overWrite.isEmpty()) &&
-        (Boolean.valueOf(overWrite));
-  }
-
-  /**
-   * Validates chunk data and returns a File object for the chunk file that we
-   * are expected to write data to.
-   *
-   * @param data - container data.
-   * @param info - chunk info.
-   * @return File
-   * @throws StorageContainerException
-   */
-  public static File validateChunk(ContainerData data,
-      ChunkInfo info) throws StorageContainerException {
-
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-    File chunkFile = getChunkFile(data, info);
-    if (ChunkUtils.isOverWriteRequested(chunkFile, info)) {
-      if (!ChunkUtils.isOverWritePermitted(info)) {
-        log.error("Rejecting write chunk request. Chunk overwrite " +
-            "without explicit request. {}", info.toString());
-        throw new StorageContainerException("Rejecting write chunk request. " +
-            "OverWrite flag required." + info.toString(),
-            OVERWRITE_FLAG_REQUIRED);
-      }
-    }
-    return chunkFile;
-  }
-
-  /**
-   * Validates that Path to chunk file exists.
-   *
-   * @param data - Container Data
-   * @param info - Chunk info
-   * @return - File.
-   * @throws StorageContainerException
-   */
-  public static File getChunkFile(ContainerData data,
-      ChunkInfo info) throws StorageContainerException {
-
-    Preconditions.checkNotNull(data, "Container data can't be null");
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-    if (data.getContainerID() < 0) {
-      log.error("Invalid container id: {}", data.getContainerID());
-      throw new StorageContainerException("Unable to find the container id:" +
-          " " +
-          data.getContainerID(), CONTAINER_NOT_FOUND);
-    }
-
-    File dataDir = ContainerUtils.getDataDirectory(data).toFile();
-    if (!dataDir.exists()) {
-      log.error("Unable to find the data directory: {}", dataDir);
-      throw new StorageContainerException("Unable to find the data directory:" +
-          " " + dataDir, UNABLE_TO_FIND_DATA_DIR);
-    }
-
-    return dataDir.toPath().resolve(info.getChunkName()).toFile();
-
-  }
-
-  /**
-   * Writes the data in chunk Info to the specified location in the chunkfile.
-   *
-   * @param chunkFile - File to write data to.
-   * @param chunkInfo - Chunk metadata describing the write.
-   * @param data - The data buffer.
-   * @throws StorageContainerException
-   */
-  public static void writeData(File chunkFile, ChunkInfo chunkInfo,
-      byte[] data) throws
-      StorageContainerException, ExecutionException, InterruptedException,
-      NoSuchAlgorithmException {
-
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-    if (data.length != chunkInfo.getLen()) {
-      String err = String.format("data array does not match the length " +
-              "specified. DataLen: %d Byte Array: %d",
-          chunkInfo.getLen(), data.length);
-      log.error(err);
-      throw new StorageContainerException(err, INVALID_WRITE_SIZE);
-    }
-
-    AsynchronousFileChannel file = null;
-    FileLock lock = null;
-
-    try {
-      file =
-          AsynchronousFileChannel.open(chunkFile.toPath(),
-              StandardOpenOption.CREATE,
-              StandardOpenOption.WRITE,
-              StandardOpenOption.SPARSE,
-              StandardOpenOption.SYNC);
-      lock = file.lock().get();
-      if (chunkInfo.getChecksum() != null &&
-          !chunkInfo.getChecksum().isEmpty()) {
-        verifyChecksum(chunkInfo, data, log);
-      }
-      int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
-      if (size != data.length) {
-        log.error("Invalid write size found. Size:{}  Expected: {} ", size,
-            data.length);
-        throw new StorageContainerException("Invalid write size found. " +
-            "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
-      }
-    } catch (IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("Unable to release lock ??, Fatal Error.");
-          throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
-
-        }
-      }
-      if (file != null) {
-        try {
-          file.close();
-        } catch (IOException e) {
-          throw new StorageContainerException("Error closing chunk file",
-              e, CONTAINER_INTERNAL_ERROR);
-        }
-      }
-    }
-  }
-
-  /**
-   * Verifies the checksum of a chunk against the data buffer.
-   *
-   * @param chunkInfo - Chunk Info.
-   * @param data - data buffer
-   * @param log - log
-   * @throws NoSuchAlgorithmException
-   * @throws StorageContainerException
-   */
-  private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
-      log) throws NoSuchAlgorithmException, StorageContainerException {
-    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha.update(data);
-    if (!Hex.encodeHexString(sha.digest()).equals(
-        chunkInfo.getChecksum())) {
-      log.error("Checksum mismatch. Provided: {} , computed: {}",
-          chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest()));
-      throw new StorageContainerException("Checksum mismatch. Provided: " +
-          chunkInfo.getChecksum() + " , computed: " +
-          DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH);
-    }
-  }
-
-  /**
-   * Reads data from an existing chunk file.
-   *
-   * @param chunkFile - file where data lives.
-   * @param data - chunk definition.
-   * @return ByteBuffer
-   * @throws StorageContainerException
-   * @throws ExecutionException
-   * @throws InterruptedException
-   */
-  public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
-      StorageContainerException, ExecutionException, InterruptedException,
-      NoSuchAlgorithmException {
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-    if (!chunkFile.exists()) {
-      log.error("Unable to find the chunk file. chunk info : {}",
-          data.toString());
-      throw new StorageContainerException("Unable to find the chunk file. " +
-          "chunk info " +
-          data.toString(), UNABLE_TO_FIND_CHUNK);
-    }
-
-    AsynchronousFileChannel file = null;
-    FileLock lock = null;
-    try {
-      file =
-          AsynchronousFileChannel.open(chunkFile.toPath(),
-              StandardOpenOption.READ);
-      lock = file.lock(data.getOffset(), data.getLen(), true).get();
-
-      ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
-      file.read(buf, data.getOffset()).get();
-
-      if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
-        verifyChecksum(data, buf.array(), log);
-      }
-
-      return buf;
-    } catch (IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("I/O error is lock release.");
-        }
-      }
-      if (file != null) {
-        IOUtils.closeStream(file);
-      }
-    }
-  }
-
-  /**
-   * Returns a successful chunk response. This call is used by chunk
-   * operations that return an empty success response.
-   *
-   * @param msg Request
-   * @return Response.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    return ContainerUtils.getSuccessResponse(msg);
-  }
-
-  /**
-   * Gets a response to the read chunk calls.
-   *
-   * @param msg - Msg
-   * @param data - Data
-   * @param info - Info
-   * @return Response.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getReadChunkResponse(ContainerProtos.ContainerCommandRequestProto msg,
-      byte[] data, ChunkInfo info) {
-    Preconditions.checkNotNull(msg);
-
-    ContainerProtos.ReadChunkResponseProto.Builder response =
-        ContainerProtos.ReadChunkResponseProto.newBuilder();
-    response.setChunkData(info.getProtoBufMessage());
-    response.setData(ByteString.copyFrom(data));
-    response.setBlockID(msg.getReadChunk().getBlockID());
-
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setReadChunk(response);
-    return builder.build();
-  }
-}
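
The verifyChecksum helper removed above hashes the chunk payload with the digest named by OzoneConsts.FILE_HASH and compares the hex encoding against the checksum recorded in ChunkInfo. (Note that its error path calls sha.digest() a second time after the digest has already been consumed, so the "computed" value it logs would not be the value that was actually compared.) A self-contained sketch of the same verify pattern, assuming SHA-256 and using only java.security:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class ChecksumSketch {

      // Hex-encode a digest, mirroring Hex.encodeHexString in the code above.
      static String toHex(byte[] digest) {
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
          sb.append(String.format("%02x", b));
        }
        return sb.toString();
      }

      static void verify(byte[] data, String expected)
          throws NoSuchAlgorithmException {
        MessageDigest sha = MessageDigest.getInstance("SHA-256");
        String computed = toHex(sha.digest(data));
        if (!computed.equals(expected)) {
          throw new IllegalStateException("Checksum mismatch. Provided: "
              + expected + ", computed: " + computed);
        }
      }

      public static void main(String[] args) throws NoSuchAlgorithmException {
        byte[] data = "chunk-payload".getBytes(StandardCharsets.UTF_8);
        // In the real path the expected value comes from ChunkInfo.getChecksum().
        verify(data, toHex(MessageDigest.getInstance("SHA-256").digest(data)));
      }
    }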

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
deleted file mode 100644
index 5767f76..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.helpers;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerLifeCycleState;
-import org.apache.hadoop.ozone.OzoneConsts;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static java.lang.Math.max;
-
-/**
- * This class maintains the information about a container in the ozone world.
- * <p>
- * A container is a name, along with metadata, which is a set of key-value
- * pairs.
- */
-public class ContainerData {
-
-  private final Map<String, String> metadata;
-  private String dbPath;  // Path to Level DB Store.
-  // Path to Physical file system where container and checksum are stored.
-  private String containerFilePath;
-  private AtomicLong bytesUsed;
-  private long maxSize;
-  private long containerID;
-  private ContainerLifeCycleState state;
-  private ContainerType containerType;
-  private String containerDBType;
-
-
-  /**
-   * Number of pending deletion blocks in container.
-   */
-  private int numPendingDeletionBlocks;
-  private long deleteTransactionId;
-  private AtomicLong readBytes;
-  private AtomicLong writeBytes;
-  private AtomicLong readCount;
-  private AtomicLong writeCount;
-
-
-  /**
-   * Constructs a ContainerData object.
-   *
-   * @param containerID - ID
-   * @param conf - Configuration
-   */
-  public ContainerData(long containerID,
-      Configuration conf) {
-    this.metadata = new TreeMap<>();
-    this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
-        ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
-    this.bytesUsed =  new AtomicLong(0L);
-    this.containerID = containerID;
-    this.state = ContainerLifeCycleState.OPEN;
-    this.numPendingDeletionBlocks = 0;
-    this.deleteTransactionId = 0;
-    this.readCount = new AtomicLong(0L);
-    this.readBytes =  new AtomicLong(0L);
-    this.writeCount =  new AtomicLong(0L);
-    this.writeBytes =  new AtomicLong(0L);
-  }
-
-  /**
-   * Constructs a ContainerData object.
-   *
-   * @param containerID - ID
-   * @param conf - Configuration
-   * @param state - ContainerLifeCycleState
-   *
-   */
-  public ContainerData(long containerID, Configuration conf,
-                       ContainerLifeCycleState state) {
-    this.metadata = new TreeMap<>();
-    this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
-        ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
-    this.bytesUsed =  new AtomicLong(0L);
-    this.containerID = containerID;
-    this.state = state;
-    this.numPendingDeletionBlocks = 0;
-    this.deleteTransactionId = 0;
-    this.readCount = new AtomicLong(0L);
-    this.readBytes =  new AtomicLong(0L);
-    this.writeCount =  new AtomicLong(0L);
-    this.writeBytes =  new AtomicLong(0L);
-  }
-
-  /**
-   * Constructs a ContainerData object from ProtoBuf classes.
-   *
-   * @param protoData - ProtoBuf Message
-   * @throws IOException
-   */
-  public static ContainerData getFromProtBuf(
-      ContainerProtos.ContainerData protoData, Configuration conf)
-      throws IOException {
-    ContainerData data = new ContainerData(
-        protoData.getContainerID(), conf);
-    for (int x = 0; x < protoData.getMetadataCount(); x++) {
-      data.addMetadata(protoData.getMetadata(x).getKey(),
-          protoData.getMetadata(x).getValue());
-    }
-
-    if (protoData.hasContainerPath()) {
-      data.setContainerPath(protoData.getContainerPath());
-    }
-
-    if (protoData.hasDbPath()) {
-      data.setDBPath(protoData.getDbPath());
-    }
-
-    if (protoData.hasState()) {
-      data.setState(protoData.getState());
-    }
-
-    if (protoData.hasBytesUsed()) {
-      data.setBytesUsed(protoData.getBytesUsed());
-    }
-
-    if (protoData.hasSize()) {
-      data.setMaxSize(protoData.getSize());
-    }
-
-    if(protoData.hasContainerType()) {
-      data.setContainerType(protoData.getContainerType());
-    }
-
-    if(protoData.hasContainerDBType()) {
-      data.setContainerDBType(protoData.getContainerDBType());
-    }
-
-    return data;
-  }
-
-  public String getContainerDBType() {
-    return containerDBType;
-  }
-
-  public void setContainerDBType(String containerDBType) {
-    this.containerDBType = containerDBType;
-  }
-
-  /**
-   * Returns a ProtoBuf Message from ContainerData.
-   *
-   * @return Protocol Buffer Message
-   */
-  public ContainerProtos.ContainerData getProtoBufMessage() {
-    ContainerProtos.ContainerData.Builder builder = ContainerProtos
-        .ContainerData.newBuilder();
-    builder.setContainerID(this.getContainerID());
-
-    if (this.getDBPath() != null) {
-      builder.setDbPath(this.getDBPath());
-    }
-
-    if (this.getContainerPath() != null) {
-      builder.setContainerPath(this.getContainerPath());
-    }
-
-    builder.setState(this.getState());
-
-    for (Map.Entry<String, String> entry : metadata.entrySet()) {
-      ContainerProtos.KeyValue.Builder keyValBuilder =
-          ContainerProtos.KeyValue.newBuilder();
-      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
-          .setValue(entry.getValue()).build());
-    }
-
-    if (this.getBytesUsed() >= 0) {
-      builder.setBytesUsed(this.getBytesUsed());
-    }
-
-    if (this.getKeyCount() >= 0) {
-      builder.setKeyCount(this.getKeyCount());
-    }
-
-    if (this.getMaxSize() >= 0) {
-      builder.setSize(this.getMaxSize());
-    }
-
-    if(this.getContainerType() != null) {
-      builder.setContainerType(containerType);
-    }
-
-    if(this.getContainerDBType() != null) {
-      builder.setContainerDBType(containerDBType);
-    }
-
-    return builder.build();
-  }
-
-  public void setContainerType(ContainerType containerType) {
-    this.containerType = containerType;
-  }
-
-  public ContainerType getContainerType() {
-    return this.containerType;
-  }
-  /**
-   * Adds metadata.
-   */
-  public void addMetadata(String key, String value) throws IOException {
-    synchronized (this.metadata) {
-      if (this.metadata.containsKey(key)) {
-        throw new IOException("This key already exists. Key " + key);
-      }
-      metadata.put(key, value);
-    }
-  }
-
-  /**
-   * Returns all metadata.
-   */
-  public Map<String, String> getAllMetadata() {
-    synchronized (this.metadata) {
-      return Collections.unmodifiableMap(this.metadata);
-    }
-  }
-
-  /**
-   * Returns value of a key.
-   */
-  public String getValue(String key) {
-    synchronized (this.metadata) {
-      return metadata.get(key);
-    }
-  }
-
-  /**
-   * Deletes a metadata entry from the map.
-   *
-   * @param key - Key
-   */
-  public void deleteKey(String key) {
-    synchronized (this.metadata) {
-      metadata.remove(key);
-    }
-  }
-
-  /**
-   * Returns path.
-   *
-   * @return - path
-   */
-  public String getDBPath() {
-    return dbPath;
-  }
-
-  /**
-   * Sets path.
-   *
-   * @param path - String.
-   */
-  public void setDBPath(String path) {
-    this.dbPath = path;
-  }
-
-  /**
-   * This function serves as the generic key for the ContainerCache class. Both
-   * ContainerData and ContainerKeyData override this function to return the
-   * right name that can be used in ContainerCache.
-   *
-   * @return String Name.
-   */
-    // TODO: check the ContainerCache class to see if
-  // we are using the ContainerID instead.
-   /*
-   public String getName() {
-    return getContainerID();
-  }*/
-
-  /**
-   * Get container file path.
-   * @return - Physical path where container file and checksum is stored.
-   */
-  public String getContainerPath() {
-    return containerFilePath;
-  }
-
-  /**
-   * Set container Path.
-   * @param containerPath - File path.
-   */
-  public void setContainerPath(String containerPath) {
-    this.containerFilePath = containerPath;
-  }
-
-  /**
-   * Get container ID.
-   * @return - container ID.
-   */
-  public synchronized long getContainerID() {
-    return containerID;
-  }
-
-  public synchronized void setState(ContainerLifeCycleState state) {
-    this.state = state;
-  }
-
-  public synchronized ContainerLifeCycleState getState() {
-    return this.state;
-  }
-
-  /**
-   * checks if the container is open.
-   * @return - boolean
-   */
-  public synchronized  boolean isOpen() {
-    return ContainerLifeCycleState.OPEN == state;
-  }
-
-  /**
-   * checks if the container is invalid.
-   * @return - boolean
-   */
-  public boolean isValid() {
-    return !(ContainerLifeCycleState.INVALID == state);
-  }
-
-  /**
-   * checks if the container is closed.
-   * @return - boolean
-   */
-  public synchronized  boolean isClosed() {
-    return ContainerLifeCycleState.CLOSED == state;
-  }
-
-  /**
-   * Marks this container as closed.
-   */
-  public synchronized void closeContainer() {
-    // TODO: closed or closing here
-    setState(ContainerLifeCycleState.CLOSED);
-
-  }
-
-  public void setMaxSize(long maxSize) {
-    this.maxSize = maxSize;
-  }
-
-  public long getMaxSize() {
-    return maxSize;
-  }
-
-  public long getKeyCount() {
-    return metadata.size();
-  }
-
-  public void setBytesUsed(long used) {
-    this.bytesUsed.set(used);
-  }
-
-  /**
-   * Get the number of bytes used by the container.
-   * @return the number of bytes used by the container.
-   */
-  public long getBytesUsed() {
-    return bytesUsed.get();
-  }
-
-  /**
-   * Increase the number of bytes used by the container.
-   * @param used number of bytes used by the container.
-   * @return the current number of bytes used by the container after the increase.
-   */
-  public long incrBytesUsed(long used) {
-    return this.bytesUsed.addAndGet(used);
-  }
-
-
-  /**
-   * Decrease the number of bytes used by the container.
-   * @param reclaimed the number of bytes reclaimed from the container.
-   * @return the current number of bytes used by the container after decrease.
-   */
-  public long decrBytesUsed(long reclaimed) {
-    return this.bytesUsed.addAndGet(-1L * reclaimed);
-  }
-
-  /**
-   * Increase the count of pending deletion blocks.
-   *
-   * @param numBlocks increment number
-   */
-  public void incrPendingDeletionBlocks(int numBlocks) {
-    this.numPendingDeletionBlocks += numBlocks;
-  }
-
-  /**
-   * Decrease the count of pending deletion blocks.
-   *
-   * @param numBlocks decrement number
-   */
-  public void decrPendingDeletionBlocks(int numBlocks) {
-    this.numPendingDeletionBlocks -= numBlocks;
-  }
-
-  /**
-   * Get the number of pending deletion blocks.
-   */
-  public int getNumPendingDeletionBlocks() {
-    return this.numPendingDeletionBlocks;
-  }
-
-  /**
-   * Sets deleteTransactionId to latest delete transactionId for the container.
-   *
-   * @param transactionId latest transactionId of the container.
-   */
-  public void updateDeleteTransactionId(long transactionId) {
-    deleteTransactionId = max(transactionId, deleteTransactionId);
-  }
-
-  /**
-   * Return the latest deleteTransactionId of the container.
-   */
-  public long getDeleteTransactionId() {
-    return deleteTransactionId;
-  }
-
-  /**
-   * Get the number of bytes read from the container.
-   * @return the number of bytes read from the container.
-   */
-  public long getReadBytes() {
-    return readBytes.get();
-  }
-
-  /**
-   * Increase the number of bytes read from the container.
-   * @param bytes number of bytes read.
-   */
-  public void incrReadBytes(long bytes) {
-    this.readBytes.addAndGet(bytes);
-  }
-
-  /**
-   * Get the number of times the container is read.
-   * @return the number of times the container is read.
-   */
-  public long getReadCount() {
-    return readCount.get();
-  }
-
-  /**
-   * Increase the number of container read count by 1.
-   */
-  public void incrReadCount() {
-    this.readCount.incrementAndGet();
-  }
-
-  /**
-   * Get the number of bytes written into the container.
-   * @return the number of bytes written into the container.
-   */
-  public long getWriteBytes() {
-    return writeBytes.get();
-  }
-
-  /**
-   * Increase the number of bytes written into the container.
-   * @param bytes the number of bytes written into the container.
-   */
-  public void incrWriteBytes(long bytes) {
-    this.writeBytes.addAndGet(bytes);
-  }
-
-  /**
-   * Get the number of writes into the container.
-   * @return the number of writes into the container.
-   */
-  public long getWriteCount() {
-    return writeCount.get();
-  }
-
-  /**
-   * Increase the number of writes into the container by 1.
-   */
-  public void incrWriteCount() {
-    this.writeCount.incrementAndGet();
-  }
-
-
-}
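
The statistics fields above (readBytes, writeBytes, readCount, writeCount) are AtomicLong counters, so concurrent chunk operations can bump them without taking the container lock. A minimal sketch of that counter pattern:

    import java.util.concurrent.atomic.AtomicLong;

    public class ContainerStatsSketch {
      private final AtomicLong readBytes = new AtomicLong(0L);
      private final AtomicLong readCount = new AtomicLong(0L);

      // Record one read of the given size; safe to call from many threads.
      void recordRead(long bytes) {
        readBytes.addAndGet(bytes);
        readCount.incrementAndGet();
      }

      public static void main(String[] args) throws InterruptedException {
        ContainerStatsSketch stats = new ContainerStatsSketch();
        Thread a = new Thread(() -> stats.recordRead(1024));
        Thread b = new Thread(() -> stats.recordRead(2048));
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(stats.readBytes.get() + " bytes over "
            + stats.readCount.get() + " reads");
      }
    }

By contrast, numPendingDeletionBlocks above is a plain int; that is why the reimplemented ContainerData in this commit switches it to an AtomicInteger (see the hunk further below).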

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index b975217..18a5231 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.ozone.container.common.helpers;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -34,10 +32,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
-import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,14 +45,6 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 
 import static org.apache.commons.io.FilenameUtils.removeExtension;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CLOSED_CONTAINER_IO;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.INVALID_CONTAINER_STATE;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.SUCCESS;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNABLE_TO_FIND_DATA_DIR;
 
 /**
  * A set of helper functions to create proper responses.
@@ -203,30 +191,17 @@ public final class ContainerUtils {
    * @param containerFile - Container File to verify
    * @throws IOException
    */
-  public static void verifyIsNewContainer(File containerFile)
-      throws IOException {
-    Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
-    if (containerFile.exists()) {
-      log.error("container already exists on disk. File: {}",
-          containerFile.toPath());
+  public static void verifyIsNewContainer(File containerFile) throws
+      FileAlreadyExistsException {
+    Logger log = LoggerFactory.getLogger(ContainerSet.class);
+    Preconditions.checkNotNull(containerFile, "containerFile should not be " +
+        "null");
+    if (containerFile.getParentFile().exists()) {
+      log.error("Container already exists on disk. File: {}", containerFile
+          .toPath());
       throw new FileAlreadyExistsException("container already exists on " +
           "disk.");
     }
-
-    File parentPath = new File(containerFile.getParent());
-
-    if (!parentPath.exists() && !parentPath.mkdirs()) {
-      log.error("Unable to create parent path. Path: {}",
-          parentPath.toString());
-      throw new IOException("Unable to create container directory.");
-    }
-
-    if (!containerFile.createNewFile()) {
-      log.error("creation of a new container file failed. File: {}",
-          containerFile.toPath());
-      throw new IOException("creation of a new container file failed.");
-    }
-
   }
 
   public static String getContainerDbFileName(String containerName) {
@@ -234,53 +209,6 @@ public final class ContainerUtils {
   }
 
   /**
-   * creates a Metadata DB for the specified container.
-   *
-   * @param containerPath - Container Path.
-   * @throws IOException
-   */
-  public static Path createMetadata(Path containerPath, String containerName,
-      Configuration conf)
-      throws IOException {
-    Logger log = LoggerFactory.getLogger(ContainerManagerImpl.class);
-    Preconditions.checkNotNull(containerPath);
-    Path metadataPath = containerPath.resolve(OzoneConsts.CONTAINER_META_PATH);
-    if (!metadataPath.toFile().mkdirs()) {
-      log.error("Unable to create directory for metadata storage. Path: {}",
-          metadataPath);
-      throw new IOException("Unable to create directory for metadata storage." +
-          " Path: " + metadataPath);
-    }
-    MetadataStore store = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setCreateIfMissing(true)
-        .setDbFile(metadataPath
-            .resolve(getContainerDbFileName(containerName)).toFile())
-        .build();
-
-    // we close since the SCM pre-creates containers.
-    // we will open and put Db handle into a cache when keys are being created
-    // in a container.
-
-    store.close();
-
-    Path dataPath = containerPath.resolve(OzoneConsts.CONTAINER_DATA_PATH);
-    if (!dataPath.toFile().mkdirs()) {
-
-      // If we failed to create data directory, we cleanup the
-      // metadata directory completely. That is, we will delete the
-      // whole directory including LevelDB file.
-      log.error("Unable to create directory for data storage. cleaning up the" +
-              " container path: {} dataPath: {}",
-          containerPath, dataPath);
-      FileUtils.deleteDirectory(containerPath.toFile());
-      throw new IOException("Unable to create directory for data storage." +
-          " Path: " + dataPath);
-    }
-    return metadataPath;
-  }
-
-  /**
    * Returns container file location.
    *
    * @param containerData - Data
@@ -295,93 +223,6 @@ public final class ContainerUtils {
   }
 
   /**
-   * Container metadata directory -- here is where the level DB lives.
-   *
-   * @param cData - cData.
-   * @return Path to the parent directory where the DB lives.
-   */
-  public static Path getMetadataDirectory(ContainerData cData) {
-    Path dbPath = Paths.get(cData.getDBPath());
-    Preconditions.checkNotNull(dbPath);
-    Preconditions.checkState(dbPath.toString().length() > 0);
-    return dbPath.getParent();
-  }
-
-  /**
-   * Returns the path where data or chunks live for a given container.
-   *
-   * @param cData - cData container
-   * @return - Path
-   * @throws StorageContainerException
-   */
-  public static Path getDataDirectory(ContainerData cData)
-      throws StorageContainerException {
-    Path path = getMetadataDirectory(cData);
-    Preconditions.checkNotNull(path);
-    Path parentPath = path.getParent();
-    if (parentPath == null) {
-      throw new StorageContainerException("Unable to get Data directory."
-          + path, UNABLE_TO_FIND_DATA_DIR);
-    }
-    return parentPath.resolve(OzoneConsts.CONTAINER_DATA_PATH);
-  }
-
-  /**
-   * remove Container if it is empty.
-   * <p/>
-   * There are three things we need to delete.
-   * <p/>
-   * 1. The container file and metadata file. 2. The LevelDB file. 3. The path
-   * that we created at the data location.
-   *
-   * @param containerData - Data of the container to remove.
-   * @param conf - configuration of the cluster.
-   * @param forceDelete - whether this container should be deleted forcibly.
-   * @throws IOException
-   */
-  public static void removeContainer(ContainerData containerData,
-      Configuration conf, boolean forceDelete) throws IOException {
-    Preconditions.checkNotNull(containerData);
-    Path dbPath = Paths.get(containerData.getDBPath());
-
-    MetadataStore db = KeyUtils.getDB(containerData, conf);
-    // If the container is not empty and cannot be deleted forcibly,
-    // then throw a SCE to stop deleting.
-    if(!forceDelete && !db.isEmpty()) {
-      throw new StorageContainerException(
-          "Container cannot be deleted because it is not empty.",
-          Result.ERROR_CONTAINER_NOT_EMPTY);
-    }
-    // Close the DB connection and remove the DB handler from cache
-    KeyUtils.removeDB(containerData, conf);
-
-    // Delete the DB File.
-    FileUtils.forceDelete(dbPath.toFile());
-    dbPath = dbPath.getParent();
-
-    // Delete all metadata in the data directories for this container.
-    if (dbPath != null) {
-      FileUtils.deleteDirectory(dbPath.toFile());
-      dbPath = dbPath.getParent();
-    }
-
-    // now delete the container directory, this means that all key data dirs
-    // will be removed too.
-    if (dbPath != null) {
-      FileUtils.deleteDirectory(dbPath.toFile());
-    }
-
-    // Delete the container metadata from the metadata locations.
-    String rootPath = getContainerNameFromFile(new File(containerData
-        .getContainerPath()));
-    Path containerPath = Paths.get(rootPath.concat(CONTAINER_EXTENSION));
-
-
-    FileUtils.forceDelete(containerPath.toFile());
-
-  }
-
-  /**
   * Persist a {@link DatanodeDetails} to a local file.
    *
    * @throws IOException when read/write error occurs
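
After this rewrite, verifyIsNewContainer no longer creates the parent directory or the container file; it only rejects containers that already exist. Note that it now tests containerFile.getParentFile().exists(), which assumes each container descriptor lives in a directory created only for that container. A hedged usage sketch against the new signature, with a hypothetical descriptor path:

    import java.io.File;
    import org.apache.hadoop.fs.FileAlreadyExistsException;
    import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;

    public class VerifySketch {
      public static void main(String[] args) {
        // Hypothetical location of a container descriptor file.
        File containerFile = new File("/tmp/hdds/1/1.container");
        try {
          ContainerUtils.verifyIsNewContainer(containerFile);
          // Safe to go ahead and create the container on disk.
        } catch (FileAlreadyExistsException e) {
          System.err.println("Container already exists: " + e.getMessage());
        }
      }
    }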

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
deleted file mode 100644
index a710864..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/KeyUtils.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataStore;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.NO_SUCH_KEY;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNABLE_TO_READ_METADATA_DB;
-
-/**
- * Utils functions to help key functions.
- */
-public final class KeyUtils {
-  public static final String ENCODING_NAME = "UTF-8";
-  public static final Charset ENCODING = Charset.forName(ENCODING_NAME);
-
-  /**
-   * Never Constructed.
-   */
-  private KeyUtils() {
-  }
-
-  /**
-   * Get a DB handler for a given container.
-   * If the handler doesn't exist in cache yet, first create one and
-   * add into cache. This function is called with containerManager
-   * ReadLock held.
-   *
-   * @param container container.
-   * @param conf configuration.
-   * @return MetadataStore handle.
-   * @throws StorageContainerException
-   */
-  public static MetadataStore getDB(ContainerData container,
-      Configuration conf) throws StorageContainerException {
-    Preconditions.checkNotNull(container);
-    ContainerCache cache = ContainerCache.getInstance(conf);
-    Preconditions.checkNotNull(cache);
-    try {
-      return cache.getDB(container.getContainerID(), container
-          .getContainerDBType(), container.getDBPath());
-    } catch (IOException ex) {
-      String message =
-          String.format("Unable to open DB. DB Name: %s, Path: %s. ex: %s",
-          container.getContainerID(), container.getDBPath(), ex.getMessage());
-      throw new StorageContainerException(message, UNABLE_TO_READ_METADATA_DB);
-    }
-  }
-
-  /**
-   * Remove a DB handler from cache.
-   *
-   * @param container - Container data.
-   * @param conf - Configuration.
-   */
-  public static void removeDB(ContainerData container,
-      Configuration conf) {
-    Preconditions.checkNotNull(container);
-    ContainerCache cache = ContainerCache.getInstance(conf);
-    Preconditions.checkNotNull(cache);
-    cache.removeDB(container.getContainerID());
-  }
-  /**
-   * Shutdown all DB Handles.
-   *
-   * @param cache - Cache for DB Handles.
-   */
-  @SuppressWarnings("unchecked")
-  public static void shutdownCache(ContainerCache cache)  {
-    cache.shutdownCache();
-  }
-
-  /**
-   * Returns a successful key response.
-   * @param msg - Request.
-   * @return Response.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getKeyResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    return ContainerUtils.getSuccessResponse(msg);
-  }
-
-
-  public static ContainerProtos.ContainerCommandResponseProto
-      getKeyDataResponse(ContainerProtos.ContainerCommandRequestProto msg,
-      KeyData data) {
-    ContainerProtos.GetKeyResponseProto.Builder getKey = ContainerProtos
-        .GetKeyResponseProto.newBuilder();
-    getKey.setKeyData(data.getProtoBufMessage());
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getSuccessResponseBuilder(msg);
-    builder.setGetKey(getKey);
-    return  builder.build();
-  }
-
-  /**
-   * Parses the key name from a byte array.
-   * @param bytes key name in bytes.
-   * @return key name string.
-   */
-  public static String getKeyName(byte[] bytes) {
-    return new String(bytes, ENCODING);
-  }
-
-  /**
-   * Parses the {@link KeyData} from a byte array.
-   *
-   * @param bytes key data in bytes.
-   * @return key data.
-   * @throws IOException if the byte array is malformed or invalid.
-   */
-  public static KeyData getKeyData(byte[] bytes) throws IOException {
-    try {
-      ContainerProtos.KeyData kd = ContainerProtos.KeyData.parseFrom(bytes);
-      KeyData data = KeyData.getFromProtoBuf(kd);
-      return data;
-    } catch (IOException e) {
-      throw new StorageContainerException("Failed to parse key data from the" +
-              " bytes array.", NO_SUCH_KEY);
-    }
-  }
-}
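
The getDB/removeDB pair removed above hands out metadata DB handles through a process-wide ContainerCache instead of opening the store on every key operation. A simplified sketch of that handle-cache pattern, with a hypothetical Store interface standing in for MetadataStore:

    import java.util.HashMap;
    import java.util.Map;

    public class DbHandleCacheSketch {

      // Stand-in for MetadataStore; in the real code this wraps a
      // LevelDB or RocksDB instance.
      interface Store {
        void close();
      }

      private final Map<Long, Store> cache = new HashMap<>();

      // Return the cached handle for a container, opening one on first use.
      synchronized Store getDB(long containerID) {
        return cache.computeIfAbsent(containerID, id -> open(id));
      }

      // Close and drop the handle, e.g. when the container is deleted.
      synchronized void removeDB(long containerID) {
        Store store = cache.remove(containerID);
        if (store != null) {
          store.close();
        }
      }

      private Store open(long containerID) {
        return () -> { /* release the underlying file handles here */ };
      }
    }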

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
deleted file mode 100644
index fa82026..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNSUPPORTED_REQUEST;
-
-/**
- * An implementation of ChunkManager that is used by default in ozone.
- */
-public class ChunkManagerImpl implements ChunkManager {
-  static final Logger LOG =
-      LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-  private final ContainerManager containerManager;
-
-  /**
-   * Constructs a ChunkManager.
-   *
-   * @param manager - ContainerManager.
-   */
-  public ChunkManagerImpl(ContainerManager manager) {
-    this.containerManager = manager;
-  }
-
-  /**
-   * writes a given chunk.
-   *
-   * @param blockID - ID of the block.
-   * @param info - ChunkInfo.
-   * @throws StorageContainerException
-   */
-  @Override
-  public void writeChunk(BlockID blockID, ChunkInfo info,
-      byte[] data, ContainerProtos.Stage stage)
-      throws StorageContainerException {
-    // we don't want container manager to go away while we are writing chunks.
-    containerManager.readLock();
-
-    // TODO : Take keyManager Write lock here.
-    try {
-      Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
-      long containerID = blockID.getContainerID();
-      Preconditions.checkState(containerID >= 0,
-          "Container ID cannot be negative");
-      ContainerData container =
-          containerManager.readContainer(containerID);
-      File chunkFile = ChunkUtils.validateChunk(container, info);
-      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
-
-      LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file",
-          info.getChunkName(), stage, chunkFile, tmpChunkFile);
-      switch (stage) {
-      case WRITE_DATA:
-        ChunkUtils.writeData(tmpChunkFile, info, data);
-        break;
-      case COMMIT_DATA:
-        commitChunk(tmpChunkFile, chunkFile, containerID, info.getLen());
-        break;
-      case COMBINED:
-        // directly write to the chunk file
-        long oldSize = chunkFile.length();
-        ChunkUtils.writeData(chunkFile, info, data);
-        long newSize = chunkFile.length();
-        containerManager.incrBytesUsed(containerID, newSize - oldSize);
-        containerManager.incrWriteCount(containerID);
-        containerManager.incrWriteBytes(containerID, info.getLen());
-        break;
-      default:
-        throw new IOException("Can not identify write operation.");
-      }
-    } catch (ExecutionException | NoSuchAlgorithmException | IOException e) {
-      LOG.error("write data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ", e,
-          CONTAINER_INTERNAL_ERROR);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.error("write data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ", e,
-          CONTAINER_INTERNAL_ERROR);
-    } finally {
-      containerManager.readUnlock();
-    }
-  }
-
-  // Create a temporary file in the same container directory
-  // in the format "<chunkname>.tmp"
-  private static File getTmpChunkFile(File chunkFile, ChunkInfo info)
-      throws StorageContainerException {
-    return new File(chunkFile.getParent(),
-        chunkFile.getName() +
-            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
-            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
-  }
-
-  // Commit the chunk by renaming the temporary chunk file to chunk file
-  private void commitChunk(File tmpChunkFile, File chunkFile,
-      long containerID, long chunkLen) throws IOException {
-    long sizeDiff = tmpChunkFile.length() - chunkFile.length();
-    // It is safe to replace here as the earlier chunk if existing should be
-    // caught as part of validateChunk
-    Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
-        StandardCopyOption.REPLACE_EXISTING);
-    containerManager.incrBytesUsed(containerID, sizeDiff);
-    containerManager.incrWriteCount(containerID);
-    containerManager.incrWriteBytes(containerID, chunkLen);
-  }
-
-  /**
-   * reads the data defined by a chunk.
-   *
-   * @param blockID - ID of the block.
-   * @param info - ChunkInfo.
-   * @return byte array
-   * @throws StorageContainerException
-   * TODO: Right now we do not support partial reads and writes of chunks.
-   * TODO: Explore if we need to do that for ozone.
-   */
-  @Override
-  public byte[] readChunk(BlockID blockID, ChunkInfo info)
-      throws StorageContainerException {
-    containerManager.readLock();
-    try {
-      Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
-      long containerID = blockID.getContainerID();
-      Preconditions.checkState(containerID >= 0,
-          "Container ID cannot be negative");
-      ContainerData container =
-          containerManager.readContainer(containerID);
-      File chunkFile = ChunkUtils.getChunkFile(container, info);
-      ByteBuffer data =  ChunkUtils.readData(chunkFile, info);
-      containerManager.incrReadCount(containerID);
-      containerManager.incrReadBytes(containerID, chunkFile.length());
-      return data.array();
-    } catch (ExecutionException | NoSuchAlgorithmException e) {
-      LOG.error("read data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ",
-          e, CONTAINER_INTERNAL_ERROR);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.error("read data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ",
-          e, CONTAINER_INTERNAL_ERROR);
-    } finally {
-      containerManager.readUnlock();
-    }
-  }
-
-  /**
-   * Deletes a given chunk.
-   *
-   * @param blockID - ID of the block.
-   * @param info - Chunk Info
-   * @throws StorageContainerException
-   */
-  @Override
-  public void deleteChunk(BlockID blockID, ChunkInfo info)
-      throws StorageContainerException {
-    containerManager.readLock();
-    try {
-      Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
-      long containerID = blockID.getContainerID();
-      Preconditions.checkState(containerID >= 0,
-          "Container ID cannot be negative");
-
-      File chunkFile = ChunkUtils.getChunkFile(containerManager
-          .readContainer(containerID), info);
-      if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
-        FileUtil.fullyDelete(chunkFile);
-        containerManager.decrBytesUsed(containerID, chunkFile.length());
-      } else {
-        LOG.error("Not Supported Operation. Trying to delete a " +
-            "chunk that is in shared file. chunk info : " + info.toString());
-        throw new StorageContainerException("Not Supported Operation. " +
-            "Trying to delete a chunk that is in shared file. chunk info : "
-            + info.toString(), UNSUPPORTED_REQUEST);
-      }
-    } finally {
-      containerManager.readUnlock();
-    }
-  }
-
-  /**
-   * Shutdown the chunkManager.
-   *
-   * In the chunkManager we haven't acquired any resources, so nothing to do
-   * here. This call is made with containerManager Writelock held.
-   */
-  @Override
-  public void shutdown() {
-    Preconditions.checkState(this.containerManager.hasWriteLock());
-  }
-}
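
The ChunkManagerImpl removed above never writes a chunk in place: writeChunk stages the bytes in a temporary sibling file produced by getTmpChunkFile, and commitChunk promotes it with Files.move(..., REPLACE_EXISTING), so a crash mid-write cannot expose a truncated chunk under its final name. A minimal standalone sketch of that stage-then-rename pattern (the AtomicChunkWrite class and writeChunkAtomically method are illustrative names, not part of this patch):

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;
    import java.nio.file.StandardOpenOption;

    public final class AtomicChunkWrite {

      // Stage the data in a "<chunkname>.tmp" sibling, then promote it.
      public static void writeChunkAtomically(File chunkFile, ByteBuffer data)
          throws IOException {
        File tmp = new File(chunkFile.getParent(), chunkFile.getName() + ".tmp");
        try (FileChannel channel = FileChannel.open(tmp.toPath(),
            StandardOpenOption.CREATE, StandardOpenOption.WRITE,
            StandardOpenOption.TRUNCATE_EXISTING)) {
          while (data.hasRemaining()) {
            channel.write(data);
          }
          channel.force(true); // flush to disk before the rename
        }
        // Same-directory move: effectively an atomic rename on typical local
        // filesystems. REPLACE_EXISTING mirrors commitChunk(), where an
        // earlier chunk, if present, was already caught by validation.
        Files.move(tmp.toPath(), chunkFile.toPath(),
            StandardCopyOption.REPLACE_EXISTING);
      }
    }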

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 238fb09..5638b60 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.container.common.impl;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
     ContainerType;
@@ -28,6 +29,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -41,7 +43,10 @@ public class ContainerData {
   private final ContainerType containerType;
 
   // Unique identifier for the container
-  private final long containerId;
+  private final long containerID;
+
+  // Path to container root dir.
+  private String containerPath;
 
   // Layout version of the container data
   private final int layOutVersion;
@@ -65,6 +70,10 @@ public class ContainerData {
 
   private HddsVolume volume;
 
+  /**
+   * Number of pending deletion blocks in container.
+   */
+  private final AtomicInteger numPendingDeletionBlocks;
 
   /**
    * Creates a ContainerData Object, which holds metadata of the container.
@@ -73,18 +82,8 @@ public class ContainerData {
    * @param size - container maximum size
    */
   public ContainerData(ContainerType type, long containerId, int size) {
-    this.containerType = type;
-    this.containerId = containerId;
-    this.layOutVersion = ChunkLayOutVersion.getLatestVersion().getVersion();
-    this.metadata = new TreeMap<>();
-    this.state = ContainerLifeCycleState.OPEN;
-    this.readCount = new AtomicLong(0L);
-    this.readBytes =  new AtomicLong(0L);
-    this.writeCount =  new AtomicLong(0L);
-    this.writeBytes =  new AtomicLong(0L);
-    this.bytesUsed = new AtomicLong(0L);
-    this.keyCount = new AtomicLong(0L);
-    this.maxSizeGB = size;
+    this(type, containerId,
+        ChunkLayOutVersion.getLatestVersion().getVersion(), size);
   }
 
   /**
@@ -94,10 +93,12 @@ public class ContainerData {
    * @param layOutVersion - Container layOutVersion
    * @param size - Container maximum size
    */
-  public ContainerData(ContainerType type, long containerId, int
-      layOutVersion, int size) {
+  public ContainerData(ContainerType type, long containerId,
+    int layOutVersion, int size) {
+    Preconditions.checkNotNull(type);
+
     this.containerType = type;
-    this.containerId = containerId;
+    this.containerID = containerId;
     this.layOutVersion = layOutVersion;
     this.metadata = new TreeMap<>();
     this.state = ContainerLifeCycleState.OPEN;
@@ -108,13 +109,30 @@ public class ContainerData {
     this.bytesUsed = new AtomicLong(0L);
     this.keyCount = new AtomicLong(0L);
     this.maxSizeGB = size;
+    this.numPendingDeletionBlocks = new AtomicInteger(0);
   }
 
   /**
-   * Returns the containerId.
+   * Returns the containerID.
    */
-  public long getContainerId() {
-    return containerId;
+  public long getContainerID() {
+    return containerID;
+  }
+
+  /**
+   * Returns the path to base dir of the container.
+   * @return Path to base dir.
+   */
+  public String getContainerPath() {
+    return containerPath;
+  }
+
+  /**
+   * Set the base dir path of the container.
+   * @param baseDir path to base dir
+   */
+  public void setContainerPath(String baseDir) {
+    this.containerPath = baseDir;
   }
 
   /**
@@ -163,9 +181,6 @@ public class ContainerData {
    */
   public void addMetadata(String key, String value) throws IOException {
     synchronized (this.metadata) {
-      if (this.metadata.containsKey(key)) {
-        throw new IOException("This key already exists. Key " + key);
-      }
       metadata.put(key, value);
     }
   }
@@ -299,7 +314,6 @@ public class ContainerData {
     return this.bytesUsed.addAndGet(used);
   }
 
-
   /**
    * Decrease the number of bytes used by the container.
    * @param reclaimed the number of bytes reclaimed from the container.
@@ -356,4 +370,75 @@ public class ContainerData {
     this.keyCount.set(count);
   }
 
+  /**
+   * Returns container metadata path.
+   */
+  public String getMetadataPath() {
+    return null;
+  }
+
+  /**
+   * Returns container data path.
+   */
+  public String getDataPath() {
+    return null;
+  }
+
+  /**
+   * Increase the count of pending deletion blocks.
+   *
+   * @param numBlocks increment number
+   */
+  public void incrPendingDeletionBlocks(int numBlocks) {
+    this.numPendingDeletionBlocks.addAndGet(numBlocks);
+  }
+
+  /**
+   * Decrease the count of pending deletion blocks.
+   *
+   * @param numBlocks decrement number
+   */
+  public void decrPendingDeletionBlocks(int numBlocks) {
+    this.numPendingDeletionBlocks.addAndGet(-1 * numBlocks);
+  }
+
+  /**
+   * Get the number of pending deletion blocks.
+   */
+  public int getNumPendingDeletionBlocks() {
+    return this.numPendingDeletionBlocks.get();
+  }
+
+  /**
+   * Returns a ProtoBuf Message from ContainerData.
+   *
+   * @return Protocol Buffer Message
+   */
+  public ContainerProtos.ContainerData getProtoBufMessage() {
+    ContainerProtos.ContainerData.Builder builder =
+        ContainerProtos.ContainerData.newBuilder();
+
+    builder.setContainerID(this.getContainerID());
+
+    if (this.containerPath != null) {
+      builder.setContainerPath(this.containerPath);
+    }
+
+    builder.setState(this.getState());
+
+    for (Map.Entry<String, String> entry : metadata.entrySet()) {
+      ContainerProtos.KeyValue.Builder keyValBuilder =
+          ContainerProtos.KeyValue.newBuilder();
+      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
+          .setValue(entry.getValue()).build());
+    }
+
+    if (this.getBytesUsed() >= 0) {
+      builder.setBytesUsed(this.getBytesUsed());
+    }
+
+    builder.setContainerType(containerType);
+
+    return builder.build();
+  }
 }
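
The reworked ContainerData above tracks numPendingDeletionBlocks in an AtomicInteger, so incrPendingDeletionBlocks and decrPendingDeletionBlocks can be called concurrently from the delete path and the background purge without holding a container lock. A small self-contained demo of why the atomic counter never loses updates under that interleaving (the demo class is illustrative only, not part of this patch):

    import java.util.concurrent.atomic.AtomicInteger;

    public final class PendingDeletionDemo {
      public static void main(String[] args) throws InterruptedException {
        AtomicInteger pending = new AtomicInteger(0);

        // Simulate delete transactions arriving while a purge thread drains.
        Thread producer = new Thread(() -> {
          for (int i = 0; i < 1000; i++) {
            pending.addAndGet(5);   // five blocks queued per transaction
          }
        });
        Thread consumer = new Thread(() -> {
          for (int i = 0; i < 1000; i++) {
            pending.addAndGet(-5);  // five blocks physically removed
          }
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();

        // addAndGet is atomic, so interleaved updates never lose a count;
        // a plain int incremented from two threads could drift.
        System.out.println("pending = " + pending.get()); // always 0
      }
    }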

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
index 4f4d588..70d1615 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.ozone.container.common.impl;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.yaml.snakeyaml.Yaml;
 
@@ -48,7 +50,8 @@ import org.yaml.snakeyaml.nodes.ScalarNode;
 import org.yaml.snakeyaml.nodes.Tag;
 import org.yaml.snakeyaml.representer.Representer;
 
-import static org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData.YAML_TAG;
+import static org.apache.hadoop.ozone.container.keyvalue
+    .KeyValueContainerData.KEYVALUE_YAML_TAG;
 
 /**
  * Class for creating and reading .container files.
@@ -84,7 +87,7 @@ public final class ContainerDataYaml {
       Representer representer = new ContainerDataRepresenter();
       representer.setPropertyUtils(propertyUtils);
       representer.addClassTag(KeyValueContainerData.class,
-          KeyValueContainerData.YAML_TAG);
+          KeyValueContainerData.KEYVALUE_YAML_TAG);
 
       Constructor keyValueDataConstructor = new ContainerDataConstructor();
 
@@ -171,7 +174,8 @@ public final class ContainerDataYaml {
       //Adding our own specific constructors for tags.
       // When a new Container type is added, we need to add yamlConstructor
       // for that
-      this.yamlConstructors.put(YAML_TAG, new ConstructKeyValueContainerData());
+      this.yamlConstructors.put(
+          KEYVALUE_YAML_TAG, new ConstructKeyValueContainerData());
       this.yamlConstructors.put(Tag.INT, new ConstructLong());
     }
 
@@ -181,20 +185,21 @@ public final class ContainerDataYaml {
         Map<Object, Object> nodes = constructMapping(mnode);
 
         //Needed this, as TAG.INT type is by default converted to Long.
-        long layOutVersion = (long) nodes.get("layOutVersion");
+        long layOutVersion = (long) nodes.get(OzoneConsts.LAYOUTVERSION);
         int lv = (int) layOutVersion;
 
-        long size = (long) nodes.get("maxSizeGB");
+        long size = (long) nodes.get(OzoneConsts.MAX_SIZE_GB);
         int maxSize = (int) size;
 
         //When a new field is added, it needs to be added here.
         KeyValueContainerData kvData = new KeyValueContainerData((long) nodes
-            .get("containerId"), lv, maxSize);
-        kvData.setContainerDBType((String)nodes.get("containerDBType"));
+            .get(OzoneConsts.CONTAINER_ID), lv, maxSize);
+        kvData.setContainerDBType((String)nodes.get(
+            OzoneConsts.CONTAINER_DB_TYPE));
         kvData.setMetadataPath((String) nodes.get(
-            "metadataPath"));
-        kvData.setChunksPath((String) nodes.get("chunksPath"));
-        Map<String, String> meta = (Map) nodes.get("metadata");
+            OzoneConsts.METADATA_PATH));
+        kvData.setChunksPath((String) nodes.get(OzoneConsts.CHUNKS_PATH));
+        Map<String, String> meta = (Map) nodes.get(OzoneConsts.METADATA);
         meta.forEach((key, val) -> {
           try {
             kvData.addMetadata(key, val);
@@ -204,7 +209,7 @@ public final class ContainerDataYaml {
                 "for containerId " + (long) nodes.get("containerId"));
           }
         });
-        String state = (String) nodes.get("state");
+        String state = (String) nodes.get(OzoneConsts.STATE);
         switch (state) {
         case "OPEN":
           kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
@@ -218,7 +223,7 @@ public final class ContainerDataYaml {
         default:
           throw new IllegalStateException("Unexpected " +
               "ContainerLifeCycleState " + state + " for the containerId " +
-              (long) nodes.get("containerId"));
+              (long) nodes.get(OzoneConsts.CONTAINER_ID));
         }
         return kvData;
       }
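
The ContainerDataYaml changes replace bare key strings such as "layOutVersion" and "maxSizeGB" with OzoneConsts constants, so the .container writer and the custom YAML constructor cannot drift apart on field names. A minimal SnakeYAML sketch of that shared-constants idea (the demo class and its local constants are stand-ins; the real keys live in OzoneConsts):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.yaml.snakeyaml.Yaml;

    public final class ContainerYamlKeysDemo {
      // Shared key names; stand-ins for the OzoneConsts constants.
      static final String CONTAINER_ID = "containerId";
      static final String LAYOUTVERSION = "layOutVersion";
      static final String MAX_SIZE_GB = "maxSizeGB";

      public static void main(String[] args) {
        Yaml yaml = new Yaml();

        // Writer side: emit fields under the shared key names.
        Map<String, Object> out = new LinkedHashMap<>();
        out.put(CONTAINER_ID, 1L);
        out.put(LAYOUTVERSION, 1);
        out.put(MAX_SIZE_GB, 5);
        String text = yaml.dump(out);

        // Reader side: look fields up under the same constants, so a
        // renamed key breaks at compile time instead of yielding a
        // silent null. Going through Number also absorbs SnakeYAML's
        // Integer-vs-Long choice for scalar ints.
        Map<String, Object> in = yaml.load(text);
        long containerId = ((Number) in.get(CONTAINER_ID)).longValue();
        int layoutVersion = ((Number) in.get(LAYOUTVERSION)).intValue();
        int maxSizeGB = ((Number) in.get(MAX_SIZE_GB)).intValue();
        System.out.printf("id=%d lv=%d max=%dGB%n",
            containerId, layoutVersion, maxSizeGB);
      }
    }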

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java
deleted file mode 100644
index 5f5b81f..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.interfaces
-    .ContainerLocationManager;
-import org.apache.hadoop.ozone.container.common.interfaces
-    .ContainerLocationManagerMXBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * A class that tells the ContainerManager where to place the containers.
- * Please note : There is *no* one-to-one correlation between metadata
- * Locations and data Locations.
- *
- *  For example : A user could map all container files to a
- *  SSD but leave data/metadata on bunch of other disks.
- */
-public class ContainerLocationManagerImpl implements ContainerLocationManager,
-    ContainerLocationManagerMXBean {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerLocationManagerImpl.class);
-
-  private final List<ContainerStorageLocation> dataLocations;
-  private int currentIndex;
-  private final List<StorageLocation> metadataLocations;
-  private final ObjectName jmxbean;
-
-  /**
-   * Constructs a Location Manager.
-   * @param metadataLocations  - Refers to the metadataLocations
-   * where we store the container metadata.
-   * @param dataDirs - metadataLocations where we store the actual
-   * data or chunk files.
-   * @param conf - configuration.
-   * @throws IOException
-   */
-  public ContainerLocationManagerImpl(List<StorageLocation> metadataLocations,
-      List<StorageLocation> dataDirs, Configuration conf)
-      throws IOException {
-    dataLocations = new LinkedList<>();
-    for (StorageLocation dataDir : dataDirs) {
-      dataLocations.add(new ContainerStorageLocation(dataDir, conf));
-    }
-    this.metadataLocations = metadataLocations;
-    jmxbean = MBeans.register("OzoneDataNode",
-        ContainerLocationManager.class.getSimpleName(), this);
-  }
-
-  /**
-   * Returns the path where the container should be placed from a set of
-   * metadataLocations.
-   *
-   * @return A path where we should place this container and metadata.
-   * @throws IOException
-   */
-  @Override
-  public Path getContainerPath()
-      throws IOException {
-    Preconditions.checkState(metadataLocations.size() > 0);
-    int index = currentIndex % metadataLocations.size();
-    return Paths.get(metadataLocations.get(index).getNormalizedUri());
-  }
-
-  /**
-   * Returns the path where the container Data file are stored.
-   *
-   * @return  a path where we place the LevelDB and data files of a container.
-   * @throws IOException
-   */
-  @Override
-  public Path getDataPath(String containerName) throws IOException {
-    Path currentPath = Paths.get(
-        dataLocations.get(currentIndex++ % dataLocations.size())
-            .getNormalizedUri());
-    currentPath = currentPath.resolve(OzoneConsts.CONTAINER_PREFIX);
-    return currentPath.resolve(containerName);
-  }
-
-  @Override
-  public StorageLocationReport[] getLocationReport() throws IOException {
-    boolean failed;
-    StorageLocationReport[] reports =
-        new StorageLocationReport[dataLocations.size()];
-    for (int idx = 0; idx < dataLocations.size(); idx++) {
-      ContainerStorageLocation loc = dataLocations.get(idx);
-      long scmUsed = 0;
-      long remaining = 0;
-      failed = false;
-      try {
-        scmUsed = loc.getScmUsed();
-        remaining = loc.getAvailable();
-      } catch (IOException ex) {
-        LOG.warn("Failed to get scmUsed and remaining for container " +
-            "storage location {}", loc.getNormalizedUri());
-        // reset scmUsed and remaining if df/du failed.
-        scmUsed = 0;
-        remaining = 0;
-        failed = true;
-      }
-
-      StorageLocationReport.Builder builder =
-          StorageLocationReport.newBuilder();
-      builder.setStorageLocation(loc.getStorageLocation())
-          .setId(loc.getStorageUuId())
-          .setFailed(failed)
-          .setCapacity(loc.getCapacity())
-          .setRemaining(remaining)
-          .setScmUsed(scmUsed)
-          .setStorageType(loc.getStorageType());
-      StorageLocationReport r = builder.build();
-      reports[idx] = r;
-    }
-    return reports;
-  }
-
-  /**
-   * Supports clean shutdown of container location du threads.
-   *
-   * @throws IOException
-   */
-  @Override
-  public void shutdown() throws IOException {
-    for (ContainerStorageLocation loc: dataLocations) {
-      loc.shutdown();
-    }
-    MBeans.unregister(jmxbean);
-  }
-}
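
The deleted ContainerLocationManagerImpl spread containers across disks by advancing a single currentIndex modulo the number of configured locations, i.e. plain round-robin placement. A minimal generic sketch of that selection loop (this version adds an AtomicInteger for thread safety, which the deleted class, called under the container manager's lock, did not need):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public final class RoundRobinChooser<T> {
      private final AtomicInteger nextIndex = new AtomicInteger(0);

      // Cycle through the candidates so consecutive calls land on
      // consecutive locations, spreading new containers across disks.
      public T choose(List<T> candidates) {
        if (candidates.isEmpty()) {
          throw new IllegalArgumentException("no locations configured");
        }
        // floorMod keeps the index valid even after integer overflow.
        int idx = Math.floorMod(nextIndex.getAndIncrement(), candidates.size());
        return candidates.get(idx);
      }

      public static void main(String[] args) {
        RoundRobinChooser<String> chooser = new RoundRobinChooser<>();
        List<String> disks = Arrays.asList("/data1", "/data2", "/data3");
        for (int i = 0; i < 5; i++) {
          // Prints /data1, /data2, /data3, /data1, /data2
          System.out.println(chooser.choose(disks));
        }
      }
    }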

