Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:33:22 UTC

[13/37] hadoop git commit: HDDS-156. Implement HDDSVolume to manage volume state

HDDS-156. Implement HDDSVolume to manage volume state


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9a5552bf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9a5552bf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9a5552bf

Branch: refs/heads/trunk
Commit: 9a5552bf762880c38a233597b7c6e9ea09441108
Parents: 418cff4
Author: Hanisha Koneru <ha...@apache.org>
Authored: Thu Jun 14 13:28:41 2018 -0700
Committer: Hanisha Koneru <ha...@apache.org>
Committed: Thu Jun 14 13:28:41 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   5 +
 .../container/common/DataNodeLayoutVersion.java |  80 +++++
 .../common/helpers/DatanodeVersionFile.java     |  95 ++++++
 .../impl/RoundRobinVolumeChoosingPolicy.java    |  82 -----
 .../ozone/container/common/impl/VolumeInfo.java | 162 ---------
 .../ozone/container/common/impl/VolumeSet.java  | 251 --------------
 .../container/common/impl/VolumeUsage.java      | 189 -----------
 .../common/interfaces/VolumeChoosingPolicy.java |   4 +-
 .../container/common/utils/HddsVolumeUtil.java  | 163 +++++++++
 .../container/common/volume/HddsVolume.java     | 330 +++++++++++++++++++
 .../volume/RoundRobinVolumeChoosingPolicy.java  |  83 +++++
 .../container/common/volume/VolumeInfo.java     | 132 ++++++++
 .../container/common/volume/VolumeSet.java      | 309 +++++++++++++++++
 .../container/common/volume/VolumeUsage.java    | 198 +++++++++++
 .../container/common/volume/package-info.java   |  21 ++
 .../common/TestDatanodeLayOutVersion.java       |  38 +++
 .../common/helpers/TestDatanodeVersionFile.java | 134 ++++++++
 .../TestRoundRobinVolumeChoosingPolicy.java     | 100 ------
 .../common/interfaces/TestVolumeSet.java        | 149 ---------
 .../container/common/volume/TestHddsVolume.java | 145 ++++++++
 .../TestRoundRobinVolumeChoosingPolicy.java     | 131 ++++++++
 .../container/common/volume/TestVolumeSet.java  | 157 +++++++++
 22 files changed, 2023 insertions(+), 935 deletions(-)
----------------------------------------------------------------------
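
For context: the patch introduces a per-volume VERSION file, written once the
datanode knows its clusterID. Based on DatanodeVersionFile#createProperties
below, the file is a standard java.util.Properties dump along these lines
(all values here are illustrative):

  #Thu Jun 14 13:28:41 PDT 2018
  storageID=DS-f8729b2a-example
  clusterID=CID-example-cluster
  datanodeUuid=example-datanode-uuid
  ctime=1528990121000
  layOutVersion=1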


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index c40dc8e..36f830b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -33,6 +33,11 @@ public final class OzoneConsts {
   public static final String OZONE_SIMPLE_ROOT_USER = "root";
   public static final String OZONE_SIMPLE_HDFS_USER = "hdfs";
 
+  public static final String STORAGE_ID = "storageID";
+  public static final String DATANODE_UUID = "datanodeUuid";
+  public static final String CLUSTER_ID = "clusterID";
+  public static final String LAYOUTVERSION = "layOutVersion";
+  public static final String CTIME = "ctime";
   /*
    * BucketName length is used for both buckets and volume lengths
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DataNodeLayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DataNodeLayoutVersion.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DataNodeLayoutVersion.java
new file mode 100644
index 0000000..2d58c39
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/DataNodeLayoutVersion.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+/**
+ * Datanode layout version: describes the version of the data layout
+ * used on the datanode.
+ */
+public final class DataNodeLayoutVersion {
+
+  // We will just be normal and use positive counting numbers for versions.
+  private final static DataNodeLayoutVersion[] VERSION_INFOS =
+      {new DataNodeLayoutVersion(1, "HDDS Datanode LayOut Version 1")};
+
+  private final String description;
+  private final int version;
+
+  /**
+   * Never created outside this class.
+   *
+   * @param description -- description
+   * @param version     -- version number
+   */
+  private DataNodeLayoutVersion(int version, String description) {
+    this.description = description;
+    this.version = version;
+  }
+
+  /**
+   * Returns all versions.
+   *
+   * @return Version info array.
+   */
+  public static DataNodeLayoutVersion[] getAllVersions() {
+    return VERSION_INFOS.clone();
+  }
+
+  /**
+   * Returns the latest version.
+   *
+   * @return versionInfo
+   */
+  public static DataNodeLayoutVersion getLatestVersion() {
+    return VERSION_INFOS[VERSION_INFOS.length - 1];
+  }
+
+  /**
+   * Return description.
+   *
+   * @return String
+   */
+  public String getDescription() {
+    return description;
+  }
+
+  /**
+   * Return the version.
+   *
+   * @return int.
+   */
+  public int getVersion() {
+    return version;
+  }
+
+}
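
A trivial usage sketch of the class above (output values assume only the
single version 1 defined in this patch):

  import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;

  public class LayoutVersionExample {
    public static void main(String[] args) {
      // The latest version is what new VERSION files are validated against.
      System.out.println(
          DataNodeLayoutVersion.getLatestVersion().getVersion());   // 1
      for (DataNodeLayoutVersion v : DataNodeLayoutVersion.getAllVersions()) {
        System.out.println(v.getVersion() + " : " + v.getDescription());
      }
    }
  }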

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeVersionFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeVersionFile.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeVersionFile.java
new file mode 100644
index 0000000..4db6d31
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DatanodeVersionFile.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import org.apache.hadoop.ozone.OzoneConsts;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Properties;
+
+/**
+ * A utility class that creates the version file on a datanode and
+ * validates its content.
+ */
+public class DatanodeVersionFile {
+
+  private final String storageId;
+  private final String clusterId;
+  private final String datanodeUuid;
+  private final long cTime;
+  private final int layOutVersion;
+
+  public DatanodeVersionFile(String storageId, String clusterId,
+      String datanodeUuid, long cTime, int layOutVersion) {
+    this.storageId = storageId;
+    this.clusterId = clusterId;
+    this.datanodeUuid = datanodeUuid;
+    this.cTime = cTime;
+    this.layOutVersion = layOutVersion;
+  }
+
+  private Properties createProperties() {
+    Properties properties = new Properties();
+    properties.setProperty(OzoneConsts.STORAGE_ID, storageId);
+    properties.setProperty(OzoneConsts.CLUSTER_ID, clusterId);
+    properties.setProperty(OzoneConsts.DATANODE_UUID, datanodeUuid);
+    properties.setProperty(OzoneConsts.CTIME, String.valueOf(cTime));
+    properties.setProperty(OzoneConsts.LAYOUTVERSION, String.valueOf(
+        layOutVersion));
+    return properties;
+  }
+
+  /**
+   * Creates a version file at the specified path.
+   * @param path the file to write the version properties to
+   * @throws IOException if the file cannot be written
+   */
+  public void createVersionFile(File path) throws
+      IOException {
+    try (RandomAccessFile file = new RandomAccessFile(path, "rws");
+         FileOutputStream out = new FileOutputStream(file.getFD())) {
+      file.getChannel().truncate(0);
+      Properties properties = createProperties();
+      /*
+       * If server is interrupted before this line,
+       * the version file will remain unchanged.
+       */
+      properties.store(out, null);
+    }
+  }
+
+
+  /**
+   * Creates a Properties object from the content of the specified file.
+   * @param versionFile the version file to read
+   * @return the Properties loaded from the file
+   * @throws IOException if the file cannot be read
+   */
+  public static Properties readFrom(File versionFile) throws IOException {
+    try (RandomAccessFile file = new RandomAccessFile(versionFile, "rws");
+         FileInputStream in = new FileInputStream(file.getFD())) {
+      Properties props = new Properties();
+      props.load(in);
+      return props;
+    }
+  }
+}
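
A minimal round-trip sketch for the helper above (the path and field values
are illustrative):

  import java.io.File;
  import java.util.Properties;
  import org.apache.hadoop.ozone.OzoneConsts;
  import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
  import org.apache.hadoop.util.Time;

  public class VersionFileRoundTrip {
    public static void main(String[] args) throws Exception {
      File versionFile = new File("/tmp/hdds-example/VERSION");
      versionFile.getParentFile().mkdirs();

      // Write the five fields, then read them back as plain Properties.
      new DatanodeVersionFile("DS-example-storage", "CID-example-cluster",
          "example-datanode-uuid", Time.now(), 1)
          .createVersionFile(versionFile);

      Properties props = DatanodeVersionFile.readFrom(versionFile);
      System.out.println(props.getProperty(OzoneConsts.CLUSTER_ID));
    }
  }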

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RoundRobinVolumeChoosingPolicy.java
deleted file mode 100644
index 55b3049..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RoundRobinVolumeChoosingPolicy.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Choose volumes in round-robin order.
- * The caller should synchronize access to the list of volumes.
- */
-public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {
-
-  public static final Log LOG = LogFactory.getLog(
-		RoundRobinVolumeChoosingPolicy.class);
-
-  // Stores the index of the next volume to be returned.
-  private AtomicInteger nextVolumeIndex = new AtomicInteger(0);
-
-  @Override
-  public VolumeInfo chooseVolume(List<VolumeInfo> volumes,
-      long maxContainerSize) throws IOException {
-
-    // No volumes available to choose from
-    if (volumes.size() < 1) {
-      throw new DiskOutOfSpaceException("No more available volumes");
-    }
-
-    // since volumes could've been removed because of the failure
-    // make sure we are not out of bounds
-    int nextIndex = nextVolumeIndex.get();
-    int currentVolumeIndex = nextIndex < volumes.size() ? nextIndex : 0;
-
-    int startVolumeIndex = currentVolumeIndex;
-    long maxAvailable = 0;
-
-    while (true) {
-      final VolumeInfo volume = volumes.get(currentVolumeIndex);
-      long availableVolumeSize = volume.getAvailable();
-
-      currentVolumeIndex = (currentVolumeIndex + 1) % volumes.size();
-
-      if (availableVolumeSize > maxContainerSize) {
-        nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex);
-        return volume;
-      }
-
-      if (availableVolumeSize > maxAvailable) {
-        maxAvailable = availableVolumeSize;
-      }
-
-      if (currentVolumeIndex == startVolumeIndex) {
-        throw new DiskOutOfSpaceException("Out of space: "
-            + "The volume with the most available space (=" + maxAvailable
-            + " B) is less than the container size (=" + maxContainerSize
-            + " B).");
-      }
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeInfo.java
deleted file mode 100644
index 3e8dda6..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeInfo.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Stores information about a disk/volume.
- */
-public class VolumeInfo {
-
-  private static final Logger LOG = LoggerFactory.getLogger(VolumeInfo.class);
-
-  private final Path rootDir;
-  private final StorageType storageType;
-  private VolumeState state;
-
-  // Space usage calculator
-  private VolumeUsage usage;
-  // Capacity configured. This is useful when we want to
-  // limit the visible capacity for tests. If negative, then we just
-  // query from the filesystem.
-  private long configuredCapacity;
-
-  public static class Builder {
-    private final Configuration conf;
-    private final Path rootDir;
-    private StorageType storageType;
-    private VolumeState state;
-    private long configuredCapacity;
-
-    public Builder(Path rootDir, Configuration conf) {
-      this.rootDir = rootDir;
-      this.conf = conf;
-    }
-
-    public Builder(String rootDirStr, Configuration conf) {
-      this.rootDir = new Path(rootDirStr);
-      this.conf = conf;
-    }
-
-    public Builder storageType(StorageType storageType) {
-      this.storageType = storageType;
-      return this;
-    }
-
-    public Builder volumeState(VolumeState state) {
-      this.state = state;
-      return this;
-    }
-
-    public Builder configuredCapacity(long configuredCapacity) {
-      this.configuredCapacity = configuredCapacity;
-      return this;
-    }
-
-    public VolumeInfo build() throws IOException {
-      return new VolumeInfo(this);
-    }
-  }
-
-  private VolumeInfo(Builder b) throws IOException {
-
-    this.rootDir = b.rootDir;
-    File root = new File(rootDir.toString());
-
-    Boolean succeeded = root.isDirectory() || root.mkdirs();
-
-    if (!succeeded) {
-      LOG.error("Unable to create the volume root dir at : {}", root);
-      throw new IOException("Unable to create the volume root dir at " + root);
-    }
-
-    this.storageType = (b.storageType != null ?
-        b.storageType : StorageType.DEFAULT);
-
-    this.configuredCapacity = (b.configuredCapacity != 0 ?
-        b.configuredCapacity : -1);
-
-    this.state = (b.state != null ? b.state : VolumeState.NOT_FORMATTED);
-
-    this.usage = new VolumeUsage(root, b.conf);
-
-    LOG.info("Creating Volume : " + rootDir + " of storage type : " +
-        storageType + " and capacity : " + configuredCapacity);
-  }
-
-  public long getCapacity() {
-    return configuredCapacity < 0 ? usage.getCapacity() : configuredCapacity;
-  }
-
-  public long getAvailable() throws IOException {
-    return usage.getAvailable();
-  }
-
-  public long getScmUsed() throws IOException {
-    return usage.getScmUsed();
-  }
-
-  void shutdown() {
-    this.state = VolumeState.NON_EXISTENT;
-    shutdownUsageThread();
-  }
-
-  void failVolume() {
-    setState(VolumeState.FAILED);
-    shutdownUsageThread();
-  }
-
-  private void shutdownUsageThread() {
-    if (usage != null) {
-      usage.shutdown();
-    }
-    usage = null;
-  }
-
-  void setState(VolumeState state) {
-    this.state = state;
-  }
-
-  public boolean isFailed() {
-    return (state == VolumeState.FAILED);
-  }
-
-  public Path getRootDir() {
-    return this.rootDir;
-  }
-
-  public StorageType getStorageType() {
-    return this.storageType;
-  }
-
-  public enum VolumeState {
-    NORMAL,
-    FAILED,
-    NON_EXISTENT,
-    NOT_FORMATTED,
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java
deleted file mode 100644
index c55c84a..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeSet.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.ozone.container.common.impl.VolumeInfo.VolumeState;
-import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
-import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.InstrumentedLock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * VolumeSet to manage volumes in a DataNode.
- */
-public class VolumeSet {
-
-  private static final Logger LOG = LoggerFactory.getLogger(VolumeSet.class);
-
-  private Configuration conf;
-
-  /**
-   * {@link VolumeSet#volumeMap} maintains a map of all active volumes in the
-   * DataNode. Each volume has one-to-one mapping with a volumeInfo object.
-   */
-  private Map<Path, VolumeInfo> volumeMap;
-  /**
-   * {@link VolumeSet#failedVolumeMap} maintains a map of volumes which have
-   * failed. The keys in this map and {@link VolumeSet#volumeMap} are
-   * mutually exclusive.
-   */
-  private Map<Path, VolumeInfo> failedVolumeMap;
-  /**
-   * {@link VolumeSet#volumeStateMap} maintains a list of active volumes per
-   * StorageType.
-   */
-  private EnumMap<StorageType, List<VolumeInfo>> volumeStateMap;
-
-  /**
-   * Lock to synchronize changes to the VolumeSet. Any update to
-   * {@link VolumeSet#volumeMap}, {@link VolumeSet#failedVolumeMap}, or
-   * {@link VolumeSet#volumeStateMap} should be done after acquiring this lock.
-   */
-  private final AutoCloseableLock volumeSetLock;
-
-  public VolumeSet(Configuration conf) throws DiskOutOfSpaceException {
-    this.conf = conf;
-    this.volumeSetLock = new AutoCloseableLock(
-        new InstrumentedLock(getClass().getName(), LOG,
-            new ReentrantLock(true),
-            conf.getTimeDuration(
-                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
-                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
-                TimeUnit.MILLISECONDS),
-            300));
-
-    initializeVolumeSet();
-  }
-
-  // Add DN volumes configured through ConfigKeys to volumeMap.
-  private void initializeVolumeSet() throws DiskOutOfSpaceException {
-    volumeMap = new ConcurrentHashMap<>();
-    failedVolumeMap = new ConcurrentHashMap<>();
-    volumeStateMap = new EnumMap<>(StorageType.class);
-
-    Collection<String> datanodeDirs = conf.getTrimmedStringCollection(
-        HDDS_DATANODE_DIR_KEY);
-    if (datanodeDirs.isEmpty()) {
-      datanodeDirs = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
-    }
-    if (datanodeDirs.isEmpty()) {
-      throw new IllegalArgumentException("No location configured in either "
-          + HDDS_DATANODE_DIR_KEY + " or " + DFS_DATANODE_DATA_DIR_KEY);
-    }
-
-    for (StorageType storageType : StorageType.values()) {
-      volumeStateMap.put(storageType, new ArrayList<VolumeInfo>());
-    }
-
-    for (String dir : datanodeDirs) {
-      try {
-        VolumeInfo volumeInfo = getVolumeInfo(dir);
-
-        volumeMap.put(volumeInfo.getRootDir(), volumeInfo);
-        volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo);
-      } catch (IOException e) {
-        LOG.error("Failed to parse the storage location: " + dir, e);
-      }
-    }
-
-    if (volumeMap.size() == 0) {
-      throw new DiskOutOfSpaceException("No storage location configured");
-    }
-  }
-
-  public void acquireLock() {
-    volumeSetLock.acquire();
-  }
-
-  public void releaseLock() {
-    volumeSetLock.release();
-  }
-
-  private VolumeInfo getVolumeInfo(String rootDir) throws IOException {
-    StorageLocation location = StorageLocation.parse(rootDir);
-    StorageType storageType = location.getStorageType();
-
-    VolumeInfo.Builder volumeBuilder = new VolumeInfo.Builder(rootDir, conf);
-    volumeBuilder.storageType(storageType);
-    return volumeBuilder.build();
-  }
-
-  // Add a volume to VolumeSet
-  public void addVolume(String dataDir) throws IOException {
-    Path dirPath = new Path(dataDir);
-
-    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
-      if (volumeMap.containsKey(dirPath)) {
-        LOG.warn("Volume : {} already exists in VolumeMap", dataDir);
-      } else {
-        if (failedVolumeMap.containsKey(dirPath)) {
-          failedVolumeMap.remove(dirPath);
-        }
-
-        VolumeInfo volumeInfo = getVolumeInfo(dirPath.toString());
-        volumeMap.put(dirPath, volumeInfo);
-        volumeStateMap.get(volumeInfo.getStorageType()).add(volumeInfo);
-
-        LOG.debug("Added Volume : {} to VolumeSet", dataDir);
-      }
-    }
-  }
-
-  // Mark a volume as failed
-  public void failVolume(String dataDir) {
-    Path dirPath = new Path(dataDir);
-
-    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
-      if (volumeMap.containsKey(dirPath)) {
-        VolumeInfo volumeInfo = volumeMap.get(dirPath);
-        volumeInfo.failVolume();
-
-        volumeMap.remove(dirPath);
-        volumeStateMap.get(volumeInfo.getStorageType()).remove(volumeInfo);
-        failedVolumeMap.put(dirPath, volumeInfo);
-
-        LOG.debug("Moving Volume : {} to failed Volumes", dataDir);
-      } else if (failedVolumeMap.containsKey(dirPath)) {
-        LOG.debug("Volume : {} is not active", dataDir);
-      } else {
-        LOG.warn("Volume : {} does not exist in VolumeSet", dataDir);
-      }
-    }
-  }
-
-  // Remove a volume from the VolumeSet completely.
-  public void removeVolume(String dataDir) throws IOException {
-    Path dirPath = new Path(dataDir);
-
-    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
-      if (volumeMap.containsKey(dirPath)) {
-        VolumeInfo volumeInfo = volumeMap.get(dirPath);
-        volumeInfo.shutdown();
-
-        volumeMap.remove(dirPath);
-        volumeStateMap.get(volumeInfo.getStorageType()).remove(volumeInfo);
-
-        LOG.debug("Removed Volume : {} from VolumeSet", dataDir);
-      } else if (failedVolumeMap.containsKey(dirPath)) {
-        VolumeInfo volumeInfo = failedVolumeMap.get(dirPath);
-        volumeInfo.setState(VolumeState.NON_EXISTENT);
-
-        failedVolumeMap.remove(dirPath);
-        LOG.debug("Removed Volume : {} from failed VolumeSet", dataDir);
-      } else {
-        LOG.warn("Volume : {} does not exist in VolumeSet", dataDir);
-      }
-    }
-  }
-
-  public VolumeInfo chooseVolume(long containerSize,
-      VolumeChoosingPolicy choosingPolicy) throws IOException {
-    return choosingPolicy.chooseVolume(getVolumesList(), containerSize);
-  }
-
-  public void shutdown() {
-    for (VolumeInfo volumeInfo : volumeMap.values()) {
-      try {
-        volumeInfo.shutdown();
-      } catch (Exception e) {
-        LOG.error("Failed to shutdown volume : " + volumeInfo.getRootDir(), e);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public List<VolumeInfo> getVolumesList() {
-    return ImmutableList.copyOf(volumeMap.values());
-  }
-
-  @VisibleForTesting
-  public List<VolumeInfo> getFailedVolumesList() {
-    return ImmutableList.copyOf(failedVolumeMap.values());
-  }
-
-  @VisibleForTesting
-  public Map<Path, VolumeInfo> getVolumeMap() {
-    return ImmutableMap.copyOf(volumeMap);
-  }
-
-  @VisibleForTesting
-  public Map<StorageType, List<VolumeInfo>> getVolumeStateMap() {
-    return ImmutableMap.copyOf(volumeStateMap);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeUsage.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeUsage.java
deleted file mode 100644
index bcd78ba..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/VolumeUsage.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CachingGetSpaceUsed;
-import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.GetSpaceUsed;
-import org.apache.hadoop.io.IOUtils;
-import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-import java.util.Scanner;
-
-/**
- * Class that wraps the space df of the Datanode Volumes used by SCM
- * containers.
- */
-public class VolumeUsage {
-  private static final Logger LOG = LoggerFactory.getLogger(VolumeUsage.class);
-
-  private final File rootDir;
-  private final DF df;
-  private final File scmUsedFile;
-  private GetSpaceUsed scmUsage;
-  private Runnable shutdownHook;
-
-  private static final String DU_CACHE_FILE = "scmUsed";
-  private volatile boolean scmUsedSaved = false;
-
-  VolumeUsage(File dataLoc, Configuration conf)
-      throws IOException {
-    this.rootDir = dataLoc;
-
-    // SCM used cache file
-    scmUsedFile = new File(rootDir, DU_CACHE_FILE);
-    // get overall disk df
-    this.df = new DF(rootDir, conf);
-
-    startScmUsageThread(conf);
-  }
-
-  void startScmUsageThread(Configuration conf) throws IOException {
-    // get SCM specific df
-    this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(rootDir)
-        .setConf(conf)
-        .setInitialUsed(loadScmUsed())
-        .build();
-
-    // Ensure scm df is saved during shutdown.
-    shutdownHook = () -> {
-      if (!scmUsedSaved) {
-        saveScmUsed();
-      }
-    };
-    ShutdownHookManager.get().addShutdownHook(shutdownHook,
-        SHUTDOWN_HOOK_PRIORITY);
-  }
-
-  long getCapacity() {
-    long capacity = df.getCapacity();
-    return (capacity > 0) ? capacity : 0;
-  }
-
-  /*
-   * Calculate the available space in the volume.
-   */
-  long getAvailable() throws IOException {
-    long remaining = getCapacity() - getScmUsed();
-    long available = df.getAvailable();
-    if (remaining > available) {
-      remaining = available;
-    }
-    return (remaining > 0) ? remaining : 0;
-  }
-
-  long getScmUsed() throws IOException{
-    return scmUsage.getUsed();
-  }
-
-  public void shutdown() {
-    saveScmUsed();
-    scmUsedSaved = true;
-
-    if (shutdownHook != null) {
-      ShutdownHookManager.get().removeShutdownHook(shutdownHook);
-    }
-
-    if (scmUsage instanceof CachingGetSpaceUsed) {
-      IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage));
-    }
-  }
-
-  /**
-   * Read in the cached DU value and return it if it is less than 600 seconds
-   * old (DU update interval). Slight imprecision of scmUsed is not critical
-   * and skipping DU can significantly shorten the startup time.
-   * If the cached value is not available or too old, -1 is returned.
-   */
-  long loadScmUsed() {
-    long cachedScmUsed;
-    long mtime;
-    Scanner sc;
-
-    try {
-      sc = new Scanner(scmUsedFile, "UTF-8");
-    } catch (FileNotFoundException fnfe) {
-      return -1;
-    }
-
-    try {
-      // Get the recorded scmUsed from the file.
-      if (sc.hasNextLong()) {
-        cachedScmUsed = sc.nextLong();
-      } else {
-        return -1;
-      }
-      // Get the recorded mtime from the file.
-      if (sc.hasNextLong()) {
-        mtime = sc.nextLong();
-      } else {
-        return -1;
-      }
-
-      // Return the cached value if mtime is okay.
-      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
-        LOG.info("Cached ScmUsed found for {} : {} ", rootDir,
-            cachedScmUsed);
-        return cachedScmUsed;
-      }
-      return -1;
-    } finally {
-      sc.close();
-    }
-  }
-
-  /**
-   * Write the current scmUsed to the cache file.
-   */
-  void saveScmUsed() {
-    if (scmUsedFile.exists() && !scmUsedFile.delete()) {
-      LOG.warn("Failed to delete old scmUsed file in {}.", rootDir);
-    }
-    OutputStreamWriter out = null;
-    try {
-      long used = getScmUsed();
-      if (used > 0) {
-        out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
-            StandardCharsets.UTF_8);
-        // mtime is written last, so that truncated writes won't be valid.
-        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
-        out.flush();
-        out.close();
-        out = null;
-      }
-    } catch (IOException ioe) {
-      // If write failed, the volume might be bad. Since the cache file is
-      // not critical, log the error and continue.
-      LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
-    } finally {
-      IOUtils.cleanupWithLogger(null, out);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/VolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/VolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/VolumeChoosingPolicy.java
index b8cbcb6..7de0e2a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/VolumeChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/VolumeChoosingPolicy.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.ozone.container.common.interfaces;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ozone.container.common.impl.VolumeInfo;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 
 import java.io.IOException;
 import java.util.List;
@@ -41,6 +41,6 @@ public interface VolumeChoosingPolicy {
    * @return the chosen volume.
    * @throws IOException when disks are unavailable or are full.
    */
-  VolumeInfo chooseVolume(List<VolumeInfo> volumes, long maxContainerSize)
+  HddsVolume chooseVolume(List<HddsVolume> volumes, long maxContainerSize)
       throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
new file mode 100644
index 0000000..6809d57
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/HddsVolumeUtil.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.util.Time;
+
+import java.io.File;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * A util class for {@link HddsVolume}.
+ */
+public final class HddsVolumeUtil {
+
+  // Private constructor for Utility class. Unused.
+  private HddsVolumeUtil() {
+  }
+
+  private static final String VERSION_FILE = "VERSION";
+  private static final String STORAGE_ID_PREFIX = "DS-";
+
+  public static File getVersionFile(File rootDir) {
+    return new File(rootDir, VERSION_FILE);
+  }
+
+  public static String generateUuid() {
+    return STORAGE_ID_PREFIX + UUID.randomUUID();
+  }
+
+  /**
+   * Get hddsRoot from volume root. If volumeRoot points to hddsRoot, it is
+   * returned as is.
+   * For a volumeRoot /data/disk1, the hddsRoot is /data/disk1/hdds.
+   * @param volumeRoot root of the volume.
+   * @return hddsRoot of the volume.
+   */
+  public static String getHddsRoot(String volumeRoot) {
+    if (volumeRoot.endsWith(HddsVolume.HDDS_VOLUME_DIR)) {
+      return volumeRoot;
+    } else {
+      File hddsRoot = new File(volumeRoot, HddsVolume.HDDS_VOLUME_DIR);
+      return hddsRoot.getPath();
+    }
+  }
+
+  /**
+   * Returns storageID if it is valid. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static String getStorageID(Properties props, File versionFile)
+      throws InconsistentStorageStateException {
+    return getProperty(props, OzoneConsts.STORAGE_ID, versionFile);
+  }
+
+  /**
+   * Returns clusterID if it is valid. It should match the clusterID from the
+   * Datanode. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static String getClusterID(Properties props, File versionFile,
+      String clusterID) throws InconsistentStorageStateException {
+    String cid = getProperty(props, OzoneConsts.CLUSTER_ID, versionFile);
+
+    if (clusterID == null) {
+      return cid;
+    }
+    if (!clusterID.equals(cid)) {
+      throw new InconsistentStorageStateException("Mismatched " +
+          "ClusterIDs. Version File : " + versionFile + " has clusterID: " +
+          cid + " and Datanode has clusterID: " + clusterID);
+    }
+    return cid;
+  }
+
+  /**
+   * Returns datanodeUuid if it is valid. It should match the UUID of the
+   * Datanode. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static String getDatanodeUUID(Properties props, File versionFile,
+      String datanodeUuid)
+      throws InconsistentStorageStateException {
+    String datanodeID = getProperty(props, OzoneConsts.DATANODE_UUID,
+        versionFile);
+
+    if (datanodeUuid != null && !datanodeUuid.equals(datanodeID)) {
+      throw new InconsistentStorageStateException("Mismatched " +
+          "DatanodeUUIDs. Version File : " + versionFile + " has datanodeUuid: "
+          + datanodeID + " and Datanode has datanodeUuid: " + datanodeUuid);
+    }
+    return datanodeID;
+  }
+
+  /**
+   * Returns creationTime if it is valid. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static long getCreationTime(Properties props, File versionFile)
+      throws InconsistentStorageStateException {
+    String cTimeStr = getProperty(props, OzoneConsts.CTIME, versionFile);
+
+    long cTime = Long.parseLong(cTimeStr);
+    long currentTime = Time.now();
+    if (cTime > currentTime || cTime < 0) {
+      throw new InconsistentStorageStateException("Invalid Creation time in " +
+          "Version File : " + versionFile + " - " + cTime + ". Current system" +
+          " time is " + currentTime);
+    }
+    return cTime;
+  }
+
+  /**
+   * Returns layOutVersion if it is valid. Throws an exception otherwise.
+   */
+  @VisibleForTesting
+  public static int getLayOutVersion(Properties props, File versionFile) throws
+      InconsistentStorageStateException {
+    String lvStr = getProperty(props, OzoneConsts.LAYOUTVERSION, versionFile);
+
+    int lv = Integer.parseInt(lvStr);
+    if (DataNodeLayoutVersion.getLatestVersion().getVersion() != lv) {
+      throw new InconsistentStorageStateException("Invalid layOutVersion. " +
+          "Version file has layOutVersion as " + lv + " and latest Datanode " +
+          "layOutVersion is " +
+          DataNodeLayoutVersion.getLatestVersion().getVersion());
+    }
+    return lv;
+  }
+
+  private static String getProperty(Properties props, String propName,
+      File versionFile)
+      throws InconsistentStorageStateException {
+    String value = props.getProperty(propName);
+    if (StringUtils.isBlank(value)) {
+      throw new InconsistentStorageStateException("Invalid " + propName +
+          ". Version File : " + versionFile + " has null or empty " + propName);
+    }
+    return value;
+  }
+}
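
The getters above mirror the checks HddsVolume performs in readVersionFile().
A sketch of validating a VERSION file by hand (the root path and expected
clusterID are illustrative):

  import java.io.File;
  import java.util.Properties;
  import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
  import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;

  public class VersionFileValidation {
    public static void main(String[] args) throws Exception {
      File hddsRoot = new File("/data/disk1/hdds");
      File versionFile = HddsVolumeUtil.getVersionFile(hddsRoot);
      Properties props = DatanodeVersionFile.readFrom(versionFile);

      // Each getter throws InconsistentStorageStateException if the field
      // is missing, blank, or conflicts with the expected value.
      String storageID = HddsVolumeUtil.getStorageID(props, versionFile);
      String clusterID = HddsVolumeUtil.getClusterID(props, versionFile,
          "CID-example-cluster");
      long cTime = HddsVolumeUtil.getCreationTime(props, versionFile);
      int layoutVersion = HddsVolumeUtil.getLayOutVersion(props, versionFile);
      System.out.println(storageID + " / " + clusterID + " / " + cTime
          + " / " + layoutVersion);
    }
  }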

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
new file mode 100644
index 0000000..788e2cf
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
+import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * HddsVolume represents a volume in a datanode. {@link VolumeSet} maintains
+ * a list of HddsVolumes, one for each volume in the datanode.
+ * A {@link VolumeInfo} is encompassed by this class.
+ */
+public final class HddsVolume {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HddsVolume.class);
+
+  public static final String HDDS_VOLUME_DIR = "hdds";
+
+  private final File hddsRootDir;
+  private final VolumeInfo volumeInfo;
+  private VolumeState state;
+
+  // VERSION file properties
+  private String storageID;       // id of the file system
+  private String clusterID;       // id of the cluster
+  private String datanodeUuid;    // id of the DataNode
+  private long cTime;             // creation time of the file system state
+  private int layoutVersion;      // layout version of the storage data
+
+  /**
+   * Builder for HddsVolume.
+   */
+  public static class Builder {
+    private final String volumeRootStr;
+    private Configuration conf;
+    private StorageType storageType;
+    private long configuredCapacity;
+
+    private String datanodeUuid;
+    private String clusterID;
+
+    public Builder(String rootDirStr) {
+      this.volumeRootStr = rootDirStr;
+    }
+
+    public Builder conf(Configuration config) {
+      this.conf = config;
+      return this;
+    }
+
+    public Builder storageType(StorageType st) {
+      this.storageType = st;
+      return this;
+    }
+
+    public Builder configuredCapacity(long capacity) {
+      this.configuredCapacity = capacity;
+      return this;
+    }
+
+    public Builder datanodeUuid(String datanodeUUID) {
+      this.datanodeUuid = datanodeUUID;
+      return this;
+    }
+
+    public Builder clusterID(String cid) {
+      this.clusterID = cid;
+      return this;
+    }
+
+    public HddsVolume build() throws IOException {
+      return new HddsVolume(this);
+    }
+  }
+
+  private HddsVolume(Builder b) throws IOException {
+    Preconditions.checkNotNull(b.volumeRootStr,
+        "Volume root dir cannot be null");
+    Preconditions.checkNotNull(b.datanodeUuid, "DatanodeUUID cannot be null");
+    Preconditions.checkNotNull(b.conf, "Configuration cannot be null");
+
+    StorageLocation location = StorageLocation.parse(b.volumeRootStr);
+    hddsRootDir = new File(location.getUri().getPath(), HDDS_VOLUME_DIR);
+    this.state = VolumeState.NOT_INITIALIZED;
+    this.clusterID = b.clusterID;
+    this.datanodeUuid = b.datanodeUuid;
+
+    VolumeInfo.Builder volumeBuilder =
+        new VolumeInfo.Builder(b.volumeRootStr, b.conf)
+        .storageType(b.storageType)
+        .configuredCapacity(b.configuredCapacity);
+    this.volumeInfo = volumeBuilder.build();
+
+    LOG.info("Creating Volume: " + this.hddsRootDir + " of  storage type : " +
+        b.storageType + " and capacity : " + volumeInfo.getCapacity());
+
+    initialize();
+  }
+
+  /**
+   * Initializes the volume based on its current state: creates the root
+   * directory and version file if they are missing, or reads and verifies
+   * an existing version file.
+   * @throws IOException if the volume cannot be initialized
+   */
+  private void initialize() throws IOException {
+    VolumeState initialVolumeState = analyzeVolumeState();
+    switch (initialVolumeState) {
+    case NON_EXISTENT:
+      // Root directory does not exist. Create it.
+      if (!hddsRootDir.mkdir()) {
+        throw new IOException("Cannot create directory " + hddsRootDir);
+      }
+      setState(VolumeState.NOT_FORMATTED);
+      createVersionFile();
+      break;
+    case NOT_FORMATTED:
+      // Version File does not exist. Create it.
+      createVersionFile();
+      break;
+    case NOT_INITIALIZED:
+      // Version File exists. Verify its correctness and update property fields.
+      readVersionFile();
+      setState(VolumeState.NORMAL);
+      break;
+    default:
+      throw new IOException("Unrecognized initial state : " +
+          intialVolumeState + "of volume : " + hddsRootDir);
+    }
+  }
+
+  private VolumeState analyzeVolumeState() {
+    if (!hddsRootDir.exists()) {
+      return VolumeState.NON_EXISTENT;
+    }
+    if (!getVersionFile().exists()) {
+      return VolumeState.NOT_FORMATTED;
+    }
+    return VolumeState.NOT_INITIALIZED;
+  }
+
+  public void format(String cid) throws IOException {
+    Preconditions.checkNotNull(cid, "clusterID cannot be null while " +
+        "formatting Volume");
+    this.clusterID = cid;
+    initialize();
+  }
+
+  /**
+   * Create Version File and write property fields into it.
+   * @throws IOException
+   */
+  private void createVersionFile() throws IOException {
+    this.storageID = HddsVolumeUtil.generateUuid();
+    this.cTime = Time.now();
+    this.layoutVersion =
+        DataNodeLayoutVersion.getLatestVersion().getVersion();
+
+    if (this.clusterID == null || datanodeUuid == null) {
+      // HddsDatanodeService does not have the cluster information yet. Wait
+      // for registration with SCM.
+      LOG.debug("ClusterID not available. Cannot format the volume {}",
+          this.hddsRootDir.getPath());
+      setState(VolumeState.NOT_FORMATTED);
+    } else {
+      // Write the version file to disk.
+      writeVersionFile();
+      setState(VolumeState.NORMAL);
+    }
+  }
+
+  private void writeVersionFile() throws IOException {
+    Preconditions.checkNotNull(this.storageID,
+        "StorageID cannot be null in Version File");
+    Preconditions.checkNotNull(this.clusterID,
+        "ClusterID cannot be null in Version File");
+    Preconditions.checkNotNull(this.datanodeUuid,
+        "DatanodeUUID cannot be null in Version File");
+    Preconditions.checkArgument(this.cTime > 0,
+        "Creation Time should be positive");
+    Preconditions.checkArgument(this.layoutVersion ==
+            DataNodeLayoutVersion.getLatestVersion().getVersion(),
+        "Version File should have the latest LayOutVersion");
+
+    File versionFile = getVersionFile();
+    LOG.debug("Writing Version file to disk, {}", versionFile);
+
+    DatanodeVersionFile dnVersionFile = new DatanodeVersionFile(this.storageID,
+        this.clusterID, this.datanodeUuid, this.cTime, this.layoutVersion);
+    dnVersionFile.createVersionFile(versionFile);
+  }
+
+  /**
+   * Reads the version file and updates the property fields with the
+   * common storage fields it contains. Extend this method if additional
+   * fields need to be read.
+   *
+   * @throws IOException on error
+   */
+  private void readVersionFile() throws IOException {
+    File versionFile = getVersionFile();
+    Properties props = DatanodeVersionFile.readFrom(versionFile);
+    if (props.isEmpty()) {
+      throw new InconsistentStorageStateException(
+          "Version file " + versionFile + " is missing");
+    }
+
+    LOG.debug("Reading Version file from disk, {}", versionFile);
+    this.storageID = HddsVolumeUtil.getStorageID(props, versionFile);
+    this.clusterID = HddsVolumeUtil.getClusterID(props, versionFile,
+        this.clusterID);
+    this.datanodeUuid = HddsVolumeUtil.getDatanodeUUID(props, versionFile,
+        this.datanodeUuid);
+    this.cTime = HddsVolumeUtil.getCreationTime(props, versionFile);
+    this.layoutVersion = HddsVolumeUtil.getLayOutVersion(props, versionFile);
+  }
+
+  private File getVersionFile() {
+    return HddsVolumeUtil.getVersionFile(hddsRootDir);
+  }
+
+  public File getHddsRootDir() {
+    return hddsRootDir;
+  }
+
+  public StorageType getStorageType() {
+    return volumeInfo.getStorageType();
+  }
+
+  public String getStorageID() {
+    return storageID;
+  }
+
+  public String getClusterID() {
+    return clusterID;
+  }
+
+  public String getDatanodeUuid() {
+    return datanodeUuid;
+  }
+
+  public long getCTime() {
+    return cTime;
+  }
+
+  public int getLayoutVersion() {
+    return layoutVersion;
+  }
+
+  public VolumeState getStorageState() {
+    return state;
+  }
+
+  public long getCapacity() throws IOException {
+    return volumeInfo.getCapacity();
+  }
+
+  public long getAvailable() throws IOException {
+    return volumeInfo.getAvailable();
+  }
+
+  public void setState(VolumeState state) {
+    this.state = state;
+  }
+
+  public boolean isFailed() {
+    return (state == VolumeState.FAILED);
+  }
+
+  public void failVolume() {
+    setState(VolumeState.FAILED);
+    volumeInfo.shutdownUsageThread();
+  }
+
+  public void shutdown() {
+    this.state = VolumeState.NON_EXISTENT;
+    volumeInfo.shutdownUsageThread();
+  }
+
+  /**
+   * VolumeState represents the different states a HddsVolume can be in.
+   */
+  public enum VolumeState {
+    NORMAL,
+    FAILED,
+    NON_EXISTENT,
+    NOT_FORMATTED,
+    NOT_INITIALIZED
+  }
+
+  /**
+   * Only for testing. Do not use otherwise.
+   */
+  @VisibleForTesting
+  public void setScmUsageForTesting(GetSpaceUsed scmUsageForTest) {
+    volumeInfo.setScmUsageForTesting(scmUsageForTest);
+  }
+}
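
A sketch of the volume lifecycle the class above implements (the root path
and clusterID are illustrative):

  import java.util.UUID;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.StorageType;
  import org.apache.hadoop.ozone.container.common.volume.HddsVolume;

  public class HddsVolumeLifecycle {
    public static void main(String[] args) throws Exception {
      HddsVolume volume = new HddsVolume.Builder("/data/disk1")
          .conf(new Configuration())
          .datanodeUuid(UUID.randomUUID().toString())
          .storageType(StorageType.DISK)
          .build();

      // With no clusterID yet, the volume comes up NOT_FORMATTED. Once the
      // datanode registers with SCM and learns the clusterID, format()
      // writes the VERSION file and moves the volume to NORMAL.
      volume.format("CID-example-cluster");
      System.out.println(volume.getStorageState());   // NORMAL
    }
  }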

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
new file mode 100644
index 0000000..75c92ec
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/RoundRobinVolumeChoosingPolicy.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Choose volumes in round-robin order.
+ * The caller should synchronize access to the list of volumes.
+ */
+public class RoundRobinVolumeChoosingPolicy implements VolumeChoosingPolicy {
+
+  public static final Log LOG = LogFactory.getLog(
+      RoundRobinVolumeChoosingPolicy.class);
+
+  // Stores the index of the next volume to be returned.
+  private AtomicInteger nextVolumeIndex = new AtomicInteger(0);
+
+  @Override
+  public HddsVolume chooseVolume(List<HddsVolume> volumes,
+      long maxContainerSize) throws IOException {
+
+    // No volumes available to choose from
+    if (volumes.isEmpty()) {
+      throw new DiskOutOfSpaceException("No more available volumes");
+    }
+
+    // Volumes could have been removed because of failures, so make sure
+    // the index is not out of bounds.
+    int nextIndex = nextVolumeIndex.get();
+    int currentVolumeIndex = nextIndex < volumes.size() ? nextIndex : 0;
+
+    int startVolumeIndex = currentVolumeIndex;
+    long maxAvailable = 0;
+
+    while (true) {
+      final HddsVolume volume = volumes.get(currentVolumeIndex);
+      long availableVolumeSize = volume.getAvailable();
+
+      currentVolumeIndex = (currentVolumeIndex + 1) % volumes.size();
+
+      if (availableVolumeSize > maxContainerSize) {
+        nextVolumeIndex.compareAndSet(nextIndex, currentVolumeIndex);
+        return volume;
+      }
+
+      if (availableVolumeSize > maxAvailable) {
+        maxAvailable = availableVolumeSize;
+      }
+
+      if (currentVolumeIndex == startVolumeIndex) {
+        throw new DiskOutOfSpaceException("Out of space: "
+            + "The volume with the most available space (=" + maxAvailable
+            + " B) has less space than the container size (=" + maxContainerSize
+            + " B).");
+      }
+
+    }
+  }
+}
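
For illustration (not part of this patch), a sketch of the round-robin behaviour: with two volumes that both have room, successive calls cycle through them. Here vol1 and vol2 are hypothetical HddsVolume instances, and the usual java.util imports are assumed.

    VolumeChoosingPolicy policy = new RoundRobinVolumeChoosingPolicy();
    List<HddsVolume> volumes = Arrays.asList(vol1, vol2);
    long maxContainerSize = 5L * 1024 * 1024 * 1024; // 5 GB, example value

    // Assuming both volumes report more than 5 GB available:
    HddsVolume first  = policy.chooseVolume(volumes, maxContainerSize); // vol1
    HddsVolume second = policy.chooseVolume(volumes, maxContainerSize); // vol2
    HddsVolume third  = policy.chooseVolume(volumes, maxContainerSize); // vol1 again

If no volume has enough space for the container, chooseVolume throws DiskOutOfSpaceException reporting the largest available size it saw, as in the code above.
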

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfo.java
new file mode 100644
index 0000000..4b13d45
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeInfo.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.fs.StorageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Stores information about a disk/volume.
+ */
+public class VolumeInfo {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VolumeInfo.class);
+
+  private final String rootDir;
+  private final StorageType storageType;
+
+  // Space usage calculator
+  private VolumeUsage usage;
+  // Capacity configured. This is useful when we want to
+  // limit the visible capacity for tests. If negative, then we just
+  // query from the filesystem.
+  private long configuredCapacity;
+
+  /**
+   * Builder for VolumeInfo.
+   */
+  public static class Builder {
+    private final Configuration conf;
+    private final String rootDir;
+    private StorageType storageType;
+    private long configuredCapacity;
+
+    public Builder(String root, Configuration config) {
+      this.rootDir = root;
+      this.conf = config;
+    }
+
+    public Builder storageType(StorageType st) {
+      this.storageType = st;
+      return this;
+    }
+
+    public Builder configuredCapacity(long capacity) {
+      this.configuredCapacity = capacity;
+      return this;
+    }
+
+    public VolumeInfo build() throws IOException {
+      return new VolumeInfo(this);
+    }
+  }
+
+  private VolumeInfo(Builder b) throws IOException {
+
+    this.rootDir = b.rootDir;
+    File root = new File(this.rootDir);
+
+    boolean succeeded = root.isDirectory() || root.mkdirs();
+
+    if (!succeeded) {
+      LOG.error("Unable to create the volume root dir at : {}", root);
+      throw new IOException("Unable to create the volume root dir at " + root);
+    }
+
+    this.storageType = (b.storageType != null ?
+        b.storageType : StorageType.DEFAULT);
+
+    this.configuredCapacity = (b.configuredCapacity != 0 ?
+        b.configuredCapacity : -1);
+
+    this.usage = new VolumeUsage(root, b.conf);
+  }
+
+  public long getCapacity() {
+    return configuredCapacity < 0 ? usage.getCapacity() : configuredCapacity;
+  }
+
+  public long getAvailable() throws IOException {
+    return usage.getAvailable();
+  }
+
+  public long getScmUsed() throws IOException {
+    return usage.getScmUsed();
+  }
+
+  protected void shutdownUsageThread() {
+    if (usage != null) {
+      usage.shutdown();
+    }
+    usage = null;
+  }
+
+  public String getRootDir() {
+    return this.rootDir;
+  }
+
+  public StorageType getStorageType() {
+    return this.storageType;
+  }
+
+  /**
+   * Only for testing. Do not use otherwise.
+   */
+  @VisibleForTesting
+  public void setScmUsageForTesting(GetSpaceUsed scmUsageForTest) {
+    usage.setScmUsageForTesting(scmUsageForTest);
+  }
+}
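
A short sketch (not part of this patch) of the Builder above. The root path and the capacity cap are arbitrary example values; conf is a Hadoop Configuration assumed to be in scope, and the enclosing method is assumed to declare throws IOException.

    VolumeInfo info = new VolumeInfo.Builder("/data/disk1", conf)
        .storageType(StorageType.DISK)
        .configuredCapacity(100L * 1024 * 1024 * 1024) // cap visible capacity
        .build();

    long capacity  = info.getCapacity();  // returns the configured cap here
    long available = info.getAvailable(); // df capacity minus SCM-used,
                                          // further capped by df available

The configuredCapacity knob exists mainly so tests can pin the visible capacity; with the default of -1, getCapacity() falls back to querying the filesystem.
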

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
new file mode 100644
index 0000000..61aca79
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeSet.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
+import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.InstrumentedLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * VolumeSet to manage volumes in a DataNode.
+ */
+public class VolumeSet {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VolumeSet.class);
+
+  private Configuration conf;
+
+  /**
+   * {@link VolumeSet#volumeMap} maintains a map of all active volumes in the
+   * DataNode. Each volume has a one-to-one mapping with a VolumeInfo object.
+   */
+  private Map<String, HddsVolume> volumeMap;
+  /**
+   * {@link VolumeSet#failedVolumeMap} maintains a map of volumes which have
+   * failed. The keys in this map and {@link VolumeSet#volumeMap} are
+   * mutually exclusive.
+   */
+  private Map<String, HddsVolume> failedVolumeMap;
+  /**
+   * {@link VolumeSet#volumeStateMap} maintains a list of active volumes per
+   * StorageType.
+   */
+  private EnumMap<StorageType, List<HddsVolume>> volumeStateMap;
+
+  /**
+   * Lock to synchronize changes to the VolumeSet. Any update to
+   * {@link VolumeSet#volumeMap}, {@link VolumeSet#failedVolumeMap}, or
+   * {@link VolumeSet#volumeStateMap} should be done after acquiring this lock.
+   */
+  private final AutoCloseableLock volumeSetLock;
+
+  private final DatanodeDetails dnDetails;
+  private String datanodeUuid;
+  private String clusterID;
+
+  public VolumeSet(DatanodeDetails datanodeDetails, Configuration conf)
+      throws DiskOutOfSpaceException {
+    this(datanodeDetails, null, conf);
+  }
+
+  public VolumeSet(DatanodeDetails datanodeDetails, String clusterID,
+      Configuration conf)
+      throws DiskOutOfSpaceException {
+    this.dnDetails = datanodeDetails;
+    this.datanodeUuid = datanodeDetails.getUuidString();
+    this.clusterID = clusterID;
+    this.conf = conf;
+    this.volumeSetLock = new AutoCloseableLock(
+        new InstrumentedLock(getClass().getName(), LOG,
+            new ReentrantLock(true),
+            conf.getTimeDuration(
+                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+                DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+                TimeUnit.MILLISECONDS),
+            300));
+
+    initializeVolumeSet();
+  }
+
+  // Add DN volumes configured through ConfigKeys to volumeMap.
+  private void initializeVolumeSet() throws DiskOutOfSpaceException {
+    volumeMap = new ConcurrentHashMap<>();
+    failedVolumeMap = new ConcurrentHashMap<>();
+    volumeStateMap = new EnumMap<>(StorageType.class);
+
+    Collection<String> rawLocations = conf.getTrimmedStringCollection(
+        HDDS_DATANODE_DIR_KEY);
+    if (rawLocations.isEmpty()) {
+      rawLocations = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
+    }
+    if (rawLocations.isEmpty()) {
+      throw new IllegalArgumentException("No location configured in either "
+          + HDDS_DATANODE_DIR_KEY + " or " + DFS_DATANODE_DATA_DIR_KEY);
+    }
+
+    for (StorageType storageType : StorageType.values()) {
+      volumeStateMap.put(storageType, new ArrayList<HddsVolume>());
+    }
+
+    for (String locationString : rawLocations) {
+      try {
+        StorageLocation location = StorageLocation.parse(locationString);
+
+        HddsVolume hddsVolume = createVolume(location.getUri().getPath(),
+            location.getStorageType());
+
+        checkAndSetClusterID(hddsVolume.getClusterID());
+
+        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
+        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
+        LOG.info("Added Volume : {} to VolumeSet",
+            hddsVolume.getHddsRootDir().getPath());
+      } catch (IOException e) {
+        LOG.error("Failed to parse the storage location: " + locationString, e);
+      }
+    }
+
+    // At this point every configured location failed to initialize.
+    if (volumeMap.isEmpty()) {
+      throw new DiskOutOfSpaceException(
+          "No storage locations could be added to the VolumeSet");
+    }
+  }
+
+  /**
+   * If the version file exists and {@link VolumeSet#clusterID} is not yet
+   * set, assign it the value from the version file. Otherwise, check that
+   * the given id matches the id from the version file.
+   * @param idFromVersionFile value of the property from the version file
+   * @throws InconsistentStorageStateException if the ids do not match
+   */
+  private void checkAndSetClusterID(String idFromVersionFile)
+      throws InconsistentStorageStateException {
+    // If the clusterID is null (not set), assign it the value
+    // from version file.
+    if (this.clusterID == null) {
+      this.clusterID = idFromVersionFile;
+      return;
+    }
+
+    // If the clusterID is already set, it should match with the value from the
+    // version file.
+    if (!idFromVersionFile.equals(this.clusterID)) {
+      throw new InconsistentStorageStateException(
+          "Mismatched ClusterIDs. VolumeSet has: " + this.clusterID +
+              ", and version file has: " + idFromVersionFile);
+    }
+  }
+
+  public void acquireLock() {
+    volumeSetLock.acquire();
+  }
+
+  public void releaseLock() {
+    volumeSetLock.release();
+  }
+
+  private HddsVolume createVolume(String locationString,
+      StorageType storageType) throws IOException {
+    HddsVolume.Builder volumeBuilder = new HddsVolume.Builder(locationString)
+        .conf(conf)
+        .datanodeUuid(datanodeUuid)
+        .clusterID(clusterID)
+        .storageType(storageType);
+    return volumeBuilder.build();
+  }
+
+
+  // Add a volume to VolumeSet
+  public void addVolume(String dataDir) throws IOException {
+    addVolume(dataDir, StorageType.DEFAULT);
+  }
+
+  // Add a volume to VolumeSet
+  public void addVolume(String volumeRoot, StorageType storageType)
+      throws IOException {
+    String hddsRoot = HddsVolumeUtil.getHddsRoot(volumeRoot);
+
+    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
+      if (volumeMap.containsKey(hddsRoot)) {
+        LOG.warn("Volume : {} already exists in VolumeMap", hddsRoot);
+      } else {
+        failedVolumeMap.remove(hddsRoot);
+
+        HddsVolume hddsVolume = createVolume(volumeRoot, storageType);
+        volumeMap.put(hddsVolume.getHddsRootDir().getPath(), hddsVolume);
+        volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
+
+        LOG.info("Added Volume : {} to VolumeSet",
+            hddsVolume.getHddsRootDir().getPath());
+      }
+    }
+  }
+
+  // Mark a volume as failed
+  public void failVolume(String dataDir) {
+    String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
+
+    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
+      if (volumeMap.containsKey(hddsRoot)) {
+        HddsVolume hddsVolume = volumeMap.get(hddsRoot);
+        hddsVolume.failVolume();
+
+        volumeMap.remove(hddsRoot);
+        volumeStateMap.get(hddsVolume.getStorageType()).remove(hddsVolume);
+        failedVolumeMap.put(hddsRoot, hddsVolume);
+
+        LOG.info("Moving Volume : {} to failed Volumes", hddsRoot);
+      } else if (failedVolumeMap.containsKey(hddsRoot)) {
+        LOG.info("Volume : {} is not active", hddsRoot);
+      } else {
+        LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
+      }
+    }
+  }
+
+  // Remove a volume from the VolumeSet completely.
+  public void removeVolume(String dataDir) throws IOException {
+    String hddsRoot = HddsVolumeUtil.getHddsRoot(dataDir);
+
+    try (AutoCloseableLock lock = volumeSetLock.acquire()) {
+      if (volumeMap.containsKey(hddsRoot)) {
+        HddsVolume hddsVolume = volumeMap.get(hddsRoot);
+        hddsVolume.shutdown();
+
+        volumeMap.remove(hddsRoot);
+        volumeStateMap.get(hddsVolume.getStorageType()).remove(hddsVolume);
+
+        LOG.info("Removed Volume : {} from VolumeSet", hddsRoot);
+      } else if (failedVolumeMap.containsKey(hddsRoot)) {
+        HddsVolume hddsVolume = failedVolumeMap.get(hddsRoot);
+        hddsVolume.setState(VolumeState.NON_EXISTENT);
+
+        failedVolumeMap.remove(hddsRoot);
+        LOG.info("Removed Volume : {} from failed VolumeSet", hddsRoot);
+      } else {
+        LOG.warn("Volume : {} does not exist in VolumeSet", hddsRoot);
+      }
+    }
+  }
+
+  public HddsVolume chooseVolume(long containerSize,
+      VolumeChoosingPolicy choosingPolicy) throws IOException {
+    return choosingPolicy.chooseVolume(getVolumesList(), containerSize);
+  }
+
+  public void shutdown() {
+    for (HddsVolume hddsVolume : volumeMap.values()) {
+      try {
+        hddsVolume.shutdown();
+      } catch (Exception ex) {
+        LOG.error("Failed to shutdown volume : " + hddsVolume.getHddsRootDir(),
+            ex);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public List<HddsVolume> getVolumesList() {
+    return ImmutableList.copyOf(volumeMap.values());
+  }
+
+  @VisibleForTesting
+  public List<HddsVolume> getFailedVolumesList() {
+    return ImmutableList.copyOf(failedVolumeMap.values());
+  }
+
+  @VisibleForTesting
+  public Map<String, HddsVolume> getVolumeMap() {
+    return ImmutableMap.copyOf(volumeMap);
+  }
+
+  @VisibleForTesting
+  public Map<StorageType, List<HddsVolume>> getVolumeStateMap() {
+    return ImmutableMap.copyOf(volumeStateMap);
+  }
+}
\ No newline at end of file
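
For illustration (not part of this patch), the typical lifecycle of a VolumeSet on a DataNode. Here datanodeDetails, clusterID and conf are assumed to be in scope, and conf must name at least one data dir via HDDS_DATANODE_DIR_KEY or DFS_DATANODE_DATA_DIR_KEY.

    VolumeSet volumeSet = new VolumeSet(datanodeDetails, clusterID, conf);

    // Pick a volume with room for a 5 GB container (example size).
    HddsVolume volume = volumeSet.chooseVolume(5L * 1024 * 1024 * 1024,
        new RoundRobinVolumeChoosingPolicy());

    // On a disk error, move the volume into the failed set.
    volumeSet.failVolume(volume.getHddsRootDir().getPath());

    // Stop all usage threads on DataNode shutdown.
    volumeSet.shutdown();
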

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeUsage.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeUsage.java
new file mode 100644
index 0000000..e10d1d4
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/VolumeUsage.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.GetSpaceUsed;
+import org.apache.hadoop.io.IOUtils;
+import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Scanner;
+
+/**
+ * Tracks the disk space usage (df) of a DataNode volume that stores
+ * SCM containers.
+ */
+public class VolumeUsage {
+  private static final Logger LOG = LoggerFactory.getLogger(VolumeUsage.class);
+
+  private final File rootDir;
+  private final DF df;
+  private final File scmUsedFile;
+  private GetSpaceUsed scmUsage;
+  private Runnable shutdownHook;
+
+  private static final String DU_CACHE_FILE = "scmUsed";
+  private volatile boolean scmUsedSaved = false;
+
+  VolumeUsage(File dataLoc, Configuration conf)
+      throws IOException {
+    this.rootDir = dataLoc;
+
+    // SCM used cache file
+    scmUsedFile = new File(rootDir, DU_CACHE_FILE);
+    // get overall disk df
+    this.df = new DF(rootDir, conf);
+
+    startScmUsageThread(conf);
+  }
+
+  void startScmUsageThread(Configuration conf) throws IOException {
+    // get SCM specific df
+    this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(rootDir)
+        .setConf(conf)
+        .setInitialUsed(loadScmUsed())
+        .build();
+
+    // Ensure scm df is saved during shutdown.
+    shutdownHook = () -> {
+      if (!scmUsedSaved) {
+        saveScmUsed();
+      }
+    };
+    ShutdownHookManager.get().addShutdownHook(shutdownHook,
+        SHUTDOWN_HOOK_PRIORITY);
+  }
+
+  long getCapacity() {
+    long capacity = df.getCapacity();
+    return (capacity > 0) ? capacity : 0;
+  }
+
+  /*
+   * Calculate the available space in the volume.
+   */
+  long getAvailable() throws IOException {
+    long remaining = getCapacity() - getScmUsed();
+    long available = df.getAvailable();
+    if (remaining > available) {
+      remaining = available;
+    }
+    return (remaining > 0) ? remaining : 0;
+  }
+
+  long getScmUsed() throws IOException {
+    return scmUsage.getUsed();
+  }
+
+  public void shutdown() {
+    saveScmUsed();
+    scmUsedSaved = true;
+
+    if (shutdownHook != null) {
+      ShutdownHookManager.get().removeShutdownHook(shutdownHook);
+    }
+
+    if (scmUsage instanceof CachingGetSpaceUsed) {
+      IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage));
+    }
+  }
+
+  /**
+   * Read in the cached DU value and return it if it is less than 600 seconds
+   * old (DU update interval). Slight imprecision of scmUsed is not critical
+   * and skipping DU can significantly shorten the startup time.
+   * If the cached value is not available or too old, -1 is returned.
+   */
+  long loadScmUsed() {
+    long cachedScmUsed;
+    long mtime;
+    Scanner sc;
+
+    try {
+      sc = new Scanner(scmUsedFile, "UTF-8");
+    } catch (FileNotFoundException fnfe) {
+      return -1;
+    }
+
+    try {
+      // Get the recorded scmUsed from the file.
+      if (sc.hasNextLong()) {
+        cachedScmUsed = sc.nextLong();
+      } else {
+        return -1;
+      }
+      // Get the recorded mtime from the file.
+      if (sc.hasNextLong()) {
+        mtime = sc.nextLong();
+      } else {
+        return -1;
+      }
+
+      // Return the cached value if mtime is okay.
+      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+        LOG.info("Cached ScmUsed found for {} : {} ", rootDir,
+            cachedScmUsed);
+        return cachedScmUsed;
+      }
+      return -1;
+    } finally {
+      sc.close();
+    }
+  }
+
+  /**
+   * Write the current scmUsed to the cache file.
+   */
+  void saveScmUsed() {
+    if (scmUsedFile.exists() && !scmUsedFile.delete()) {
+      LOG.warn("Failed to delete old scmUsed file in {}.", rootDir);
+    }
+    OutputStreamWriter out = null;
+    try {
+      long used = getScmUsed();
+      if (used > 0) {
+        out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
+            StandardCharsets.UTF_8);
+        // mtime is written last, so that truncated writes won't be valid.
+        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+        out.flush();
+        out.close();
+        out = null;
+      }
+    } catch (IOException ioe) {
+      // If write failed, the volume might be bad. Since the cache file is
+      // not critical, log the error and continue.
+      LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
+    } finally {
+      IOUtils.cleanupWithLogger(null, out);
+    }
+  }
+
+  /**
+   * Only for testing. Do not use otherwise.
+   */
+  @VisibleForTesting
+  public void setScmUsageForTesting(GetSpaceUsed scmUsageForTest) {
+    this.scmUsage = scmUsageForTest;
+  }
+}
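
A sketch (not part of this patch) of the scmUsed cache file that saveScmUsed() writes and loadScmUsed() reads back: a single line holding the used byte count and the write time, space separated. The file name below is real; resolving it against a hard-coded volume root is only for illustration, and the enclosing method is assumed to declare throws IOException.

    // Contents of <volumeRoot>/scmUsed look like:
    //   1073741824 1528998521000
    File scmUsedFile = new File("/data/disk1", "scmUsed"); // example root
    try (Scanner sc = new Scanner(scmUsedFile, "UTF-8")) {
      long used  = sc.nextLong();                    // bytes used by SCM data
      long mtime = sc.nextLong();                    // when it was recorded
      boolean fresh = Time.now() - mtime < 600000L;  // under 10 minutes old
    }

Writing mtime after the used value means a truncated write fails the second nextLong() and the cache is simply ignored, which matches the comment in saveScmUsed() above.
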

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/package-info.java
new file mode 100644
index 0000000..86093c6
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.volume;
+/**
+ * This package contains volume/disk related classes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9a5552bf/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeLayOutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeLayOutVersion.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeLayOutVersion.java
new file mode 100644
index 0000000..5cabef2
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeLayOutVersion.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This class tests DatanodeLayOutVersion.
+ */
+public class TestDatanodeLayOutVersion {
+
+  @Test
+  public void testDatanodeLayOutVersion() {
+    // Check Latest Version and description
+    Assert.assertEquals(1, DataNodeLayoutVersion.getLatestVersion()
+        .getVersion());
+    Assert.assertEquals("HDDS Datanode LayOut Version 1", DataNodeLayoutVersion
+        .getLatestVersion().getDescription());
+    // Layout versions are numbered consecutively from 1, so the number of
+    // versions equals the latest version number.
+    Assert.assertEquals(DataNodeLayoutVersion.getLatestVersion().getVersion(),
+        DataNodeLayoutVersion.getAllVersions().length);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org