You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2017/09/25 23:51:09 UTC

hadoop git commit: Block Storage: volume creation times out while creating 3TB volume because of too many containers. Contributed by Mukul Kumar Singh.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 e01245495 -> 087c69ba2


Block Storage: volume creation times out while creating 3TB volume because of too many containers. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/087c69ba
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/087c69ba
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/087c69ba

Branch: refs/heads/HDFS-7240
Commit: 087c69ba2434813044089f733581607d172efbd1
Parents: e012454
Author: Chen Liang <cl...@apache.org>
Authored: Mon Sep 25 16:50:55 2017 -0700
Committer: Chen Liang <cl...@apache.org>
Committed: Mon Sep 25 16:50:55 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/cblock/CBlockConfigKeys.java  |  15 ++
 .../cblock/client/CBlockVolumeClient.java       |  28 ++-
 .../cblock/jscsiHelper/BlockWriterTask.java     |   6 +-
 .../cache/impl/AsyncBlockWriter.java            |   8 +-
 .../hadoop/cblock/meta/VolumeDescriptor.java    |   8 +-
 .../hadoop/cblock/storage/StorageManager.java   | 188 ++++++++++++++-----
 .../src/main/resources/ozone-default.xml        |  16 ++
 .../cblock/util/ContainerLookUpService.java     |   6 +-
 .../hadoop/cblock/util/MockStorageClient.java   |  19 +-
 9 files changed, 212 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
