You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/03/10 16:40:10 UTC
[ozone] branch HDDS-3816-ec updated: HDDS-6422. EC: Fix too many idle threads during reconstruct read. (#3168)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 28482bf HDDS-6422. EC: Fix too many idle threads during reconstruct read. (#3168)
28482bf is described below
commit 28482bf2b927d53bc74681198b03631bce7a8dee
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Fri Mar 11 00:39:52 2022 +0800
HDDS-6422. EC: Fix too many idle threads during reconstruct read. (#3168)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 19 +++++++++++
.../client/io/BlockInputStreamFactoryImpl.java | 17 +++++++---
.../client/io/ECBlockInputStreamFactoryImpl.java | 15 ++++++---
.../io/ECBlockReconstructedStripeInputStream.java | 15 +++------
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 39 +++++++++++++++++++++-
.../read/TestECBlockReconstructedInputStream.java | 12 ++++++-
.../TestECBlockReconstructedStripeInputStream.java | 12 ++++++-
.../ozone/freon/OzoneClientKeyValidator.java | 2 ++
8 files changed, 109 insertions(+), 22 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 351f69d..63dd511 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -151,6 +151,17 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private long excludeNodesExpiryTime = 10 * 60 * 1000;
+ @Config(key = "ec.reconstruct.stripe.read.pool.limit",
+ defaultValue = "30",
+ description = "Thread pool max size for parallelly read" +
+ " available ec chunks to reconstruct the whole stripe.",
+ tags = ConfigTag.CLIENT)
+ // For the largest recommended EC policy rs-10-4-1024k,
+ // 10 chunks are required at least for stripe reconstruction,
+ // so 1 core thread for each chunk and
+ // 3 concurrent stripe read should be enough.
+ private int ecReconstructStripeReadPoolLimit = 10 * 3;
+
@Config(key = "checksum.combine.mode",
defaultValue = "COMPOSITE_CRC",
description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] "
@@ -288,4 +299,12 @@ public class OzoneClientConfig {
ChecksumCombineMode.COMPOSITE_CRC.name());
}
}
+
+ public void setEcReconstructStripeReadPoolLimit(int poolLimit) {
+ this.ecReconstructStripeReadPoolLimit = poolLimit;
+ }
+
+ public int getEcReconstructStripeReadPoolLimit() {
+ return ecReconstructStripeReadPoolLimit;
+ }
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 1d46acd..104e5bc 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -31,7 +31,10 @@ import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.Function;
+import java.util.function.Supplier;
/**
* Factory class to create various BlockStream instances.
@@ -41,17 +44,21 @@ public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
private ECBlockInputStreamFactory ecBlockStreamFactory;
public static BlockInputStreamFactory getInstance(
- ByteBufferPool byteBufferPool) {
- return new BlockInputStreamFactoryImpl(byteBufferPool);
+ ByteBufferPool byteBufferPool,
+ Supplier<ExecutorService> ecReconstructExecutorSupplier) {
+ return new BlockInputStreamFactoryImpl(byteBufferPool,
+ ecReconstructExecutorSupplier);
}
public BlockInputStreamFactoryImpl() {
- this(new ElasticByteBufferPool());
+ this(new ElasticByteBufferPool(), Executors::newSingleThreadExecutor);
}
- public BlockInputStreamFactoryImpl(ByteBufferPool byteBufferPool) {
+ public BlockInputStreamFactoryImpl(ByteBufferPool byteBufferPool,
+ Supplier<ExecutorService> ecReconstructExecutorSupplier) {
this.ecBlockStreamFactory =
- ECBlockInputStreamFactoryImpl.getInstance(this, byteBufferPool);
+ ECBlockInputStreamFactoryImpl.getInstance(this, byteBufferPool,
+ ecReconstructExecutorSupplier);
}
/**
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index cc03e80..470df0c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.function.Function;
+import java.util.function.Supplier;
/**
* Factory class to create various BlockStream instances.
@@ -38,16 +40,21 @@ public final class ECBlockInputStreamFactoryImpl implements
private final BlockInputStreamFactory inputStreamFactory;
private final ByteBufferPool byteBufferPool;
+ private final Supplier<ExecutorService> ecReconstructExecutorSupplier;
public static ECBlockInputStreamFactory getInstance(
- BlockInputStreamFactory streamFactory, ByteBufferPool byteBufferPool) {
- return new ECBlockInputStreamFactoryImpl(streamFactory, byteBufferPool);
+ BlockInputStreamFactory streamFactory, ByteBufferPool byteBufferPool,
+ Supplier<ExecutorService> ecReconstructExecutorSupplier) {
+ return new ECBlockInputStreamFactoryImpl(streamFactory, byteBufferPool,
+ ecReconstructExecutorSupplier);
}
private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory,
- ByteBufferPool byteBufferPool) {
+ ByteBufferPool byteBufferPool,
+ Supplier<ExecutorService> ecReconstructExecutorSupplier) {
this.byteBufferPool = byteBufferPool;
this.inputStreamFactory = streamFactory;
+ this.ecReconstructExecutorSupplier = ecReconstructExecutorSupplier;
}
/**
@@ -77,7 +84,7 @@ public final class ECBlockInputStreamFactoryImpl implements
new ECBlockReconstructedStripeInputStream(
(ECReplicationConfig)repConfig, blockInfo, verifyChecksum,
xceiverFactory, refreshFunction, inputStreamFactory,
- byteBufferPool);
+ byteBufferPool, ecReconstructExecutorSupplier.get());
if (failedLocations != null) {
sis.addFailedDatanodes(failedLocations);
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 30af56d..91c3797 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.client.io;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.client.BlockID;
@@ -48,9 +47,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
/**
@@ -117,23 +114,22 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
private ExecutorService executor;
+ @SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
OmKeyLocationInfo blockInfo, boolean verifyChecksum,
- XceiverClientFactory xceiverClientFactory, Function<BlockID,
+ XceiverClientFactory xceiverClientFactory, Function<BlockID,
Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
- ByteBufferPool byteBufferPool) {
+ ByteBufferPool byteBufferPool,
+ ExecutorService ecReconstructExecutor) {
super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
refreshFunction, streamFactory);
this.byteBufferPool = byteBufferPool;
+ this.executor = ecReconstructExecutor;
decoder = CodecUtil.createRawDecoderWithFallback(repConfig);
// The EC decoder needs an array data+parity long, with missing or not
// needed indexes set to null.
decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
-
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
- "ec-reader-for-" + blockInfo.getBlockID() + "-TID-%d").build();
- executor = Executors.newFixedThreadPool(repConfig.getData(), threadFactory);
}
/**
@@ -579,7 +575,6 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
@Override
public synchronized void close() {
super.close();
- executor.shutdownNow();
// Inside this class, we only allocate buffers to read parity into. Data
// is reconstructed or read into a set of buffers passed in from the calling
// class. Therefore we only need to ensure we free the parity buffers here.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 5051b0d..d736e89 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -37,10 +37,14 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.key.KeyProvider;
@@ -163,6 +167,11 @@ public class RpcClient implements ClientProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(RpcClient.class);
+ // For the minimal recommended EC policy rs-3-2-1024k,
+ // we should have at least 1 core thread for each necessary chunk
+ // for reconstruction.
+ private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
+
private final ConfigurationSource conf;
private final OzoneManagerClientProtocol ozoneManagerClient;
private final XceiverClientFactory xceiverClientManager;
@@ -181,6 +190,7 @@ public class RpcClient implements ClientProtocol {
private final boolean getLatestVersionLocation;
private final ByteBufferPool byteBufferPool;
private final BlockInputStreamFactory blockInputStreamFactory;
+ private volatile ExecutorService ecReconstructExecutor;
/**
* Creates RpcClient instance with the given configuration.
@@ -295,7 +305,7 @@ public class RpcClient implements ClientProtocol {
}).build();
this.byteBufferPool = new ElasticByteBufferPool();
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
- .getInstance(byteBufferPool);
+ .getInstance(byteBufferPool, this::getECReconstructExecutor);
}
public XceiverClientFactory getXceiverClientManager() {
@@ -1147,6 +1157,10 @@ public class RpcClient implements ClientProtocol {
@Override
public void close() throws IOException {
+ if (ecReconstructExecutor != null) {
+ ecReconstructExecutor.shutdownNow();
+ ecReconstructExecutor = null;
+ }
IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
keyProviderCache.invalidateAll();
keyProviderCache.cleanUp();
@@ -1707,4 +1721,27 @@ public class RpcClient implements ClientProtocol {
.setOwnerName(owner);
return ozoneManagerClient.setBucketOwner(builder.build());
}
+
+ public ExecutorService getECReconstructExecutor() {
+ // local ref to a volatile to ensure access
+ // to a completed initialized object
+ ExecutorService executor = ecReconstructExecutor;
+ if (executor == null) {
+ synchronized (this) {
+ executor = ecReconstructExecutor;
+ if (executor == null) {
+ ecReconstructExecutor = new ThreadPoolExecutor(
+ EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
+ clientConfig.getEcReconstructStripeReadPoolLimit(),
+ 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
+ new ThreadFactoryBuilder()
+ .setNameFormat("ec-reconstruct-reader-TID-%d")
+ .build(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ executor = ecReconstructExecutor;
+ }
+ }
+ }
+ return executor;
+ }
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
index 5d73b94..d625b76 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedInputStream;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SplittableRandom;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.generateParity;
@@ -50,6 +53,8 @@ public class TestECBlockReconstructedInputStream {
private ThreadLocalRandom random = ThreadLocalRandom.current();
private SplittableRandom dataGenerator;
private ByteBufferPool bufferPool = new ElasticByteBufferPool();
+ private ExecutorService ecReconstructExecutor =
+ Executors.newFixedThreadPool(3);
@Before
public void setup() throws IOException {
@@ -60,13 +65,18 @@ public class TestECBlockReconstructedInputStream {
dataGenerator = new SplittableRandom(randomSeed);
}
+ @After
+ public void teardown() {
+ ecReconstructExecutor.shutdownNow();
+ }
+
private ECBlockReconstructedStripeInputStream createStripeInputStream(
Map<DatanodeDetails, Integer> dnMap, long blockLength) {
OmKeyLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory, bufferPool);
+ null, null, streamFactory, bufferPool, ecReconstructExecutor);
}
@Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 3381ef5..a5ea3a4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputSt
import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStream;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SplittableRandom;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.generateParity;
@@ -55,6 +58,8 @@ public class TestECBlockReconstructedStripeInputStream {
private ThreadLocalRandom random = ThreadLocalRandom.current();
private SplittableRandom dataGen;
private ByteBufferPool bufferPool = new ElasticByteBufferPool();
+ private ExecutorService ecReconstructExecutor =
+ Executors.newFixedThreadPool(3);
@Before
public void setup() {
@@ -66,6 +71,11 @@ public class TestECBlockReconstructedStripeInputStream {
dataGen = new SplittableRandom(randomSeed);
}
+ @After
+ public void teardown() {
+ ecReconstructExecutor.shutdownNow();
+ }
+
@Test
public void testSufficientLocations() throws IOException {
// One chunk, only 1 location.
@@ -624,7 +634,7 @@ public class TestECBlockReconstructedStripeInputStream {
private ECBlockReconstructedStripeInputStream createInputStream(
OmKeyLocationInfo keyInfo) {
return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
- null, null, streamFactory, bufferPool);
+ null, null, streamFactory, bufferPool, ecReconstructExecutor);
}
private List<Integer> indexesToList(int... indexes) {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java
index 03de15c..1784ec3 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java
@@ -93,6 +93,8 @@ public class OzoneClientKeyValidator extends BaseFreonGenerator
runTests(this::validateKey);
+ rpcClient.close();
+
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org