Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:33:38 UTC

[29/37] hadoop git commit: HDDS-182: Clean up reimplemented classes. Contributed by Hanisha Koneru

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
deleted file mode 100644
index 02572a8..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ /dev/null
@@ -1,1115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerLifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
-import org.apache.hadoop.ozone.container.common.interfaces
-    .ContainerDeletionChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.interfaces
-    .ContainerLocationManager;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.security.DigestInputStream;
-import java.security.DigestOutputStream;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_EXISTS;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CONTAINER_NOT_FOUND;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.ERROR_IN_COMPACT_DB;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.INVALID_CONFIG;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.IO_EXCEPTION;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.NO_SUCH_ALGORITHM;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNABLE_TO_READ_METADATA_DB;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNCLOSED_CONTAINER_IO;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.UNSUPPORTED_REQUEST;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    Result.INVALID_CONTAINER_STATE;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
-
-/**
- * A generic ContainerManagerImpl that will be called from the Ozone
- * ContainerManagerImpl. This allows us to support delta changes to the
- * Ozone version without having to rewrite the ContainerManager.
- */
-public class ContainerManagerImpl implements ContainerManager {
-  static final Logger LOG =
-      LoggerFactory.getLogger(ContainerManagerImpl.class);
-
-  // TODO: consider primitive collection like eclipse-collections
-  // to avoid autoboxing overhead
-  private final ConcurrentSkipListMap<Long, ContainerData>
-      containerMap = new ConcurrentSkipListMap<>();
-
-  // Use a non-fair RW lock for better throughput, we may revisit this decision
-  // if this causes fairness issues.
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private ContainerLocationManager locationManager;
-  private ChunkManager chunkManager;
-  private KeyManager keyManager;
-  private Configuration conf;
-
-  private ContainerDeletionChoosingPolicy containerDeletionChooser;
-
-  /**
-   * Init call that sets up a container Manager.
-   *
-   * @param config - Configuration.
-   * @param containerDirs - List of Metadata Container locations.
-   * @param dnDetails - DatanodeDetails.
-   * @throws IOException
-   */
-  @Override
-  public void init(
-      Configuration config, List<StorageLocation> containerDirs,
-      DatanodeDetails dnDetails) throws IOException {
-    Preconditions.checkNotNull(config, "Config must not be null");
-    Preconditions.checkNotNull(containerDirs, "Container directories cannot " +
-        "be null");
-    Preconditions.checkNotNull(dnDetails, "Datanode Details cannot " +
-        "be null");
-
-    Preconditions.checkState(containerDirs.size() > 0, "Number of container" +
-        " directories must be greater than zero.");
-
-    this.conf = config;
-
-    readLock();
-    try {
-      containerDeletionChooser = ReflectionUtils.newInstance(conf.getClass(
-          ScmConfigKeys.OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY,
-          TopNOrderedContainerDeletionChoosingPolicy.class,
-          ContainerDeletionChoosingPolicy.class), conf);
-
-      for (StorageLocation path : containerDirs) {
-        File directory = Paths.get(path.getNormalizedUri()).toFile();
-        if (!directory.exists() && !directory.mkdirs()) {
-          LOG.error("Container metadata directory doesn't exist "
-              + "and cannot be created. Path: {}", path.toString());
-          throw new StorageContainerException("Container metadata "
-              + "directory doesn't exist and cannot be created " + path
-              .toString(), INVALID_CONFIG);
-        }
-
-        // TODO: This will fail if any directory is invalid.
-        // We should fix this to handle invalid directories and continue.
-        // Leaving it this way to fail fast for time being.
-        if (!directory.isDirectory()) {
-          LOG.error("Invalid path to container metadata directory. path: {}",
-              path.toString());
-          throw new StorageContainerException("Invalid path to container " +
-              "metadata directory." + path, INVALID_CONFIG);
-        }
-        LOG.info("Loading containers under {}", path);
-        File[] files = directory.listFiles(new ContainerFilter());
-        if (files != null) {
-          for (File containerFile : files) {
-            LOG.debug("Loading container {}", containerFile);
-            String containerPath =
-                ContainerUtils.getContainerNameFromFile(containerFile);
-            Preconditions.checkNotNull(containerPath, "Container path cannot" +
-                " be null");
-            readContainerInfo(containerPath);
-          }
-        }
-      }
-
-      List<StorageLocation> dataDirs = new LinkedList<>();
-      for (String dir : config.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
-        StorageLocation location = StorageLocation.parse(dir);
-        dataDirs.add(location);
-      }
-      this.locationManager =
-          new ContainerLocationManagerImpl(containerDirs, dataDirs, config);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Reads the Container Info from a file and verifies that checksum match. If
-   * the checksums match, then that file is added to containerMap.
-   *
-   * @param containerName - Name which points to the persisted container.
-   * @throws StorageContainerException
-   */
-  private void readContainerInfo(String containerName)
-      throws StorageContainerException {
-    Preconditions.checkState(containerName.length() > 0,
-        "Container name length cannot be zero.");
-    FileInputStream containerStream = null;
-    DigestInputStream dis = null;
-    FileInputStream metaStream = null;
-    Path cPath = Paths.get(containerName).getFileName();
-    String keyName = null;
-    if (cPath != null) {
-      keyName = cPath.toString();
-    }
-    Preconditions.checkNotNull(keyName,
-        "Container Name  to container key mapping is null");
-
-    long containerID = Long.parseLong(keyName);
-    try {
-      String containerFileName = containerName.concat(CONTAINER_EXTENSION);
-
-      containerStream = new FileInputStream(containerFileName);
-
-      ContainerProtos.ContainerData containerDataProto =
-          ContainerProtos.ContainerData.parseDelimitedFrom(containerStream);
-      ContainerData containerData;
-      if (containerDataProto == null) {
-        // Sometimes container metadata has been created but left empty;
-        // loading it then yields null. This often means SCM stopped at
-        // some intermediate phase last time, so the metadata was never
-        // populated. Such containers are marked as inactive.
-        ContainerData cData = new ContainerData(containerID, conf,
-            ContainerLifeCycleState.INVALID);
-        containerMap.put(containerID, cData);
-        return;
-      }
-      containerData = ContainerData.getFromProtBuf(containerDataProto, conf);
-
-      // Initialize pending deletion blocks and deleted blocks count in
-      // in-memory containerData.
-      MetadataStore metadata = KeyUtils.getDB(containerData, conf);
-      List<Map.Entry<byte[], byte[]>> underDeletionBlocks = metadata
-          .getSequentialRangeKVs(null, Integer.MAX_VALUE,
-              MetadataKeyFilters.getDeletingKeyFilter());
-      byte[] transactionID = metadata.get(DFSUtil.string2Bytes(
-          OzoneConsts.DELETE_TRANSACTION_KEY_PREFIX + containerID));
-      if (transactionID != null) {
-        containerData
-            .updateDeleteTransactionId(Longs.fromByteArray(transactionID));
-      }
-      containerData.incrPendingDeletionBlocks(underDeletionBlocks.size());
-
-      List<Map.Entry<byte[], byte[]>> liveKeys = metadata
-          .getRangeKVs(null, Integer.MAX_VALUE,
-              MetadataKeyFilters.getNormalKeyFilter());
-
-      // Get container bytesUsed upon loading container
-      // The in-memory state is updated upon key write or delete
-      // TODO: update containerDataProto and persist it into container MetaFile
-      long bytesUsed = liveKeys.parallelStream().mapToLong(e -> {
-        KeyData keyData;
-        try {
-          keyData = KeyUtils.getKeyData(e.getValue());
-          return keyData.getSize();
-        } catch (IOException ex) {
-          return 0L;
-        }
-      }).sum();
-      containerData.setBytesUsed(bytesUsed);
-
-      containerMap.put(containerID, containerData);
-    } catch (IOException ex) {
-      LOG.error("read failed for file: {} ex: {}", containerName,
-          ex.getMessage());
-
-      // TODO : Add this file to a recovery Queue.
-
-      // Remember that this container is busted and we cannot use it.
-      ContainerData cData = new ContainerData(containerID, conf,
-          ContainerLifeCycleState.INVALID);
-      containerMap.put(containerID, cData);
-      throw new StorageContainerException("Unable to read container info",
-          UNABLE_TO_READ_METADATA_DB);
-    } finally {
-      IOUtils.closeStream(dis);
-      IOUtils.closeStream(containerStream);
-      IOUtils.closeStream(metaStream);
-    }
-  }
-
-  /**
-   * Creates a container with the given name.
-   *
-   * @param containerData - Container Name and metadata.
-   * @throws StorageContainerException - Exception
-   */
-  @Override
-  public void createContainer(ContainerData containerData)
-      throws StorageContainerException {
-    Preconditions.checkNotNull(containerData, "Container data cannot be null");
-    writeLock();
-    try {
-      if (containerMap.containsKey(containerData.getContainerID())) {
-        LOG.debug("container already exists. {}",
-            containerData.getContainerID());
-        throw new StorageContainerException("container already exists.",
-            CONTAINER_EXISTS);
-      }
-
-      // This is by design. We first write and close the
-      // container Info and metadata to a directory.
-      // Then read back and put that info into the containerMap.
-      // This allows us to make sure that our write is consistent.
-
-      writeContainerInfo(containerData, false);
-      File cFile = new File(containerData.getContainerPath());
-      readContainerInfo(ContainerUtils.getContainerNameFromFile(cFile));
-    } catch (NoSuchAlgorithmException ex) {
-      LOG.error("Internal error: We seem to be running a JVM without a " +
-          "needed hash algorithm.");
-      throw new StorageContainerException("failed to create container",
-          NO_SUCH_ALGORITHM);
-    } finally {
-      writeUnlock();
-    }
-
-  }
-
-  /**
-   * Writes a container to a chosen location and updates the container Map.
-   *
-   * The file formats of ContainerData and Container Meta are the following.
-   *
-   * message ContainerData {
-   * required string name = 1;
-   * repeated KeyValue metadata = 2;
-   * optional string dbPath = 3;
-   * optional string containerPath = 4;
-   * optional int64 bytesUsed = 5;
-   * optional int64 size = 6;
-   * }
-   *
-   * message ContainerMeta {
-   * required string fileName = 1;
-   * required string hash = 2;
-   * }
-   *
-   * @param containerData - container Data
-   * @param overwrite - Whether we are overwriting.
-   * @throws StorageContainerException
-   * @throws NoSuchAlgorithmException
-   */
-  private void writeContainerInfo(ContainerData containerData,
-      boolean overwrite)
-      throws StorageContainerException, NoSuchAlgorithmException {
-
-    Preconditions.checkNotNull(this.locationManager,
-        "Internal error: location manager cannot be null");
-
-    FileOutputStream containerStream = null;
-    DigestOutputStream dos = null;
-    FileOutputStream metaStream = null;
-
-    try {
-      Path metadataPath = null;
-      Path location = (!overwrite) ? locationManager.getContainerPath():
-          Paths.get(containerData.getContainerPath()).getParent();
-      if (location == null) {
-        throw new StorageContainerException(
-            "Failed to get container file path.",
-            CONTAINER_INTERNAL_ERROR);
-      }
-
-      File containerFile = ContainerUtils.getContainerFile(containerData,
-          location);
-      String containerName = Long.toString(containerData.getContainerID());
-
-      if(!overwrite) {
-        ContainerUtils.verifyIsNewContainer(containerFile);
-        metadataPath = this.locationManager.getDataPath(containerName);
-        metadataPath = ContainerUtils.createMetadata(metadataPath,
-            containerName, conf);
-      }  else {
-        metadataPath = ContainerUtils.getMetadataDirectory(containerData);
-      }
-
-      containerStream = new FileOutputStream(containerFile);
-
-      MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-
-      dos = new DigestOutputStream(containerStream, sha);
-      containerData.setDBPath(metadataPath.resolve(
-          ContainerUtils.getContainerDbFileName(containerName))
-          .toString());
-      containerData.setContainerPath(containerFile.toString());
-
-      if(containerData.getContainerDBType() == null) {
-        String impl = conf.getTrimmed(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
-            OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT);
-        containerData.setContainerDBType(impl);
-      }
-
-      ContainerProtos.ContainerData protoData = containerData
-          .getProtoBufMessage();
-      protoData.writeDelimitedTo(dos);
-
-    } catch (IOException ex) {
-      // TODO : we need to clean up partially constructed files
-      // The proper way to do this would be for a thread
-      // to read all three artifacts and make sure they are
-      // sane. That info needs to come from the replication
-      // pipeline, and if they are not consistent, delete these files.
-
-      // In case of ozone this is *not* a deal breaker since
-      // SCM is guaranteed to generate unique container names.
-      // The saving grace is that we check if we have residue files
-      // lying around when creating a new container. We need to queue
-      // this information to a cleaner thread.
-
-      LOG.error("Creation of container failed. Name: {}, we might need to " +
-              "cleanup partially created artifacts. ",
-          containerData.getContainerID(), ex);
-      throw new StorageContainerException("Container creation failed. ",
-          ex, CONTAINER_INTERNAL_ERROR);
-    } finally {
-      IOUtils.closeStream(dos);
-      IOUtils.closeStream(containerStream);
-      IOUtils.closeStream(metaStream);
-    }
-  }
-
-  /**
-   * Deletes an existing container.
-   *
-   * @param containerID - ID of the container.
-   * @param forceDelete - whether this container should be deleted forcibly.
-   * @throws StorageContainerException
-   */
-  @Override
-  public void deleteContainer(long containerID,
-      boolean forceDelete) throws StorageContainerException {
-    Preconditions.checkState(containerID >= 0,
-        "Container ID cannot be negative.");
-    writeLock();
-    try {
-      if (isOpen(containerID)) {
-        throw new StorageContainerException(
-            "Deleting an open container is not allowed.",
-            UNCLOSED_CONTAINER_IO);
-      }
-
-      ContainerData containerData = containerMap.get(containerID);
-      if (containerData == null) {
-        LOG.debug("No such container. ID: {}", containerID);
-        throw new StorageContainerException("No such container. ID : " +
-            containerID, CONTAINER_NOT_FOUND);
-      }
-
-      if(!containerData.isValid()) {
-        LOG.debug("Invalid container data. ID: {}", containerID);
-        throw new StorageContainerException("Invalid container data. Name : " +
-            containerID, CONTAINER_NOT_FOUND);
-      }
-      ContainerUtils.removeContainer(containerData, conf, forceDelete);
-      containerMap.remove(containerID);
-    } catch (StorageContainerException e) {
-      throw e;
-    } catch (IOException e) {
-      // TODO : An I/O error during delete can leave partial artifacts on the
-      // disk. We will need the cleaner thread to cleanup this information.
-      String errMsg = String.format("Failed to cleanup container. ID: %d",
-          containerID);
-      LOG.error(errMsg, e);
-      throw new StorageContainerException(errMsg, e, IO_EXCEPTION);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * A simple interface for container Iterations.
-   * <p/>
-   * This call makes no guarantees about the consistency of the data between
-   * different list calls. It just returns the best known data at that point
-   * in time. It is possible that using this iteration you can miss certain
-   * containers from the listing.
-   *
-   * @param startContainerID -  Return containers with ID >= startContainerID.
-   * @param count - how many to return
-   * @param data - Actual containerData
-   * @throws StorageContainerException
-   */
-  @Override
-  public void listContainer(long startContainerID, long count,
-      List<ContainerData> data) throws StorageContainerException {
-    Preconditions.checkNotNull(data,
-        "Internal assertion: data cannot be null");
-    Preconditions.checkState(startContainerID >= 0,
-        "Start container ID cannot be negative");
-    Preconditions.checkState(count > 0,
-        "max number of containers returned " +
-            "must be positive");
-
-    readLock();
-    try {
-      ConcurrentNavigableMap<Long, ContainerData> map;
-      if (startContainerID == 0) {
-        map = containerMap.tailMap(containerMap.firstKey(), true);
-      } else {
-        map = containerMap.tailMap(startContainerID, false);
-      }
-
-      int currentCount = 0;
-      for (ContainerData entry : map.values()) {
-        if (currentCount < count) {
-          data.add(entry);
-          currentCount++;
-        } else {
-          return;
-        }
-      }
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Get metadata about a specific container.
-   *
-   * @param containerID - ID of the container
-   * @return ContainerData - Container Data.
-   * @throws StorageContainerException
-   */
-  @Override
-  public ContainerData readContainer(long containerID)
-      throws StorageContainerException {
-    Preconditions.checkState(containerID >= 0,
-        "Container ID cannot be negative.");
-    if (!containerMap.containsKey(containerID)) {
-      throw new StorageContainerException("Unable to find the container. ID: "
-          + containerID, CONTAINER_NOT_FOUND);
-    }
-    ContainerData cData = containerMap.get(containerID);
-    if (cData == null) {
-      throw new StorageContainerException("Invalid container data. ID: "
-          + containerID, CONTAINER_INTERNAL_ERROR);
-    }
-    return cData;
-  }
-
-  /**
-   * Closes an open container; if it is already closed or does not exist, a
-   * StorageContainerException is thrown.
-   *
-   * @param containerID - ID of the container.
-   * @throws StorageContainerException
-   */
-  @Override
-  public void closeContainer(long containerID)
-      throws StorageContainerException, NoSuchAlgorithmException {
-    ContainerData containerData = readContainer(containerID);
-    containerData.closeContainer();
-    writeContainerInfo(containerData, true);
-    MetadataStore db = KeyUtils.getDB(containerData, conf);
-
-    // It is ok if this operation takes a bit of time.
-    // Close container is not expected to be instantaneous.
-    try {
-      db.compactDB();
-    } catch (IOException e) {
-      LOG.error("Error in DB compaction while closing container", e);
-      throw new StorageContainerException(e, ERROR_IN_COMPACT_DB);
-    }
-
-    // Active is different from closed. Closed means the container is
-    // immutable; active == false means some internal error is happening
-    // to this container. This is a way to track damaged containers: if
-    // we have an I/O failure, it allows us to take quick action on
-    // container issues.
-
-    containerMap.put(containerID, containerData);
-  }
-
-  @Override
-  public void updateContainer(long containerID, ContainerData data,
-      boolean forceUpdate) throws StorageContainerException {
-    Preconditions.checkState(containerID >= 0,
-        "Container ID cannot be negative.");
-    Preconditions.checkNotNull(data, "Container data cannot be null");
-    FileOutputStream containerStream = null;
-    DigestOutputStream dos = null;
-    MessageDigest sha = null;
-    File containerFileBK = null, containerFile = null;
-    boolean deleted = false;
-
-    if(!containerMap.containsKey(containerID)) {
-      throw new StorageContainerException("Container doesn't exist. Name :"
-          + containerID, CONTAINER_NOT_FOUND);
-    }
-
-    try {
-      sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    } catch (NoSuchAlgorithmException e) {
-      throw new StorageContainerException("Unable to create Message Digest,"
-          + " usually this is a java configuration issue.",
-          NO_SUCH_ALGORITHM);
-    }
-
-    try {
-      Path location = locationManager.getContainerPath();
-      ContainerData orgData = containerMap.get(containerID);
-      if (orgData == null) {
-        // updating an invalid container
-        throw new StorageContainerException("Cannot update a container with " +
-            "invalid container metadata", CONTAINER_INTERNAL_ERROR);
-      }
-
-      if (!forceUpdate && !orgData.isOpen()) {
-        throw new StorageContainerException(
-            "Update a closed container is not allowed. ID: " + containerID,
-            UNSUPPORTED_REQUEST);
-      }
-
-      containerFile = ContainerUtils.getContainerFile(orgData, location);
-      // If forceUpdate is true, there is no need to check
-      // whether the container file exists.
-      if (!forceUpdate) {
-        if (!containerFile.exists() || !containerFile.canWrite()) {
-          throw new StorageContainerException(
-              "Container file not exists or corrupted. ID: " + containerID,
-              CONTAINER_INTERNAL_ERROR);
-        }
-
-        // Backup the container file
-        containerFileBK = File.createTempFile(
-            "tmp_" + System.currentTimeMillis() + "_",
-            containerFile.getName(), containerFile.getParentFile());
-        FileUtils.copyFile(containerFile, containerFileBK);
-
-        deleted = containerFile.delete();
-        containerStream = new FileOutputStream(containerFile);
-        dos = new DigestOutputStream(containerStream, sha);
-
-        ContainerProtos.ContainerData protoData = data.getProtoBufMessage();
-        protoData.writeDelimitedTo(dos);
-      }
-
-      // Update the in-memory map
-      containerMap.replace(containerID, data);
-    } catch (IOException e) {
-      // Restore the container file from backup
-      if(containerFileBK != null && containerFileBK.exists() && deleted) {
-        if(containerFile.delete()
-            && containerFileBK.renameTo(containerFile)) {
-          throw new StorageContainerException("Container update failed,"
-              + " container data restored from the backup.",
-              CONTAINER_INTERNAL_ERROR);
-        } else {
-          throw new StorageContainerException(
-              "Failed to restore container data from the backup. ID: "
-                  + containerID, CONTAINER_INTERNAL_ERROR);
-        }
-      } else {
-        throw new StorageContainerException(
-            e.getMessage(), CONTAINER_INTERNAL_ERROR);
-      }
-    } finally {
-      if (containerFileBK != null && containerFileBK.exists()) {
-        if(!containerFileBK.delete()) {
-          LOG.warn("Unable to delete container file backup : {}.",
-              containerFileBK.getAbsolutePath());
-        }
-      }
-      IOUtils.closeStream(dos);
-      IOUtils.closeStream(containerStream);
-    }
-  }
-
-  @VisibleForTesting
-  protected File getContainerFile(ContainerData data) throws IOException {
-    return ContainerUtils.getContainerFile(data,
-        this.locationManager.getContainerPath());
-  }
-
-  /**
-   * Checks if a container is open.
-   *
-   * @param containerID - ID of the container.
-   * @return true if the container is open, false otherwise.
-   * @throws StorageContainerException - Throws Exception if we are not able to
-   *                                   find the container.
-   */
-  @Override
-  public boolean isOpen(long containerID) throws StorageContainerException {
-    final ContainerData containerData = containerMap.get(containerID);
-    if (containerData == null) {
-      throw new StorageContainerException(
-          "Container not found: " + containerID, CONTAINER_NOT_FOUND);
-    }
-    return containerData.isOpen();
-  }
-
-  /**
-   * Returns LifeCycle State of the container.
-   * @param containerID - Id of the container
-   * @return LifeCycle State of the container
-   * @throws StorageContainerException
-   */
-  private HddsProtos.LifeCycleState getState(long containerID)
-      throws StorageContainerException {
-    LifeCycleState state;
-    final ContainerData data = containerMap.get(containerID);
-    if (data == null) {
-      throw new StorageContainerException(
-          "Container status not found: " + containerID, CONTAINER_NOT_FOUND);
-    }
-    switch (data.getState()) {
-    case OPEN:
-      state = LifeCycleState.OPEN;
-      break;
-    case CLOSING:
-      state = LifeCycleState.CLOSING;
-      break;
-    case CLOSED:
-      state = LifeCycleState.CLOSED;
-      break;
-    default:
-      throw new StorageContainerException(
-          "Invalid Container state found: " + containerID,
-          INVALID_CONTAINER_STATE);
-    }
-
-    return state;
-  }
-
-  /**
-   * Supports clean shutdown of container.
-   *
-   * @throws IOException
-   */
-  @Override
-  public void shutdown() throws IOException {
-    Preconditions.checkState(this.hasWriteLock(),
-        "Assumption that we are holding the lock violated.");
-    this.containerMap.clear();
-    this.locationManager.shutdown();
-  }
-
-
-  @VisibleForTesting
-  public ConcurrentSkipListMap<Long, ContainerData> getContainerMap() {
-    return containerMap;
-  }
-
-  /**
-   * Acquire read lock.
-   */
-  @Override
-  public void readLock() {
-    this.lock.readLock().lock();
-
-  }
-
-  @Override
-  public void readLockInterruptibly() throws InterruptedException {
-    this.lock.readLock().lockInterruptibly();
-  }
-
-  /**
-   * Release read lock.
-   */
-  @Override
-  public void readUnlock() {
-    this.lock.readLock().unlock();
-  }
-
-  /**
-   * Check if the current thread holds read lock.
-   */
-  @Override
-  public boolean hasReadLock() {
-    // tryLock() would acquire the lock as a side effect; check the current
-    // thread's read hold count instead.
-    return this.lock.getReadHoldCount() > 0;
-  }
-
-  /**
-   * Acquire write lock.
-   */
-  @Override
-  public void writeLock() {
-    this.lock.writeLock().lock();
-  }
-
-  /**
-   * Acquire write lock, unless interrupted while waiting.
-   */
-  @Override
-  public void writeLockInterruptibly() throws InterruptedException {
-    this.lock.writeLock().lockInterruptibly();
-
-  }
-
-  /**
-   * Release write lock.
-   */
-  @Override
-  public void writeUnlock() {
-    this.lock.writeLock().unlock();
-
-  }
-
-  /**
-   * Check if the current thread holds write lock.
-   */
-  @Override
-  public boolean hasWriteLock() {
-    return this.lock.writeLock().isHeldByCurrentThread();
-  }
-
-  public ChunkManager getChunkManager() {
-    return this.chunkManager;
-  }
-
-  /**
-   * Sets the chunk Manager.
-   *
-   * @param chunkManager - Chunk Manager
-   */
-  public void setChunkManager(ChunkManager chunkManager) {
-    this.chunkManager = chunkManager;
-  }
-
-  /**
-   * Gets the Key Manager.
-   *
-   * @return KeyManager.
-   */
-  @Override
-  public KeyManager getKeyManager() {
-    return this.keyManager;
-  }
-
-  /**
-   * Get the node report.
-   * @return node report.
-   */
-  @Override
-  public NodeReportProto getNodeReport() throws IOException {
-    StorageLocationReport[] reports = locationManager.getLocationReport();
-    NodeReportProto.Builder nrb = NodeReportProto.newBuilder();
-    for (StorageLocationReport report : reports) {
-      nrb.addStorageReport(report.getProtoBufMessage());
-    }
-    return nrb.build();
-  }
-
-
-  /**
-   * Gets container reports.
-   *
-   * @return List of all closed containers.
-   * @throws IOException
-   */
-  @Override
-  public List<ContainerData> getClosedContainerReports() throws IOException {
-    LOG.debug("Starting container report iteration.");
-    // No need for locking since containerMap is a ConcurrentSkipListMap,
-    // and we can never get the exact state anyway, since a close might
-    // happen after we have iterated past a point.
-    return containerMap.entrySet().stream()
-        .filter(containerData ->
-            containerData.getValue().isClosed())
-        .map(containerData -> containerData.getValue())
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Get container report.
-   *
-   * @return The container report.
-   * @throws IOException
-   */
-  @Override
-  public ContainerReportsProto getContainerReport() throws IOException {
-    LOG.debug("Starting container report iteration.");
-    // No need for locking since containerMap is a ConcurrentSkipListMap,
-    // and we can never get the exact state anyway, since a close might
-    // happen after we have iterated past a point.
-    List<ContainerData> containers = containerMap.values().stream()
-        .collect(Collectors.toList());
-
-    ContainerReportsProto.Builder crBuilder =
-        ContainerReportsProto.newBuilder();
-
-    for (ContainerData container: containers) {
-      long containerId = container.getContainerID();
-      StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
-          StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
-      ciBuilder.setContainerID(container.getContainerID())
-          .setSize(container.getMaxSize())
-          .setUsed(container.getBytesUsed())
-          .setKeyCount(container.getKeyCount())
-          .setReadCount(container.getReadCount())
-          .setWriteCount(container.getWriteCount())
-          .setReadBytes(container.getReadBytes())
-          .setWriteBytes(container.getWriteBytes())
-          .setState(getState(containerId))
-          .setDeleteTransactionId(container.getDeleteTransactionId());
-
-      crBuilder.addReports(ciBuilder.build());
-    }
-
-    return crBuilder.build();
-  }
-
-  /**
-   * Sets the Key Manager.
-   *
-   * @param keyManager - Key Manager.
-   */
-  @Override
-  public void setKeyManager(KeyManager keyManager) {
-    this.keyManager = keyManager;
-  }
-
-  /**
-   * Filter out only container files from the container metadata dir.
-   */
-  private static class ContainerFilter implements FilenameFilter {
-    /**
-     * Tests if a specified file should be included in a file list.
-     *
-     * @param dir the directory in which the file was found.
-     * @param name the name of the file.
-     * @return <code>true</code> if and only if the name should be included in
-     * the file list; <code>false</code> otherwise.
-     */
-    @Override
-    public boolean accept(File dir, String name) {
-      return name.endsWith(CONTAINER_EXTENSION);
-    }
-  }
-
-  @Override
-  public List<ContainerData> chooseContainerForBlockDeletion(
-      int count) throws StorageContainerException {
-    readLock();
-    try {
-      return containerDeletionChooser.chooseContainerForBlockDeletion(
-          count, containerMap);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  @VisibleForTesting
-  public ContainerDeletionChoosingPolicy getContainerDeletionChooser() {
-    return containerDeletionChooser;
-  }
-
-  @Override
-  public void incrPendingDeletionBlocks(int numBlocks, long containerId) {
-    writeLock();
-    try {
-      ContainerData cData = containerMap.get(containerId);
-      cData.incrPendingDeletionBlocks(numBlocks);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  @Override
-  public void decrPendingDeletionBlocks(int numBlocks, long containerId) {
-    writeLock();
-    try {
-      ContainerData cData = containerMap.get(containerId);
-      cData.decrPendingDeletionBlocks(numBlocks);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * Increase the read count of the container.
-   *
-   * @param containerId - ID of the container.
-   */
-  @Override
-  public void incrReadCount(long containerId) {
-    ContainerData cData = containerMap.get(containerId);
-    cData.incrReadCount();
-  }
-
-  public long getReadCount(long containerId) {
-    ContainerData cData = containerMap.get(containerId);
-    return cData.getReadCount();
-  }
-
-  /**
-   * Increase the read counter for bytes read from the container.
-   *
-   * @param containerId - ID of the container.
-   * @param readBytes     - bytes read from the container.
-   */
-  @Override
-  public void incrReadBytes(long containerId, long readBytes) {
-    ContainerData cData = containerMap.get(containerId);
-    cData.incrReadBytes(readBytes);
-  }
-
-  /**
-   * Returns the number of bytes read from the container.
-   * @param containerId - ID of the container.
-   * @return number of bytes read from the container.
-   */
-  public long getReadBytes(long containerId) {
-    readLock();
-    try {
-      ContainerData cData = containerMap.get(containerId);
-      return cData.getReadBytes();
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Increase the write count of the container.
-   *
-   * @param containerId - Name of the container.
-   */
-  @Override
-  public void incrWriteCount(long containerId) {
-    ContainerData cData = containerMap.get(containerId);
-    cData.incrWriteCount();
-  }
-
-  public long getWriteCount(long containerId) {
-    ContainerData cData = containerMap.get(containerId);
-    return cData.getWriteCount();
-  }
-
-  /**
-   * Increase the write counter for bytes written into the container.
-   *
-   * @param containerId   - ID of the container.
-   * @param writeBytes    - bytes written into the container.
-   */
-  @Override
-  public void incrWriteBytes(long containerId, long writeBytes) {
-    ContainerData cData = containerMap.get(containerId);
-    cData.incrWriteBytes(writeBytes);
-  }
-
-  public long getWriteBytes(long containerId) {
-    ContainerData cData = containerMap.get(containerId);
-    return cData.getWriteBytes();
-  }
-
-  /**
-   * Increase the bytes used by the container.
-   *
-   * @param containerId   - ID of the container.
-   * @param used          - additional bytes used by the container.
-   * @return the current bytes used.
-   */
-  @Override
-  public long incrBytesUsed(long containerId, long used) {
-    ContainerData cData = containerMap.get(containerId);
-    return cData.incrBytesUsed(used);
-  }
-
-  /**
-   * Decrease the bytes used by the container.
-   *
-   * @param containerId   - ID of the container.
-   * @param used          - additional bytes reclaimed by the container.
-   * @return the current bytes used.
-   */
-  @Override
-  public long decrBytesUsed(long containerId, long used) {
-    ContainerData cData = containerMap.get(containerId);
-    return cData.decrBytesUsed(used);
-  }
-
-  public long getBytesUsed(long containerId) {
-    ContainerData cData = containerMap.get(containerId);
-    return cData.getBytesUsed();
-  }
-
-  /**
-   * Get the number of keys in the container.
-   *
-   * @param containerId - ID of the container.
-   * @return the current key count.
-   */
-  @Override
-  public long getNumKeys(long containerId) {
-    ContainerData cData = containerMap.get(containerId);
-    return cData.getKeyCount();
-  }
-
-
-}
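
As an aside on the deleted code above: createContainer() deliberately writes the container file through a DigestOutputStream and then re-reads it via readContainerInfo() before trusting it in containerMap, so only data that parses cleanly from disk is published. Below is a minimal, self-contained sketch of that write-then-read-back pattern; the class name, file location, payload, and the SHA-256 algorithm are illustrative assumptions, not details taken from this patch.

    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.security.DigestOutputStream;
    import java.security.MessageDigest;

    public class WriteThenReadBack {
      public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("container", ".meta"); // hypothetical location
        byte[] record = "containerID=42".getBytes(StandardCharsets.UTF_8);

        // Write through a DigestOutputStream so a checksum is computed as a
        // side effect of the write, as writeContainerInfo() does.
        MessageDigest sha = MessageDigest.getInstance("SHA-256");
        try (DigestOutputStream dos =
                 new DigestOutputStream(Files.newOutputStream(file), sha)) {
          dos.write(record);
        }
        byte[] writtenHash = sha.digest();

        // Read the record back and verify it before "publishing" it, the way
        // createContainer() calls readContainerInfo() after the write.
        byte[] readBack = Files.readAllBytes(file);
        byte[] readHash = MessageDigest.getInstance("SHA-256").digest(readBack);
        System.out.println("consistent = "
            + MessageDigest.isEqual(writtenHash, readHash));
      }
    }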

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 18a7839..bcba8c8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
@@ -63,7 +64,7 @@ public class ContainerSet {
       StorageContainerException {
     Preconditions.checkNotNull(container, "container cannot be null");
 
-    long containerId = container.getContainerData().getContainerId();
+    long containerId = container.getContainerData().getContainerID();
     if(containerMap.putIfAbsent(containerId, container) == null) {
       LOG.debug("Container with container Id {} is added to containerMap",
           containerId);
@@ -133,6 +134,13 @@ public class ContainerSet {
     return containerMap.entrySet().iterator();
   }
 
+  /**
+   * Returns an immutable copy of the containerMap.
+   * @return a point-in-time snapshot of containerMap
+   */
+  public Map<Long, Container> getContainerMap() {
+    return ImmutableMap.copyOf(containerMap);
+  }
 
   /**
    * A simple interface for container Iterations.
@@ -196,7 +204,7 @@ public class ContainerSet {
 
 
     for (Container container: containers) {
-      long containerId = container.getContainerData().getContainerId();
+      long containerId = container.getContainerData().getContainerID();
       ContainerInfo.Builder ciBuilder = ContainerInfo.newBuilder();
       ContainerData containerData = container.getContainerData();
       ciBuilder.setContainerID(containerId)
@@ -234,9 +242,14 @@ public class ContainerSet {
       break;
     default:
       throw new StorageContainerException("Invalid Container state found: " +
-          containerData.getContainerId(), INVALID_CONTAINER_STATE);
+          containerData.getContainerID(), INVALID_CONTAINER_STATE);
     }
     return state;
   }
 
+  // TODO: Implement BlockDeletingService
+  public List<ContainerData> chooseContainerForBlockDeletion(
+      int count) throws StorageContainerException {
+    return null;
+  }
 }
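
Note on the new getContainerMap() accessor above: it returns ImmutableMap.copyOf(containerMap) rather than the live ConcurrentSkipListMap, so callers get a point-in-time snapshot they cannot mutate. A small sketch of that behavior, with made-up container IDs and Guava assumed on the classpath:

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentSkipListMap;

    public class SnapshotDemo {
      public static void main(String[] args) {
        ConcurrentSkipListMap<Long, String> live = new ConcurrentSkipListMap<>();
        live.put(1L, "container-1");

        // copyOf takes a snapshot: later changes to the live map are not
        // visible through it, and it rejects mutation outright.
        Map<Long, String> snapshot = ImmutableMap.copyOf(live);
        live.put(2L, "container-2");

        System.out.println(snapshot.containsKey(2L)); // false
        // snapshot.put(3L, "x"); // would throw UnsupportedOperationException
      }
    }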

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
deleted file mode 100644
index 7431baa..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CachingGetSpaceUsed;
-import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.GetSpaceUsed;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
-import java.util.Scanner;
-
-import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
-
-/**
- * Class that tracks the space used at a Datanode container storage location
- * by SCM containers.
- */
-public class ContainerStorageLocation {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerStorageLocation.class);
-
-  private static final String DU_CACHE_FILE = "scmUsed";
-  private volatile boolean scmUsedSaved = false;
-
-  private final StorageLocation dataLocation;
-  private final String storageUuId;
-  private final DF usage;
-  private final GetSpaceUsed scmUsage;
-  private final File scmUsedFile;
-
-  public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf)
-      throws IOException {
-    this.dataLocation = dataLoc;
-    this.storageUuId = DatanodeStorage.generateUuid();
-    File dataDir = Paths.get(dataLoc.getNormalizedUri()).resolve(
-        OzoneConsts.CONTAINER_PREFIX).toFile();
-    // Initialize container data root if it does not exist as required by DF/DU
-    if (!dataDir.exists()) {
-      if (!dataDir.mkdirs()) {
-        LOG.error("Unable to create the container storage location at : {}",
-            dataDir);
-        throw new IllegalArgumentException("Unable to create the container" +
-            " storage location at : " + dataDir);
-      }
-    }
-    scmUsedFile = new File(dataDir, DU_CACHE_FILE);
-    // get overall disk usage
-    this.usage = new DF(dataDir, conf);
-    // get SCM specific usage
-    this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir)
-        .setConf(conf)
-        .setInitialUsed(loadScmUsed())
-        .build();
-
-    // Ensure scm usage is saved during shutdown.
-    ShutdownHookManager.get().addShutdownHook(
-        new Runnable() {
-          @Override
-          public void run() {
-            if (!scmUsedSaved) {
-              saveScmUsed();
-            }
-          }
-        }, SHUTDOWN_HOOK_PRIORITY);
-  }
-
-  public URI getNormalizedUri() {
-    return dataLocation.getNormalizedUri();
-  }
-
-  public String getStorageUuId() {
-    return storageUuId;
-  }
-  public long getCapacity() {
-    long capacity = usage.getCapacity();
-    return (capacity > 0) ? capacity : 0;
-  }
-
-  public long getAvailable() throws IOException {
-    long remaining = getCapacity() - getScmUsed();
-    long available = usage.getAvailable();
-    if (remaining > available) {
-      remaining = available;
-    }
-    return (remaining > 0) ? remaining : 0;
-  }
-
-  public long getScmUsed() throws IOException{
-    return scmUsage.getUsed();
-  }
-
-  public String getStorageLocation() {
-    return getNormalizedUri().getRawPath();
-  }
-
-  public StorageType getStorageType() {
-    return dataLocation.getStorageType();
-  }
-
-  public void shutdown() {
-    saveScmUsed();
-    scmUsedSaved = true;
-
-    if (scmUsage instanceof CachingGetSpaceUsed) {
-      IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage));
-    }
-  }
-
-  /**
-   * Read in the cached DU value and return it if it is less than 600 seconds
-   * old (DU update interval). Slight imprecision of scmUsed is not critical
-   * and skipping DU can significantly shorten the startup time.
-   * If the cached value is not available or too old, -1 is returned.
-   */
-  long loadScmUsed() {
-    long cachedScmUsed;
-    long mtime;
-    Scanner sc;
-
-    try {
-      sc = new Scanner(scmUsedFile, "UTF-8");
-    } catch (FileNotFoundException fnfe) {
-      return -1;
-    }
-
-    try {
-      // Get the recorded scmUsed from the file.
-      if (sc.hasNextLong()) {
-        cachedScmUsed = sc.nextLong();
-      } else {
-        return -1;
-      }
-      // Get the recorded mtime from the file.
-      if (sc.hasNextLong()) {
-        mtime = sc.nextLong();
-      } else {
-        return -1;
-      }
-
-      // Return the cached value if mtime is okay.
-      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
-        LOG.info("Cached ScmUsed found for {} : {} ", dataLocation,
-            cachedScmUsed);
-        return cachedScmUsed;
-      }
-      return -1;
-    } finally {
-      sc.close();
-    }
-  }
-
-  /**
-   * Write the current scmUsed to the cache file.
-   */
-  void saveScmUsed() {
-    if (scmUsedFile.exists() && !scmUsedFile.delete()) {
-      LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation);
-    }
-    OutputStreamWriter out = null;
-    try {
-      long used = getScmUsed();
-      if (used > 0) {
-        out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
-            StandardCharsets.UTF_8);
-        // mtime is written last, so that truncated writes won't be valid.
-        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
-        out.flush();
-        out.close();
-        out = null;
-      }
-    } catch (IOException ioe) {
-      // If write failed, the volume might be bad. Since the cache file is
-      // not critical, log the error and continue.
-      LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
-    } finally {
-      IOUtils.cleanupWithLogger(null, out);
-    }
-  }
-}
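
For reference, the deleted loadScmUsed()/saveScmUsed() pair above persists the DU cache as two longs, "<used> <mtime>", writing mtime last so that a truncated write fails to parse, and treats entries older than 600 seconds as stale. A stripped-down sketch of the same format, assuming System.currentTimeMillis() in place of Hadoop's Time.now() and a hypothetical temp-file location:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.Writer;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Scanner;

    public class ScmUsedCache {
      private static final long MAX_AGE_MS = 600_000L; // DU update interval

      // Persist "used mtime"; mtime last, so a truncated write parses as invalid.
      static void save(Path cacheFile, long used) throws IOException {
        try (Writer out =
                 Files.newBufferedWriter(cacheFile, StandardCharsets.UTF_8)) {
          out.write(used + " " + System.currentTimeMillis());
        }
      }

      // Return the cached value if fresh; -1 if missing, malformed, or stale.
      static long load(Path cacheFile) {
        try (Scanner sc = new Scanner(cacheFile.toFile(), "UTF-8")) {
          if (!sc.hasNextLong()) {
            return -1;
          }
          long used = sc.nextLong();
          if (!sc.hasNextLong()) {
            return -1;
          }
          long mtime = sc.nextLong();
          boolean fresh =
              mtime > 0 && System.currentTimeMillis() - mtime < MAX_AGE_MS;
          return fresh ? used : -1;
        } catch (FileNotFoundException e) {
          return -1;
        }
      }

      public static void main(String[] args) throws IOException {
        Path f = Files.createTempFile("scmUsed", ".cache"); // hypothetical
        save(f, 12345L);
        System.out.println(load(f)); // prints 12345 while the cache is fresh
      }
    }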

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
deleted file mode 100644
index 3ffe6e4..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ /dev/null
@@ -1,695 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.CLOSED_CONTAINER_IO;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.GET_SMALL_FILE_ERROR;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.NO_SUCH_ALGORITHM;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.PUT_SMALL_FILE_ERROR;
-
-/**
- * Ozone Container dispatcher takes a call from the netty server and routes it
- * to the right handler function.
- */
-public class Dispatcher implements ContainerDispatcher {
-  static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
-
-  private final ContainerManager containerManager;
-  private ContainerMetrics metrics;
-  private Configuration conf;
-
-  /**
-   * Constructs a Dispatcher that receives calls from
-   * XceiverServerHandler.
-   *
-   * @param containerManager - A class that manages containers.
-   */
-  public Dispatcher(ContainerManager containerManager, Configuration config) {
-    Preconditions.checkNotNull(containerManager);
-    this.containerManager = containerManager;
-    this.metrics = null;
-    this.conf = config;
-  }
-
-  @Override
-  public void init() {
-    this.metrics = ContainerMetrics.create(conf);
-  }
-
-  @Override
-  public void shutdown() {
-  }
-
-  @Override
-  public Handler getHandler(ContainerProtos.ContainerType containerType) {
-    return null;
-  }
-
-  @Override
-  public void setScmId(String scmId) {
-    // Do nothing; this will be removed during cleanup.
-  }
-
-  @Override
-  public ContainerCommandResponseProto dispatch(
-      ContainerCommandRequestProto msg) {
-    LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
-        msg.getTraceID());
-    long startNanos = System.nanoTime();
-    ContainerCommandResponseProto resp = null;
-    try {
-      Preconditions.checkNotNull(msg);
-      Type cmdType = msg.getCmdType();
-      metrics.incContainerOpsMetrics(cmdType);
-      if ((cmdType == Type.CreateContainer) ||
-          (cmdType == Type.DeleteContainer) ||
-          (cmdType == Type.ReadContainer) ||
-          (cmdType == Type.ListContainer) ||
-          (cmdType == Type.UpdateContainer) ||
-          (cmdType == Type.CloseContainer)) {
-        resp = containerProcessHandler(msg);
-      }
-
-      if ((cmdType == Type.PutKey) ||
-          (cmdType == Type.GetKey) ||
-          (cmdType == Type.DeleteKey) ||
-          (cmdType == Type.ListKey)) {
-        resp = keyProcessHandler(msg);
-      }
-
-      if ((cmdType == Type.WriteChunk) ||
-          (cmdType == Type.ReadChunk) ||
-          (cmdType == Type.DeleteChunk)) {
-        resp = chunkProcessHandler(msg);
-      }
-
-      if ((cmdType == Type.PutSmallFile) ||
-          (cmdType == Type.GetSmallFile)) {
-        resp = smallFileHandler(msg);
-      }
-
-      if (resp != null) {
-        metrics.incContainerOpsLatencies(cmdType,
-            System.nanoTime() - startNanos);
-        return resp;
-      }
-
-      return ContainerUtils.unsupportedRequest(msg);
-    } catch (StorageContainerException e) {
-      // This is useful since the trace ID allows us to correlate failures.
-      return ContainerUtils.logAndReturnError(LOG, e, msg);
-    }
-  }
-
-  public ContainerMetrics getContainerMetrics() {
-    return metrics;
-  }
-
-  /**
-   * Handles all container-related functionality.
-   *
-   * @param msg - command
-   * @return - response
-   * @throws StorageContainerException
-   */
-  private ContainerCommandResponseProto containerProcessHandler(
-      ContainerCommandRequestProto msg) throws StorageContainerException {
-    try {
-
-      switch (msg.getCmdType()) {
-      case CreateContainer:
-        return handleCreateContainer(msg);
-
-      case DeleteContainer:
-        return handleDeleteContainer(msg);
-
-      case ListContainer:
-        // TODO : Support List Container.
-        return ContainerUtils.unsupportedRequest(msg);
-
-      case UpdateContainer:
-        return handleUpdateContainer(msg);
-
-      case ReadContainer:
-        return handleReadContainer(msg);
-
-      case CloseContainer:
-        return handleCloseContainer(msg);
-
-      default:
-        return ContainerUtils.unsupportedRequest(msg);
-      }
-    } catch (StorageContainerException e) {
-      return ContainerUtils.logAndReturnError(LOG, e, msg);
-    } catch (IOException ex) {
-      LOG.warn("Container operation failed. " +
-              "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerID(),
-          msg.getCmdType().name(),
-          msg.getTraceID(),
-          ex.toString(), ex);
-
-      // TODO : Replace with finer error codes.
-      return ContainerUtils.getContainerCommandResponse(msg,
-          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
-          ex.toString()).build();
-    }
-  }
-
-  /**
-   * Handles all key-related functionality.
-   *
-   * @param msg - command
-   * @return - response
-   * @throws StorageContainerException
-   */
-  private ContainerCommandResponseProto keyProcessHandler(
-      ContainerCommandRequestProto msg) throws StorageContainerException {
-    try {
-      switch (msg.getCmdType()) {
-      case PutKey:
-        return handlePutKey(msg);
-
-      case GetKey:
-        return handleGetKey(msg);
-
-      case DeleteKey:
-        return handleDeleteKey(msg);
-
-      case ListKey:
-        return ContainerUtils.unsupportedRequest(msg);
-
-      default:
-        return ContainerUtils.unsupportedRequest(msg);
-
-      }
-    } catch (StorageContainerException e) {
-      return ContainerUtils.logAndReturnError(LOG, e, msg);
-    } catch (IOException ex) {
-      LOG.warn("Container operation failed. " +
-              "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerID(),
-          msg.getCmdType().name(),
-          msg.getTraceID(),
-          ex.toString(), ex);
-
-      // TODO : Replace with finer error codes.
-      return ContainerUtils.getContainerCommandResponse(msg,
-          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
-          ex.toString()).build();
-    }
-  }
-
-  /**
-   * Handles all chunk-related functionality.
-   *
-   * @param msg - command
-   * @return - response
-   * @throws StorageContainerException
-   */
-  private ContainerCommandResponseProto chunkProcessHandler(
-      ContainerCommandRequestProto msg) throws StorageContainerException {
-    try {
-      switch (msg.getCmdType()) {
-      case WriteChunk:
-        return handleWriteChunk(msg);
-
-      case ReadChunk:
-        return handleReadChunk(msg);
-
-      case DeleteChunk:
-        return handleDeleteChunk(msg);
-
-      case ListChunk:
-        return ContainerUtils.unsupportedRequest(msg);
-
-      default:
-        return ContainerUtils.unsupportedRequest(msg);
-      }
-    } catch (StorageContainerException e) {
-      return ContainerUtils.logAndReturnError(LOG, e, msg);
-    } catch (IOException ex) {
-      LOG.warn("Container operation failed. " +
-              "Container: {} Operation: {}  trace ID: {} Error: {}",
-          msg.getCreateContainer().getContainerID(),
-          msg.getCmdType().name(),
-          msg.getTraceID(),
-          ex.toString(), ex);
-
-      // TODO : Replace with finer error codes.
-      return ContainerUtils.getContainerCommandResponse(msg,
-          ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
-          ex.toString()).build();
-    }
-  }
-
-  /**
-   * Dispatches calls to the small file handler.
-   * @param msg - request
-   * @return response
-   * @throws StorageContainerException
-   */
-  private ContainerCommandResponseProto smallFileHandler(
-      ContainerCommandRequestProto msg) throws StorageContainerException {
-    switch (msg.getCmdType()) {
-    case PutSmallFile:
-      return handlePutSmallFile(msg);
-    case GetSmallFile:
-      return handleGetSmallFile(msg);
-    default:
-      return ContainerUtils.unsupportedRequest(msg);
-    }
-  }
-
-  /**
-   * Update an existing container with the new container data.
-   *
-   * @param msg Request
-   * @return ContainerCommandResponseProto
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleUpdateContainer(
-      ContainerCommandRequestProto msg)
-      throws IOException {
-    if (!msg.hasUpdateContainer()) {
-      LOG.debug("Malformed read container request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-    long containerID = msg.getUpdateContainer().getContainerID();
-
-    ContainerData data = new ContainerData(msg.getUpdateContainer()
-        .getContainerID(), conf);
-    boolean forceUpdate = msg.getUpdateContainer().getForceUpdate();
-    this.containerManager.updateContainer(containerID,
-        data, forceUpdate);
-    return ContainerUtils.getSuccessResponse(msg);
-  }
-
-  /**
-   * Calls into container logic and returns appropriate response.
-   *
-   * @param msg - Request
-   * @return ContainerCommandResponseProto
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleReadContainer(
-      ContainerCommandRequestProto msg)
-      throws IOException {
-
-    if (!msg.hasReadContainer()) {
-      LOG.debug("Malformed read container request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-
-    long containerID = msg.getReadContainer().getContainerID();
-    ContainerData container = this.containerManager.
-        readContainer(containerID);
-    return ContainerUtils.getReadContainerResponse(msg, container);
-  }
-
-  /**
-   * Calls into container logic and returns appropriate response.
-   *
-   * @param msg - Request
-   * @return Response.
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleDeleteContainer(
-      ContainerCommandRequestProto msg) throws IOException {
-
-    if (!msg.hasDeleteContainer()) {
-      LOG.debug("Malformed delete container request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-
-    long containerID = msg.getDeleteContainer().getContainerID();
-    boolean forceDelete = msg.getDeleteContainer().getForceDelete();
-    this.containerManager.deleteContainer(containerID, forceDelete);
-    return ContainerUtils.getSuccessResponse(msg);
-  }
-
-  /**
-   * Calls into container logic and returns appropriate response.
-   *
-   * @param msg - Request
-   * @return Response.
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleCreateContainer(
-      ContainerCommandRequestProto msg) throws IOException {
-    if (!msg.hasCreateContainer()) {
-      LOG.debug("Malformed create container request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-    ContainerData cData = new ContainerData(
-        msg.getCreateContainer().getContainerID(), conf);
-
-    this.containerManager.createContainer(cData);
-    return ContainerUtils.getSuccessResponse(msg);
-  }
-
-  /**
-   * Closes an open container.
-   *
-   * @param msg - Request.
-   * @return Response.
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleCloseContainer(
-      ContainerCommandRequestProto msg) throws IOException {
-    try {
-      if (!msg.hasCloseContainer()) {
-        LOG.debug("Malformed close Container request. trace ID: {}",
-            msg.getTraceID());
-        return ContainerUtils.malformedRequest(msg);
-      }
-      long containerID = msg.getCloseContainer().getContainerID();
-      if (!this.containerManager.isOpen(containerID)) {
-        throw new StorageContainerException("Attempting to close a closed " +
-            "container.", CLOSED_CONTAINER_IO);
-      }
-      this.containerManager.closeContainer(containerID);
-      return ContainerUtils.getSuccessResponse(msg);
-    } catch (NoSuchAlgorithmException e) {
-      throw new StorageContainerException("No such Algorithm", e,
-          NO_SUCH_ALGORITHM);
-    }
-  }
-
-  /**
-   * Calls into chunk manager to write a chunk.
-   *
-   * @param msg - Request.
-   * @return Response.
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleWriteChunk(
-      ContainerCommandRequestProto msg) throws IOException {
-    if (!msg.hasWriteChunk()) {
-      LOG.debug("Malformed write chunk request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-    BlockID blockID = BlockID.getFromProtobuf(
-        msg.getWriteChunk().getBlockID());
-    if (!this.containerManager.isOpen(blockID.getContainerID())) {
-      throw new StorageContainerException("Write to closed container.",
-          CLOSED_CONTAINER_IO);
-    }
-
-    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk()
-        .getChunkData());
-    Preconditions.checkNotNull(chunkInfo);
-    byte[] data = null;
-    if (msg.getWriteChunk().getStage() == ContainerProtos.Stage.WRITE_DATA
-        || msg.getWriteChunk().getStage() == ContainerProtos.Stage.COMBINED) {
-      data = msg.getWriteChunk().getData().toByteArray();
-      metrics.incContainerBytesStats(Type.WriteChunk, data.length);
-
-    }
-    this.containerManager.getChunkManager()
-        .writeChunk(blockID, chunkInfo,
-            data, msg.getWriteChunk().getStage());
-
-    return ChunkUtils.getChunkResponse(msg);
-  }
-
-  /**
-   * Calls into chunk manager to read a chunk.
-   *
-   * @param msg - Request.
-   * @return - Response.
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleReadChunk(
-      ContainerCommandRequestProto msg) throws IOException {
-    if (!msg.hasReadChunk()) {
-      LOG.debug("Malformed read chunk request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-    BlockID blockID = BlockID.getFromProtobuf(
-        msg.getReadChunk().getBlockID());
-    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getReadChunk()
-        .getChunkData());
-    Preconditions.checkNotNull(chunkInfo);
-    byte[] data = this.containerManager.getChunkManager().
-        readChunk(blockID, chunkInfo);
-    metrics.incContainerBytesStats(Type.ReadChunk, data.length);
-    return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
-  }
-
-  /**
-   * Calls into chunk manager to delete a chunk.
-   *
-   * @param msg - Request.
-   * @return Response.
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleDeleteChunk(
-      ContainerCommandRequestProto msg) throws IOException {
-    if (!msg.hasDeleteChunk()) {
-      LOG.debug("Malformed delete chunk request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-
-    BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteChunk()
-        .getBlockID());
-    long containerID = blockID.getContainerID();
-    if (!this.containerManager.isOpen(containerID)) {
-      throw new StorageContainerException("Write to closed container.",
-          CLOSED_CONTAINER_IO);
-    }
-    ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk()
-        .getChunkData());
-    Preconditions.checkNotNull(chunkInfo);
-
-    this.containerManager.getChunkManager().deleteChunk(blockID,
-        chunkInfo);
-    return ChunkUtils.getChunkResponse(msg);
-  }
-
-  /**
-   * Put Key handler.
-   *
-   * @param msg - Request.
-   * @return - Response.
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handlePutKey(
-      ContainerCommandRequestProto msg) throws IOException {
-    if (!msg.hasPutKey()) {
-      LOG.debug("Malformed put key request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-    BlockID blockID = BlockID.getFromProtobuf(
-        msg.getPutKey().getKeyData().getBlockID());
-    long containerID = blockID.getContainerID();
-    if (!this.containerManager.isOpen(containerID)) {
-      throw new StorageContainerException("Write to closed container.",
-          CLOSED_CONTAINER_IO);
-    }
-    KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
-    Preconditions.checkNotNull(keyData);
-    this.containerManager.getKeyManager().putKey(keyData);
-    long numBytes = keyData.getProtoBufMessage().toByteArray().length;
-    metrics.incContainerBytesStats(Type.PutKey, numBytes);
-    return KeyUtils.getKeyResponse(msg);
-  }
-
-  /**
-   * Handle Get Key.
-   *
-   * @param msg - Request.
-   * @return - Response.
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleGetKey(
-      ContainerCommandRequestProto msg) throws IOException {
-    if (!msg.hasGetKey()) {
-      LOG.debug("Malformed get key request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-    KeyData keyData = new KeyData(
-        BlockID.getFromProtobuf(msg.getGetKey().getBlockID()));
-    Preconditions.checkNotNull(keyData);
-    KeyData responseData =
-        this.containerManager.getKeyManager().getKey(keyData);
-    long numBytes = responseData.getProtoBufMessage().toByteArray().length;
-    metrics.incContainerBytesStats(Type.GetKey, numBytes);
-    return KeyUtils.getKeyDataResponse(msg, responseData);
-  }
-
-  /**
-   * Handle Delete Key.
-   *
-   * @param msg - Request.
-   * @return - Response.
-   * @throws IOException
-   */
-  private ContainerCommandResponseProto handleDeleteKey(
-      ContainerCommandRequestProto msg) throws IOException {
-    if (!msg.hasDeleteKey()) {
-      LOG.debug("Malformed delete key request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-    BlockID blockID = BlockID.getFromProtobuf(msg.getDeleteKey()
-        .getBlockID());
-    Preconditions.checkNotNull(blockID);
-    long containerID = blockID.getContainerID();
-    if (!this.containerManager.isOpen(containerID)) {
-      throw new StorageContainerException("Write to closed container.",
-          CLOSED_CONTAINER_IO);
-    }
-    this.containerManager.getKeyManager().deleteKey(blockID);
-    return KeyUtils.getKeyResponse(msg);
-  }
-
-  /**
-   * Handles writing a chunk and its associated key in a single RPC.
-   *
-   * @param msg - Message.
-   * @return ContainerCommandResponseProto
-   * @throws StorageContainerException
-   */
-  private ContainerCommandResponseProto handlePutSmallFile(
-      ContainerCommandRequestProto msg) throws StorageContainerException {
-
-    if (!msg.hasPutSmallFile()) {
-      LOG.debug("Malformed put small file request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-    try {
-
-      BlockID blockID = BlockID.getFromProtobuf(msg.
-          getPutSmallFile().getKey().getKeyData().getBlockID());
-      long containerID = blockID.getContainerID();
-
-      if (!this.containerManager.isOpen(containerID)) {
-        throw new StorageContainerException("Write to closed container.",
-            CLOSED_CONTAINER_IO);
-      }
-      KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey()
-          .getKeyData());
-      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile()
-          .getChunkInfo());
-      byte[] data = msg.getPutSmallFile().getData().toByteArray();
-
-      metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
-      this.containerManager.getChunkManager().writeChunk(blockID,
-          chunkInfo, data, ContainerProtos.Stage.COMBINED);
-      List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
-      chunks.add(chunkInfo.getProtoBufMessage());
-      keyData.setChunks(chunks);
-      this.containerManager.getKeyManager().putKey(keyData);
-      return SmallFileUtils.getPutFileResponseSuccess(msg);
-    } catch (StorageContainerException e) {
-      return ContainerUtils.logAndReturnError(LOG, e, msg);
-    } catch (IOException e) {
-      throw new StorageContainerException("Put Small File Failed.", e,
-          PUT_SMALL_FILE_ERROR);
-    }
-  }
-
-  /**
-   * Handles getting a data stream using a key. This helps in reducing the RPC
-   * overhead for small files.
-   *
-   * @param msg - ContainerCommandRequestProto
-   * @return ContainerCommandResponseProto
-   * @throws StorageContainerException
-   */
-  private ContainerCommandResponseProto handleGetSmallFile(
-      ContainerCommandRequestProto msg) throws StorageContainerException {
-    ByteString dataBuf = ByteString.EMPTY;
-    if (!msg.hasGetSmallFile()) {
-      LOG.debug("Malformed get small file request. trace ID: {}",
-          msg.getTraceID());
-      return ContainerUtils.malformedRequest(msg);
-    }
-    try {
-      long bytes = 0;
-      KeyData keyData = new KeyData(BlockID.getFromProtobuf(
-          msg.getGetSmallFile().getKey().getBlockID()));
-      KeyData data = this.containerManager.getKeyManager().getKey(keyData);
-      ContainerProtos.ChunkInfo c = null;
-      for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
-        bytes += chunk.getSerializedSize();
-        ByteString current =
-            ByteString.copyFrom(this.containerManager.getChunkManager()
-                .readChunk(keyData.getBlockID(),
-                    ChunkInfo.getFromProtoBuf(chunk)));
-        dataBuf = dataBuf.concat(current);
-        c = chunk;
-      }
-      metrics.incContainerBytesStats(Type.GetSmallFile, bytes);
-      return SmallFileUtils.getGetSmallFileResponseSuccess(
-          msg, dataBuf.toByteArray(), ChunkInfo.getFromProtoBuf(c));
-    } catch (StorageContainerException e) {
-      return ContainerUtils.logAndReturnError(LOG, e, msg);
-    } catch (IOException e) {
-      throw new StorageContainerException("Get Small File Failed", e,
-          GET_SMALL_FILE_ERROR);
-    }
-  }
-}
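
For context on what this cleanup removes: the deleted Dispatcher inspected
the command type of each ContainerCommandRequestProto and routed it to one
of four family handlers (container, key, chunk, small file), recording
per-command metrics around the call. Below is a minimal, self-contained
sketch of that routing pattern; the enum values and handler bodies are
illustrative stand-ins, not Ozone APIs, and the refactored code presumably
selects a per-container-type Handler through getHandler() instead.

  import java.util.EnumSet;

  public final class DispatchSketch {

    // Illustrative command types; the real ContainerProtos.Type has more.
    enum Cmd { CREATE_CONTAINER, DELETE_CONTAINER, PUT_KEY, GET_KEY,
               WRITE_CHUNK, READ_CHUNK, PUT_SMALL_FILE }

    private static final EnumSet<Cmd> CONTAINER_OPS =
        EnumSet.of(Cmd.CREATE_CONTAINER, Cmd.DELETE_CONTAINER);
    private static final EnumSet<Cmd> KEY_OPS =
        EnumSet.of(Cmd.PUT_KEY, Cmd.GET_KEY);
    private static final EnumSet<Cmd> CHUNK_OPS =
        EnumSet.of(Cmd.WRITE_CHUNK, Cmd.READ_CHUNK);

    // Route a command to its family handler; anything unmatched falls
    // through to an "unsupported request" response, as in dispatch().
    static String dispatch(Cmd cmd) {
      long startNanos = System.nanoTime();
      try {
        if (CONTAINER_OPS.contains(cmd)) {
          return "container-handler:" + cmd;
        } else if (KEY_OPS.contains(cmd)) {
          return "key-handler:" + cmd;
        } else if (CHUNK_OPS.contains(cmd)) {
          return "chunk-handler:" + cmd;
        }
        return "unsupported:" + cmd;
      } finally {
        // Per-command latency metric, mirroring incContainerOpsLatencies.
        System.out.printf("%s took %d ns%n", cmd,
            System.nanoTime() - startNanos);
      }
    }

    public static void main(String[] args) {
      System.out.println(dispatch(Cmd.PUT_KEY));     // key-handler:PUT_KEY
      System.out.println(dispatch(Cmd.WRITE_CHUNK)); // chunk-handler:WRITE_CHUNK
    }
  }

PutSmallFile and GetSmallFile are the same idea one level up: they fold a
chunk write (COMBINED stage) and a key put into a single RPC so that small
objects cost one round trip instead of two.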

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
deleted file mode 100644
index 40ae1c7..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
-import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .Result.NO_SUCH_KEY;
-
-/**
- * Key Manager impl.
- */
-public class KeyManagerImpl implements KeyManager {
-  static final Logger LOG =
-      LoggerFactory.getLogger(KeyManagerImpl.class);
-
-  private static final float LOAD_FACTOR = 0.75f;
-  private final ContainerManager containerManager;
-  private final Configuration conf;
-
-  /**
-   * Constructs a KeyManager.
-   *
-   * @param containerManager - Container Manager.
-   * @param conf - Configuration.
-   */
-  public KeyManagerImpl(ContainerManager containerManager, Configuration conf) {
-    Preconditions.checkNotNull(containerManager, "Container manager cannot be" +
-        " null");
-    Preconditions.checkNotNull(conf, "Config cannot be null");
-    this.containerManager = containerManager;
-    this.conf = conf;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void putKey(KeyData data) throws IOException {
-    Preconditions.checkNotNull(data,
-        "KeyData cannot be null for put operation.");
-    Preconditions.checkState(data.getContainerID() >= 0,
-        "Container ID cannot be negative");
-    containerManager.readLock();
-    try {
-      // We are not locking the key manager since LevelDB serializes all actions
-      // against a single DB. We rely on DB level locking to avoid conflicts.
-      ContainerData cData = containerManager.readContainer(
-          data.getContainerID());
-      MetadataStore db = KeyUtils.getDB(cData, conf);
-
-      // This is a post condition that acts as a hint to the user.
-      // Should never fail.
-      Preconditions.checkNotNull(db, "DB cannot be null here");
-      db.put(Longs.toByteArray(data.getLocalID()), data
-          .getProtoBufMessage().toByteArray());
-    } finally {
-      containerManager.readUnlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public KeyData getKey(KeyData data) throws IOException {
-    containerManager.readLock();
-    try {
-      Preconditions.checkNotNull(data, "Key data cannot be null");
-      Preconditions.checkState(data.getContainerID() >= 0,
-          "Container ID cannot be negative");
-      ContainerData cData = containerManager.readContainer(data
-          .getContainerID());
-      MetadataStore db = KeyUtils.getDB(cData, conf);
-
-      // This is a post condition that acts as a hint to the user.
-      // Should never fail.
-      Preconditions.checkNotNull(db, "DB cannot be null here");
-
-      byte[] kData = db.get(Longs.toByteArray(data.getLocalID()));
-      if (kData == null) {
-        throw new StorageContainerException("Unable to find the key.",
-            NO_SUCH_KEY);
-      }
-      ContainerProtos.KeyData keyData =
-          ContainerProtos.KeyData.parseFrom(kData);
-      return KeyData.getFromProtoBuf(keyData);
-    } finally {
-      containerManager.readUnlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void deleteKey(BlockID blockID)
-      throws IOException {
-    Preconditions.checkNotNull(blockID, "block ID cannot be null.");
-    Preconditions.checkState(blockID.getContainerID() >= 0,
-        "Container ID cannot be negative.");
-    Preconditions.checkState(blockID.getLocalID() >= 0,
-        "Local ID cannot be negative.");
-
-    containerManager.readLock();
-    try {
-
-      ContainerData cData = containerManager
-          .readContainer(blockID.getContainerID());
-      MetadataStore db = KeyUtils.getDB(cData, conf);
-
-      // This is a post condition that acts as a hint to the user.
-      // Should never fail.
-      Preconditions.checkNotNull(db, "DB cannot be null here");
-      // Note : There is a race condition here, since get and delete
-      // are not atomic. Leaving it here since the impact is refusing
-      // to delete a key which might have just gotten inserted after
-      // the get check.
-
-      byte[] kKey = Longs.toByteArray(blockID.getLocalID());
-      byte[] kData = db.get(kKey);
-      if (kData == null) {
-        throw new StorageContainerException("Unable to find the key.",
-            NO_SUCH_KEY);
-      }
-      db.delete(kKey);
-    } finally {
-      containerManager.readUnlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<KeyData> listKey(
-      long containerID, long startLocalID, int count)
-      throws IOException {
-    Preconditions.checkState(containerID >= 0,
-        "Container ID cannot be negative");
-    Preconditions.checkState(startLocalID >= 0,
-        "startLocal ID cannot be negative");
-    Preconditions.checkArgument(count > 0,
-        "Count must be a positive number.");
-    ContainerData cData = containerManager.readContainer(containerID);
-    MetadataStore db = KeyUtils.getDB(cData, conf);
-
-    List<KeyData> result = new ArrayList<>();
-    byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
-    List<Map.Entry<byte[], byte[]>> range =
-        db.getSequentialRangeKVs(startKeyInBytes, count, null);
-    for (Map.Entry<byte[], byte[]> entry : range) {
-      KeyData value = KeyUtils.getKeyData(entry.getValue());
-      KeyData data = new KeyData(value.getBlockID());
-      result.add(data);
-    }
-    return result;
-  }
-
-  /**
-   * Shutdown keyManager.
-   */
-  @Override
-  public void shutdown() {
-    Preconditions.checkState(this.containerManager.hasWriteLock(), "asserts " +
-        "that we are holding the container manager lock when shutting down.");
-    KeyUtils.shutdownCache(ContainerCache.getInstance(conf));
-  }
-}
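
The deleted KeyManagerImpl is worth a note too: it kept one MetadataStore
(LevelDB) instance per container and stored each key under the block's
local ID (Longs.toByteArray(localID)), with the serialized KeyData protobuf
as the value, relying on the DB's own serialization of operations rather
than a key-manager lock. A minimal sketch of that layout follows, with a
TreeMap standing in for the per-container store; all names here are
illustrative only. It deliberately reproduces the non-atomic get-then-delete
race called out in the original comment.

  import java.nio.charset.StandardCharsets;
  import java.util.NavigableMap;
  import java.util.TreeMap;

  public final class KeyStoreSketch {

    // Stand-in for the container-local MetadataStore: keys are the block's
    // local ID, values the serialized KeyData bytes.
    private final NavigableMap<Long, byte[]> db = new TreeMap<>();

    void putKey(long localId, byte[] keyData) {
      db.put(localId, keyData);          // last write wins, as in LevelDB
    }

    byte[] getKey(long localId) {
      byte[] data = db.get(localId);
      if (data == null) {
        // Corresponds to the NO_SUCH_KEY result in the original code.
        throw new IllegalStateException("Unable to find the key.");
      }
      return data;
    }

    void deleteKey(long localId) {
      getKey(localId);                   // get-then-delete: not atomic
      db.remove(localId);
    }

    public static void main(String[] args) {
      KeyStoreSketch store = new KeyStoreSketch();
      store.putKey(42L, "key-data-proto-bytes".getBytes(StandardCharsets.UTF_8));
      System.out.println(new String(store.getKey(42L), StandardCharsets.UTF_8));
      store.deleteKey(42L);
    }
  }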

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
index 97fdb9e..83d746b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces
     .ContainerDeletionChoosingPolicy;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c2351e8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
index 9a109e8..68074fc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.common.impl;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces
     .ContainerDeletionChoosingPolicy;
 import org.slf4j.Logger;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org