You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2017/09/25 23:51:09 UTC
hadoop git commit: Block Storage: volume creation times out while
creating 3TB volume because of too many containers. Contributed by Mukul
Kumar Singh.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 e01245495 -> 087c69ba2
Block Storage: volume creation times out while creating 3TB volume because of too many containers. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/087c69ba
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/087c69ba
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/087c69ba
Branch: refs/heads/HDFS-7240
Commit: 087c69ba2434813044089f733581607d172efbd1
Parents: e012454
Author: Chen Liang <cl...@apache.org>
Authored: Mon Sep 25 16:50:55 2017 -0700
Committer: Chen Liang <cl...@apache.org>
Committed: Mon Sep 25 16:50:55 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/cblock/CBlockConfigKeys.java | 15 ++
.../cblock/client/CBlockVolumeClient.java | 28 ++-
.../cblock/jscsiHelper/BlockWriterTask.java | 6 +-
.../cache/impl/AsyncBlockWriter.java | 8 +-
.../hadoop/cblock/meta/VolumeDescriptor.java | 8 +-
.../hadoop/cblock/storage/StorageManager.java | 188 ++++++++++++++-----
.../src/main/resources/ozone-default.xml | 16 ++
.../cblock/util/ContainerLookUpService.java | 6 +-
.../hadoop/cblock/util/MockStorageClient.java | 19 +-
9 files changed, 212 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
index d77091f..87f40b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
@@ -172,6 +172,21 @@ public final class CBlockConfigKeys {
public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
64 * 1024;
+ /**
+ * Cblock CLI configs.
+ */
+ public static final String DFS_CBLOCK_MANAGER_POOL_SIZE =
+ "dfs.cblock.manager.pool.size";
+ public static final int DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT = 16;
+
+ /**
+ * Currently the largest supported volume is about 8TB, which might take
+ * more than 20 seconds to finish creating containers, so the default RPC
+ * timeout is set to 300 seconds.
+ */
+ public static final String DFS_CBLOCK_RPC_TIMEOUT_SECONDS =
+ "dfs.cblock.rpc.timeout.seconds";
+ public static final int DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT = 300;
+
private CBlockConfigKeys() {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
index 90a16ce..11965a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.cblock.client;
+import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies;
@@ -36,32 +37,25 @@ import java.util.concurrent.TimeUnit;
*/
public class CBlockVolumeClient {
private final CBlockServiceProtocolClientSideTranslatorPB cblockClient;
- private final OzoneConfiguration conf;
public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
- this.conf = conf;
- long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
- InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
- // currently the largest supported volume is about 8TB, which might take
- // > 20 seconds to finish creating containers. thus set timeout to 30 sec.
- cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
- RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
- address, UserGroupInformation.getCurrentUser(), conf,
- NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
- .retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
- .SECONDS)).getProxy());
+ this(conf, null);
}
public CBlockVolumeClient(OzoneConfiguration conf,
InetSocketAddress serverAddress) throws IOException {
- this.conf = conf;
+ InetSocketAddress address = serverAddress != null ? serverAddress :
+ OzoneClientUtils.getCblockServiceRpcAddr(conf);
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
+ int rpcTimeout =
+ conf.getInt(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS,
+ CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT) * 1000;
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
- serverAddress, UserGroupInformation.getCurrentUser(), conf,
- NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
- .retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
- .SECONDS)).getProxy());
+ address, UserGroupInformation.getCurrentUser(), conf,
+ NetUtils.getDefaultSocketFactory(conf), rpcTimeout, RetryPolicies
+ .retryUpToMaximumCountWithFixedSleep(
+ 300, 1, TimeUnit.SECONDS)).getProxy());
}
public void createVolume(String userName, String volumeName,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
index c446dac..04fe3a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
@@ -77,6 +77,7 @@ public class BlockWriterTask implements Runnable {
String containerName = null;
XceiverClientSpi client = null;
LevelDBStore levelDBStore = null;
+ String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID());
flusher.getLOG().debug(
"Writing block to remote. block ID: {}", block.getBlockID());
try {
@@ -94,8 +95,7 @@ public class BlockWriterTask implements Runnable {
Preconditions.checkState(data.length > 0, "Block data is zero length");
startTime = Time.monotonicNow();
ContainerProtocolCalls.writeSmallFile(client, containerName,
- Long.toString(block.getBlockID()), data,
- flusher.getTraceID(new File(dbPath), block.getBlockID()));
+ Long.toString(block.getBlockID()), data, traceID);
endTime = Time.monotonicNow();
flusher.getTargetMetrics().updateContainerWriteLatency(
endTime - startTime);
@@ -107,7 +107,7 @@ public class BlockWriterTask implements Runnable {
} catch (Exception ex) {
flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
"to write this block {} times to the container {}.Trace ID:{}",
- block.getBlockID(), this.getTryCount(), containerName, "", ex);
+ block.getBlockID(), this.getTryCount(), containerName, traceID, ex);
writeRetryBlock(block);
if (ex instanceof IOException) {
flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
index 6ee67c0..992578f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -151,6 +151,7 @@ public class AsyncBlockWriter {
*/
public void writeBlock(LogicalBlock block) throws IOException {
byte[] keybuf = Longs.toByteArray(block.getBlockID());
+ String traceID = parentCache.getTraceID(block.getBlockID());
if (parentCache.isShortCircuitIOEnabled()) {
long startTime = Time.monotonicNow();
getCacheDB().put(keybuf, block.getData().array());
@@ -176,7 +177,7 @@ public class AsyncBlockWriter {
.acquireClient(parentCache.getPipeline(block.getBlockID()));
ContainerProtocolCalls.writeSmallFile(client, containerName,
Long.toString(block.getBlockID()), block.getData().array(),
- parentCache.getTraceID(block.getBlockID()));
+ traceID);
long endTime = Time.monotonicNow();
if (parentCache.isTraceEnabled()) {
String datahash = DigestUtils.sha256Hex(block.getData().array());
@@ -189,8 +190,9 @@ public class AsyncBlockWriter {
parentCache.getTargetMetrics().incNumDirectBlockWrites();
} catch (Exception ex) {
parentCache.getTargetMetrics().incNumFailedDirectBlockWrites();
- LOG.error("Direct I/O writing of block:{} to container {} failed",
- block.getBlockID(), containerName, ex);
+ LOG.error("Direct I/O writing of block:{} traceID:{} to "
+ + "container {} failed", block.getBlockID(), traceID,
+ containerName, ex);
throw ex;
} finally {
if (client != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
index c0031ba..4f5930d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* The internal representation maintained by CBlock server as the info for
@@ -53,7 +54,7 @@ public class VolumeDescriptor {
private static final Logger LOG =
LoggerFactory.getLogger(VolumeDescriptor.class);
- private HashMap<String, ContainerDescriptor> containerMap;
+ private ConcurrentHashMap<String, ContainerDescriptor> containerMap;
private String userName;
private int blockSize;
private long volumeSize;
@@ -72,13 +73,12 @@ public class VolumeDescriptor {
* and set*() methods are for the same purpose also.
*/
public VolumeDescriptor() {
- containerMap = new HashMap<>();
- containerIdOrdered = new ArrayList<>();
+ this(null, null, 0, 0);
}
public VolumeDescriptor(String userName, String volumeName, long volumeSize,
int blockSize) {
- this.containerMap = new HashMap<>();
+ this.containerMap = new ConcurrentHashMap<>();
this.userName = userName;
this.volumeName = volumeName;
this.blockSize = blockSize;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
index 1f22aa8..711e763 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.cblock.storage;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.exception.CBlockException;
import org.apache.hadoop.cblock.meta.ContainerDescriptor;
import org.apache.hadoop.cblock.meta.VolumeDescriptor;
@@ -37,25 +38,27 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* This class maintains the key space of CBlock, more specifically, the
* volume to container mapping. The core data structure
* is a map from users to their volumes info, where volume info is a handler
- * to a volume, containing information for IO on that volume.
- *
- * and a storage client responsible for talking to the SCM
- *
- * TODO : all the volume operations are fully serialized, which can potentially
- * be optimized.
- *
- * TODO : if the certain operations (e.g. create) failed, the failure-handling
- * logic may not be properly implemented currently.
+ * to a volume, containing information for IO on that volume and a storage
+ * client responsible for talking to the SCM.
*/
public class StorageManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(StorageManager.class);
private final ScmClient storageClient;
+ private final int numThreads;
+ private static final int MAX_THREADS =
+ Runtime.getRuntime().availableProcessors() * 2;
+ private static final int MAX_QUEUE_CAPACITY = 1024;
+
/**
* We will NOT have the situation where same kv pair getting
* processed, but it is possible to have multiple kv pair being
@@ -78,6 +81,9 @@ public class StorageManager {
this.storageClient = storageClient;
this.user2VolumeMap = new ConcurrentHashMap<>();
this.containerSizeB = storageClient.getContainerSize(null);
+ this.numThreads =
+ ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
+ CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
}
/**
@@ -149,6 +155,127 @@ public class StorageManager {
makeVolumeReady(userName, volumeName, volumeDescriptor);
}
+  /**
+   * Task that creates one container of a volume through the storage client
+   * and registers it with the volume descriptor. A failure is not
+   * propagated; it is logged and counted in the shared failure counter.
+   */
+  private class CreateContainerTask implements Runnable {
+    private final VolumeDescriptor volume;
+    private final int containerIdx;
+    private final ArrayList<String> containerIds;
+    private final AtomicInteger numFailed;
+
+    /**
+     * @param volume volume the container is created for.
+     * @param containerIdx index of the container inside the volume; also the
+     *                     slot of containerIds that receives the new ID.
+     * @param containerIds list that must already hold an element at
+     *                     containerIdx, since the ID is stored with set().
+     * @param numFailed counter incremented when the creation fails.
+     */
+    CreateContainerTask(VolumeDescriptor volume, int containerIdx,
+                        ArrayList<String> containerIds,
+                        AtomicInteger numFailed) {
+      this.volume = volume;
+      this.containerIdx = containerIdx;
+      this.containerIds = containerIds;
+      this.numFailed = numFailed;
+    }
+
+    /**
+     * Creates the container, attaches its pipeline and index, adds it to the
+     * volume and records its ID at containerIdx. Any exception increments
+     * numFailed and is logged.
+     */
+    @Override
+    public void run() {
+      ContainerDescriptor container = null;
+      try {
+        Pipeline pipeline = storageClient.createContainer(
+            OzoneProtos.ReplicationType.STAND_ALONE,
+            OzoneProtos.ReplicationFactor.ONE,
+            KeyUtil.getContainerName(volume.getUserName(),
+                volume.getVolumeName(), containerIdx));
+
+        container = new ContainerDescriptor(pipeline.getContainerName());
+
+        container.setPipeline(pipeline);
+        container.setContainerIndex(containerIdx);
+        volume.addContainer(container);
+        containerIds.set(containerIdx, container.getContainerID());
+      } catch (Exception e) {
+        numFailed.incrementAndGet();
+        // container is still null when createContainer itself threw; log
+        // that case too so the failure does not go unreported.
+        String containerID =
+            container == null ? "<not created>" : container.getContainerID();
+        // The exception is the trailing argument, so it gets no placeholder
+        // and is logged with its stack trace.
+        LOGGER.error("Error creating container Container:{} index:{}",
+            containerID, containerIdx, e);
+      }
+    }
+  }
+
+  /**
+   * Creates every container needed to back the given volume, running the
+   * creations in parallel on a temporary thread pool, and stores the
+   * resulting container IDs on the volume.
+   *
+   * @param volume descriptor of the volume being created.
+   * @return true only if all creation tasks finished and none failed; false
+   * if any creation failed or the wait was interrupted.
+   */
+  private boolean createVolumeContainers(VolumeDescriptor volume) {
+    // Reserve one slot per container before any task is submitted, so the
+    // workers never call set() while this thread is still growing the
+    // (unsynchronized) list.
+    ArrayList<String> containerIds = new ArrayList<>();
+    long allocatedSize = 0;
+    while (allocatedSize < volume.getVolumeSize()) {
+      containerIds.add(null);
+      allocatedSize += containerSizeB;
+    }
+
+    // ThreadPoolExecutor throws IllegalArgumentException when
+    // maximumPoolSize < corePoolSize, which happens when the configured pool
+    // size exceeds MAX_THREADS on machines with few cores.
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
+        Math.max(numThreads, MAX_THREADS), 1, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+    AtomicInteger numFailedCreates = new AtomicInteger(0);
+    for (int containerIdx = 0; containerIdx < containerIds.size();
+         containerIdx++) {
+      executor.submit(new CreateContainerTask(volume, containerIdx,
+          containerIds, numFailedCreates));
+    }
+
+    // issue the command and then wait for it to finish
+    executor.shutdown();
+    boolean completed = false;
+    try {
+      completed = executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error creating volume:{}", volume.getVolumeName(), e);
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+
+    volume.setContainerIDs(containerIds);
+    // An interrupted wait means some containers may be missing, so it must
+    // not be reported as success.
+    return completed && numFailedCreates.get() == 0;
+  }
+
+  /**
+   * Looks up the pipeline of the given container and asks the storage
+   * client to delete it. Any exception is logged and not rethrown.
+   *
+   * @param containerID ID of the container to delete.
+   * @param force passed through to the storage client's delete call.
+   */
+  private void deleteContainer(String containerID, boolean force) {
+    try {
+      storageClient.deleteContainer(
+          storageClient.getContainer(containerID), force);
+    } catch (Exception e) {
+      LOGGER.error("Error deleting container Container:{} error:{}",
+          containerID, e);
+    }
+  }
+
+  /**
+   * Deletes the given containers in parallel on a temporary thread pool and
+   * waits for all deletions to finish. Null entries in the list are skipped.
+   *
+   * @param containers IDs of the containers to delete; may contain nulls.
+   * @param force passed through to each container deletion.
+   * @throws CBlockException declared for callers; not thrown by this body.
+   */
+  private void deleteVolumeContainers(List<String> containers, boolean force)
+      throws CBlockException {
+    // ThreadPoolExecutor throws IllegalArgumentException when
+    // maximumPoolSize < corePoolSize, which happens when the configured pool
+    // size exceeds MAX_THREADS on machines with few cores.
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
+        Math.max(numThreads, MAX_THREADS), 1, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+    for (String containerID : containers) {
+      if (containerID != null) {
+        executor.submit(() -> deleteContainer(containerID, force));
+      }
+    }
+
+    // issue the command and then wait for it to finish
+    executor.shutdown();
+    try {
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error deleting containers", e);
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
/**
* Called by CBlock server when creating a fresh volume. The core
@@ -172,31 +299,13 @@ public class StorageManager {
throw new CBlockException("Volume size smaller than block size? " +
"volume size:" + volumeSize + " block size:" + blockSize);
}
- VolumeDescriptor volume;
- int containerIdx = 0;
- try {
- volume = new VolumeDescriptor(userName, volumeName,
- volumeSize, blockSize);
- long allocatedSize = 0;
- ArrayList<String> containerIds = new ArrayList<>();
- while (allocatedSize < volumeSize) {
- Pipeline pipeline = storageClient.createContainer(OzoneProtos
- .ReplicationType.STAND_ALONE,
- OzoneProtos.ReplicationFactor.ONE,
- KeyUtil.getContainerName(userName, volumeName, containerIdx));
- ContainerDescriptor container =
- new ContainerDescriptor(pipeline.getContainerName());
- container.setPipeline(pipeline);
- container.setContainerIndex(containerIdx);
- volume.addContainer(container);
- containerIds.add(container.getContainerID());
- allocatedSize += containerSizeB;
- containerIdx += 1;
- }
- volume.setContainerIDs(containerIds);
- } catch (IOException e) {
- throw new CBlockException("Error when creating volume:" + e.getMessage());
- // TODO : delete already created containers? or re-try policy
+ VolumeDescriptor volume
+ = new VolumeDescriptor(userName, volumeName, volumeSize, blockSize);
+ boolean success = createVolumeContainers(volume);
+ if (!success) {
+ // cleanup the containers and throw the exception
+ deleteVolumeContainers(volume.getContainerIDsList(), true);
+ throw new CBlockException("Error when creating volume:" + volumeName);
}
makeVolumeReady(userName, volumeName, volume);
}
@@ -223,16 +332,7 @@ public class StorageManager {
throw new CBlockException("Deleting a non-empty volume without force!");
}
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
- for (String containerID : volume.getContainerIDsList()) {
- try {
- Pipeline pipeline = storageClient.getContainer(containerID);
- storageClient.deleteContainer(pipeline, force);
- } catch (IOException e) {
- LOGGER.error("Error deleting container Container:{} error:{}",
- containerID, e);
- throw new CBlockException(e.getMessage());
- }
- }
+ deleteVolumeContainers(volume.getContainerIDsList(), force);
if (user2VolumeMap.get(userName).size() == 0) {
user2VolumeMap.remove(userName);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 4e0028f..0860db8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -898,6 +898,22 @@
</property>
<property>
+ <name>dfs.cblock.manager.pool.size</name>
+ <value>16</value>
+ <description>
+ Number of threads that cblock manager will use for container operations.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.cblock.rpc.timeout.seconds</name>
+ <value>300</value>
+ <description>
+ RPC timeout in seconds used for cblock CLI operations.
+ </description>
+ </property>
+
+ <property>
<name>dfs.cblock.scm.ipaddress</name>
<value>127.0.0.1</value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
index 8d9c2a0..8cb57d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
/**
* NOTE : This class is only for testing purpose.
@@ -34,8 +34,8 @@ import java.util.HashMap;
* This is to allow volume creation call and perform standalone tests.
*/
public final class ContainerLookUpService {
- private static HashMap<String, ContainerDescriptor>
- containers = new HashMap<>();
+ private static ConcurrentHashMap<String, ContainerDescriptor>
+ containers = new ConcurrentHashMap<>();
/**
* Return an *existing* container with given Id.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/087c69ba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
index f45ea15..a318876 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* This class is the one that directly talks to SCM server.
@@ -37,7 +38,8 @@ import java.util.List;
*
*/
public class MockStorageClient implements ScmClient {
- private static long currentContainerId = -1;
+ private static AtomicInteger currentContainerId =
+ new AtomicInteger(0);
/**
* Ask SCM to get a exclusive container.
@@ -48,9 +50,9 @@ public class MockStorageClient implements ScmClient {
@Override
public Pipeline createContainer(String containerId)
throws IOException {
- currentContainerId += 1;
- ContainerLookUpService.addContainer(Long.toString(currentContainerId));
- return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
+ int contId = currentContainerId.getAndIncrement();
+ ContainerLookUpService.addContainer(Long.toString(contId));
+ return ContainerLookUpService.lookUp(Long.toString(contId))
.getPipeline();
}
@@ -126,10 +128,11 @@ public class MockStorageClient implements ScmClient {
public Pipeline createContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor, String containerId)
throws IOException {
- currentContainerId += 1;
- ContainerLookUpService.addContainer(Long.toString(currentContainerId));
- return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
- .getPipeline(); }
+ int contId = currentContainerId.getAndIncrement();
+ ContainerLookUpService.addContainer(Long.toString(contId));
+ return ContainerLookUpService.lookUp(Long.toString(contId))
+ .getPipeline();
+ }
/**
* Returns a set of Nodes that meet a query criteria.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org