index d77091f..87f40b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
@@ -172,6 +172,21 @@ public final class CBlockConfigKeys {
   public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
       64 * 1024;
 
+  /**
+   * Cblock CLI configs.
+   */
+  public static final String DFS_CBLOCK_MANAGER_POOL_SIZE =
+      "dfs.cblock.manager.pool.size";
+  public static final int DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT = 16;
+
+  /**
+   * currently the largest supported volume is about 8TB, which might take
+   * > 20 seconds to finish creating containers. thus set timeout to 30 sec.
+   */
+  public static final String DFS_CBLOCK_RPC_TIMEOUT_SECONDS =
+      "dfs.cblock.rpc.timeout.seconds";
+  public static final int DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT = 300;
+
   private CBlockConfigKeys() {
 
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
index 90a16ce..11965a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.cblock.client;
 
+import org.apache.hadoop.cblock.CBlockConfigKeys;
 import org.apache.hadoop.cblock.meta.VolumeInfo;
 import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -36,32 +37,25 @@ import java.util.concurrent.TimeUnit;
  */
 public class CBlockVolumeClient {
   private final CBlockServiceProtocolClientSideTranslatorPB cblockClient;
-  private final OzoneConfiguration conf;
 
   public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
-    this.conf = conf;
-    long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
-    InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
-    // currently the largest supported volume is about 8TB, which might take
-    // > 20 seconds to finish creating containers. thus set timeout to 30 sec.
-    cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
-        RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
-            address, UserGroupInformation.getCurrentUser(), conf,
-            NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
-                .retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
-                    .SECONDS)).getProxy());
+    this(conf, null);
   }
 
   public CBlockVolumeClient(OzoneConfiguration conf,
       InetSocketAddress serverAddress) throws IOException {
-    this.conf = conf;
+    InetSocketAddress address = serverAddress != null ? serverAddress :
+        OzoneClientUtils.getCblockServiceRpcAddr(conf);
     long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
+    int rpcTimeout =
+        conf.getInt(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS,
+        CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT) * 1000;
     cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
         RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
-            serverAddress, UserGroupInformation.getCurrentUser(), conf,
-            NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
-                .retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
-                    .SECONDS)).getProxy());
+            address, UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getDefaultSocketFactory(conf), rpcTimeout, RetryPolicies
+                .retryUpToMaximumCountWithFixedSleep(
+                    300, 1, TimeUnit.SECONDS)).getProxy());
   }
 
   public void createVolume(String userName, String volumeName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
index c446dac..04fe3a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
@@ -77,6 +77,7 @@ public class BlockWriterTask implements Runnable {
     String containerName = null;
     XceiverClientSpi client = null;
     LevelDBStore levelDBStore = null;
+    String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID());
     flusher.getLOG().debug(
         "Writing block to remote. block ID: {}", block.getBlockID());
     try {
@@ -94,8 +95,7 @@ public class BlockWriterTask implements Runnable {
       Preconditions.checkState(data.length > 0, "Block data is zero length");
       startTime = Time.monotonicNow();
       ContainerProtocolCalls.writeSmallFile(client, containerName,
-          Long.toString(block.getBlockID()), data,
-          flusher.getTraceID(new File(dbPath), block.getBlockID()));
+          Long.toString(block.getBlockID()), data, traceID);
       endTime = Time.monotonicNow();
       flusher.getTargetMetrics().updateContainerWriteLatency(
           endTime - startTime);
@@ -107,7 +107,7 @@ public class BlockWriterTask implements Runnable {
     } catch (Exception ex) {
       flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
               "to write this block {} times to the container {}.Trace ID:{}",
-          block.getBlockID(), this.getTryCount(), containerName, "", ex);
+          block.getBlockID(), this.getTryCount(), containerName, traceID, ex);
       writeRetryBlock(block);
       if (ex instanceof IOException) {
         flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
index 6ee67c0..992578f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -151,6 +151,7 @@ public class AsyncBlockWriter {
    */
   public void writeBlock(LogicalBlock block) throws IOException {
     byte[] keybuf = Longs.toByteArray(block.getBlockID());
+    String traceID = parentCache.getTraceID(block.getBlockID());
     if (parentCache.isShortCircuitIOEnabled()) {
       long startTime = Time.monotonicNow();
       getCacheDB().put(keybuf, block.getData().array());
@@ -176,7 +177,7 @@ public class AsyncBlockWriter {
             .acquireClient(parentCache.getPipeline(block.getBlockID()));
         ContainerProtocolCalls.writeSmallFile(client, containerName,
             Long.toString(block.getBlockID()), block.getData().array(),
-            parentCache.getTraceID(block.getBlockID()));
+            traceID);
         long endTime = Time.monotonicNow();
         if (parentCache.isTraceEnabled()) {
           String datahash = DigestUtils.sha256Hex(block.getData().array());
@@ -189,8 +190,9 @@ public class AsyncBlockWriter {
         parentCache.getTargetMetrics().incNumDirectBlockWrites();
       } catch (Exception ex) {
         parentCache.getTargetMetrics().incNumFailedDirectBlockWrites();
-        LOG.error("Direct I/O writing of block:{} to container {} failed",
-            block.getBlockID(), containerName, ex);
+        LOG.error("Direct I/O writing of block:{} traceID:{} to "
+            + "container {} failed", block.getBlockID(), traceID,
+            containerName, ex);
         throw ex;
       } finally {
         if (client != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
index c0031ba..4f5930d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * The internal representation maintained by CBlock server as the info for
@@ -53,7 +54,7 @@ public class VolumeDescriptor {
   private static final Logger LOG =
       LoggerFactory.getLogger(VolumeDescriptor.class);
 
-  private HashMap<String, ContainerDescriptor> containerMap;
+  private ConcurrentHashMap<String, ContainerDescriptor> containerMap;
   private String userName;
   private int blockSize;
   private long volumeSize;
@@ -72,13 +73,12 @@ public class VolumeDescriptor {
    * and set*() methods are for the same purpose also.
    */
   public VolumeDescriptor() {
-    containerMap = new HashMap<>();
-    containerIdOrdered = new ArrayList<>();
+    this(null, null, 0, 0);
   }
 
   public VolumeDescriptor(String userName, String volumeName, long volumeSize,
       int blockSize) {
-    this.containerMap = new HashMap<>();
+    this.containerMap = new ConcurrentHashMap<>();
     this.userName = userName;
     this.volumeName = volumeName;
     this.blockSize = blockSize;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
index 1f22aa8..711e763 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.cblock.storage;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.cblock.CBlockConfigKeys;
 import org.apache.hadoop.cblock.exception.CBlockException;
 import org.apache.hadoop.cblock.meta.ContainerDescriptor;
 import org.apache.hadoop.cblock.meta.VolumeDescriptor;
@@ -37,25 +38,27 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class maintains the key space of CBlock, more specifically, the
  * volume to container mapping. The core data structure
  * is a map from users to their volumes info, where volume info is a handler
- * to a volume, containing information for IO on that volume.
- *
- * and a storage client responsible for talking to the SCM
- *
- * TODO : all the volume operations are fully serialized, which can potentially
- * be optimized.
- *
- * TODO : if the certain operations (e.g. create) failed, the failure-handling
- * logic may not be properly implemented currently.
+ * to a volume, containing information for IO on that volume and a storage
+ * client responsible for talking to the SCM.
  */
 public class StorageManager {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(StorageManager.class);
   private final ScmClient storageClient;
+  private final int numThreads;
+  private static final int MAX_THREADS =
+      Runtime.getRuntime().availableProcessors() * 2;
+  private static final int MAX_QUEUE_CAPACITY = 1024;
+
   /**
    * We will NOT have the situation where same kv pair getting
    * processed, but it is possible to have multiple kv pair being
@@ -78,6 +81,9 @@ public class StorageManager {
     this.storageClient = storageClient;
     this.user2VolumeMap = new ConcurrentHashMap<>();
     this.containerSizeB = storageClient.getContainerSize(null);
+    this.numThreads =
+        ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
+            CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
   }
 
   /**
@@ -149,6 +155,127 @@ public class StorageManager {
     makeVolumeReady(userName, volumeName, volumeDescriptor);
   }
 
+  private class CreateContainerTask implements Runnable {
+    private final VolumeDescriptor volume;
+    private final int containerIdx;
+    private final ArrayList<String> containerIds;
+    private final AtomicInteger numFailed;
+
+    CreateContainerTask(VolumeDescriptor volume, int containerIdx,
+                        ArrayList<String> containerIds,
+                        AtomicInteger numFailed) {
+      this.volume = volume;
+      this.containerIdx = containerIdx;
+      this.containerIds = containerIds;
+      this.numFailed = numFailed;
+    }
+
+    /**
+     * When an object implementing interface <code>Runnable</code> is used
+     * to create a thread, starting the thread causes the object's
+     * <code>run</code> method to be called in that separately executing
+     * thread.
+     * <p>
+     * The general contract of the method <code>run</code> is that it may
+     * take any action whatsoever.
+     *
+     * @see Thread#run()
+     */
+    public void run() {
+      ContainerDescriptor container = null;
+      try {
+        Pipeline pipeline = storageClient.createContainer(
+            OzoneProtos.ReplicationType.STAND_ALONE,
+            OzoneProtos.ReplicationFactor.ONE,
+            KeyUtil.getContainerName(volume.getUserName(),
+                volume.getVolumeName(), containerIdx));
+
+        container = new ContainerDescriptor(pipeline.getContainerName());
+
+        container.setPipeline(pipeline);
+        container.setContainerIndex(containerIdx);
+        volume.addContainer(container);
+        containerIds.set(containerIdx, container.getContainerID());
+      } catch (Exception e) {
+        numFailed.incrementAndGet();
+        if (container != null) {
+          LOGGER.error("Error creating container Container:{}:" +
+              " index:{} error:{}", container.getContainerID(),
+              containerIdx, e);
+        }
+      }
+    }
+  }
+
+  private boolean createVolumeContainers(VolumeDescriptor volume) {
+    ArrayList<String> containerIds = new ArrayList<>();
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
+        MAX_THREADS, 1, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+    AtomicInteger numFailedCreates = new AtomicInteger(0);
+    long allocatedSize = 0;
+    int containerIdx = 0;
+    while (allocatedSize < volume.getVolumeSize()) {
+      // adding null to allocate space in ArrayList
+      containerIds.add(containerIdx, null);
+      Runnable task = new CreateContainerTask(volume, containerIdx,
+          containerIds, numFailedCreates);
+      executor.submit(task);
+      allocatedSize += containerSizeB;
+      containerIdx += 1;
+    }
+
+    // issue the command and then wait for it to finish
+    executor.shutdown();
+    try {
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error creating volume:{} error:{}",
+          volume.getVolumeName(), e);
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+
+    volume.setContainerIDs(containerIds);
+    return numFailedCreates.get() == 0;
+  }
+
+  private void deleteContainer(String containerID, boolean force) {
+    try {
+      Pipeline pipeline = storageClient.getContainer(containerID);
+      storageClient.deleteContainer(pipeline, force);
+    } catch (Exception e) {
+      LOGGER.error("Error deleting container Container:{} error:{}",
+          containerID, e);
+    }
+  }
+
+  private void deleteVolumeContainers(List<String> containers, boolean force)
+      throws CBlockException {
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
+        MAX_THREADS, 1, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+    for (String deleteContainer : containers) {
+      if (deleteContainer != null) {
+        Runnable task = () -> deleteContainer(deleteContainer, force);
+        executor.submit(task);
+      }
+    }
+
+    // issue the command and then wait for it to finish
+    executor.shutdown();
+    try {
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error deleting containers error:{}", e);
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
 
   /**
    * Called by CBlock server when creating a fresh volume. The core
@@ -172,31 +299,13 @@ public class StorageManager {
       throw new CBlockException("Volume size smaller than block size? " +
           "volume size:" + volumeSize + " block size:" + blockSize);
     }
-    VolumeDescriptor volume;
-    int containerIdx = 0;
-    try {
-      volume = new VolumeDescriptor(userName, volumeName,
-          volumeSize, blockSize);
-      long allocatedSize = 0;
-      ArrayList<String> containerIds = new ArrayList<>();
-      while (allocatedSize < volumeSize) {
-        Pipeline pipeline = storageClient.createContainer(OzoneProtos
-                .ReplicationType.STAND_ALONE,
-            OzoneProtos.ReplicationFactor.ONE,
-            KeyUtil.getContainerName(userName, volumeName, containerIdx));
-        ContainerDescriptor container =
-            new ContainerDescriptor(pipeline.getContainerName());
-        container.setPipeline(pipeline);
-        container.setContainerIndex(containerIdx);
-        volume.addContainer(container);
-        containerIds.add(container.getContainerID());
-        allocatedSize += containerSizeB;
-        containerIdx += 1;
-      }
-      volume.setContainerIDs(containerIds);
-    } catch (IOException e) {
-      throw new CBlockException("Error when creating volume:" + e.getMessage());
-      // TODO : delete already created containers? or re-try policy
+    VolumeDescriptor volume
+        = new VolumeDescriptor(userName, volumeName, volumeSize, blockSize);
+    boolean success = createVolumeContainers(volume);
+    if (!success) {
+      // cleanup the containers and throw the exception
+      deleteVolumeContainers(volume.getContainerIDsList(), true);
+      throw new CBlockException("Error when creating volume:" + volumeName);
     }
     makeVolumeReady(userName, volumeName, volume);
   }
@@ -223,16 +332,7 @@ public class StorageManager {
       throw new CBlockException("Deleting a non-empty volume without force!");
     }
     VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
-    for (String containerID : volume.getContainerIDsList()) {
-      try {
-        Pipeline pipeline = storageClient.getContainer(containerID);
-        storageClient.deleteContainer(pipeline, force);
-      } catch (IOException e) {
-        LOGGER.error("Error deleting container Container:{} error:{}",
-            containerID, e);
-        throw new CBlockException(e.getMessage());
-      }
-    }
+    deleteVolumeContainers(volume.getContainerIDsList(), force);
     if (user2VolumeMap.get(userName).size() == 0) {
       user2VolumeMap.remove(userName);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 4e0028f..0860db8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -898,6 +898,22 @@
   </property>
 
   <property>
+    <name>dfs.cblock.manager.pool.size</name>
+    <value>16</value>
+    <description>
+      Number of threads that cblock manager will use for container operations.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.cblock.rpc.timeout.seconds</name>
+    <value>300</value>
+    <description>
+      RPC timeout in seconds used for cblock CLI operations.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.cblock.scm.ipaddress</name>
     <value>127.0.0.1</value>
     <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
index 8d9c2a0..8cb57d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * NOTE : This class is only for testing purpose.
@@ -34,8 +34,8 @@ import java.util.HashMap;
  * This is to allow volume creation call and perform standalone tests.
  */
 public final class ContainerLookUpService {
-  private static HashMap<String, ContainerDescriptor>
-      containers = new HashMap<>();
+  private static ConcurrentHashMap<String, ContainerDescriptor>
+      containers = new ConcurrentHashMap<>();
 
   /**
    * Return an *existing* container with given Id.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
index f45ea15..a318876 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class is the one that directly talks to SCM server.
@@ -37,7 +38,8 @@ import java.util.List;
  *
  */
 public class MockStorageClient implements ScmClient {
-  private static long currentContainerId = -1;
+  private static AtomicInteger currentContainerId =
+      new AtomicInteger(0);
 
   /**
    * Ask SCM to get a exclusive container.
@@ -48,9 +50,9 @@ public class MockStorageClient implements ScmClient {
   @Override
   public Pipeline createContainer(String containerId)
       throws IOException {
-    currentContainerId += 1;
-    ContainerLookUpService.addContainer(Long.toString(currentContainerId));
-    return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
+    int contId = currentContainerId.getAndIncrement();
+    ContainerLookUpService.addContainer(Long.toString(contId));
+    return ContainerLookUpService.lookUp(Long.toString(contId))
         .getPipeline();
   }
 
@@ -126,10 +128,11 @@ public class MockStorageClient implements ScmClient {
   public Pipeline createContainer(OzoneProtos.ReplicationType type,
       OzoneProtos.ReplicationFactor replicationFactor, String containerId)
       throws IOException {
-    currentContainerId += 1;
-    ContainerLookUpService.addContainer(Long.toString(currentContainerId));
-    return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
-        .getPipeline();  }
+    int contId = currentContainerId.getAndIncrement();
+    ContainerLookUpService.addContainer(Long.toString(contId));
+    return ContainerLookUpService.lookUp(Long.toString(contId))
+        .getPipeline();
+  }
 
   /**
    * Returns a set of Nodes that meet a query criteria.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org