You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/03/10 16:40:10 UTC

[ozone] branch HDDS-3816-ec updated: HDDS-6422. EC: Fix too many idle threads during reconstruct read. (#3168)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 28482bf  HDDS-6422. EC: Fix too many idle threads during reconstruct read. (#3168)
28482bf is described below

commit 28482bf2b927d53bc74681198b03631bce7a8dee
Author: Gui Hecheng <ma...@tencent.com>
AuthorDate: Fri Mar 11 00:39:52 2022 +0800

    HDDS-6422. EC: Fix too many idle threads during reconstruct read. (#3168)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  | 19 +++++++++++
 .../client/io/BlockInputStreamFactoryImpl.java     | 17 +++++++---
 .../client/io/ECBlockInputStreamFactoryImpl.java   | 15 ++++++---
 .../io/ECBlockReconstructedStripeInputStream.java  | 15 +++------
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 39 +++++++++++++++++++++-
 .../read/TestECBlockReconstructedInputStream.java  | 12 ++++++-
 .../TestECBlockReconstructedStripeInputStream.java | 12 ++++++-
 .../ozone/freon/OzoneClientKeyValidator.java       |  2 ++
 8 files changed, 109 insertions(+), 22 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 351f69d..63dd511 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -151,6 +151,17 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private long excludeNodesExpiryTime = 10 * 60 * 1000;
 
+  @Config(key = "ec.reconstruct.stripe.read.pool.limit",
+      defaultValue = "30",
+      description = "Thread pool max size for parallelly read" +
+          " available ec chunks to reconstruct the whole stripe.",
+      tags = ConfigTag.CLIENT)
+  // For the largest recommended EC policy, rs-10-4-1024k,
+  // at least 10 chunks are required for stripe reconstruction,
+  // so 1 core thread for each chunk and
+  // 3 concurrent stripe reads should be enough.
+  private int ecReconstructStripeReadPoolLimit = 10 * 3;
+
   @Config(key = "checksum.combine.mode",
       defaultValue = "COMPOSITE_CRC",
       description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] "
@@ -288,4 +299,12 @@ public class OzoneClientConfig {
           ChecksumCombineMode.COMPOSITE_CRC.name());
     }
   }
+
+  public void setEcReconstructStripeReadPoolLimit(int poolLimit) {
+    this.ecReconstructStripeReadPoolLimit = poolLimit;
+  }
+
+  public int getEcReconstructStripeReadPoolLimit() {
+    return ecReconstructStripeReadPoolLimit;
+  }
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index 1d46acd..104e5bc 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -31,7 +31,10 @@ import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  * Factory class to create various BlockStream instances.
@@ -41,17 +44,21 @@ public class BlockInputStreamFactoryImpl implements BlockInputStreamFactory {
   private ECBlockInputStreamFactory ecBlockStreamFactory;
 
   public static BlockInputStreamFactory getInstance(
-      ByteBufferPool byteBufferPool) {
-    return new BlockInputStreamFactoryImpl(byteBufferPool);
+      ByteBufferPool byteBufferPool,
+      Supplier<ExecutorService> ecReconstructExecutorSupplier) {
+    return new BlockInputStreamFactoryImpl(byteBufferPool,
+        ecReconstructExecutorSupplier);
   }
 
   public BlockInputStreamFactoryImpl() {
-    this(new ElasticByteBufferPool());
+    this(new ElasticByteBufferPool(), Executors::newSingleThreadExecutor);
   }
 
-  public BlockInputStreamFactoryImpl(ByteBufferPool byteBufferPool) {
+  public BlockInputStreamFactoryImpl(ByteBufferPool byteBufferPool,
+      Supplier<ExecutorService> ecReconstructExecutorSupplier) {
     this.ecBlockStreamFactory =
-        ECBlockInputStreamFactoryImpl.getInstance(this, byteBufferPool);
+        ECBlockInputStreamFactoryImpl.getInstance(this, byteBufferPool,
+            ecReconstructExecutorSupplier);
   }
 
   /**
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index cc03e80..470df0c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  * Factory class to create various BlockStream instances.
@@ -38,16 +40,21 @@ public final class ECBlockInputStreamFactoryImpl implements
 
   private final BlockInputStreamFactory inputStreamFactory;
   private final ByteBufferPool byteBufferPool;
+  private final Supplier<ExecutorService> ecReconstructExecutorSupplier;
 
   public static ECBlockInputStreamFactory getInstance(
-      BlockInputStreamFactory streamFactory, ByteBufferPool byteBufferPool) {
-    return new ECBlockInputStreamFactoryImpl(streamFactory, byteBufferPool);
+      BlockInputStreamFactory streamFactory, ByteBufferPool byteBufferPool,
+      Supplier<ExecutorService> ecReconstructExecutorSupplier) {
+    return new ECBlockInputStreamFactoryImpl(streamFactory, byteBufferPool,
+        ecReconstructExecutorSupplier);
   }
 
   private ECBlockInputStreamFactoryImpl(BlockInputStreamFactory streamFactory,
-      ByteBufferPool byteBufferPool) {
+      ByteBufferPool byteBufferPool,
+      Supplier<ExecutorService> ecReconstructExecutorSupplier) {
     this.byteBufferPool = byteBufferPool;
     this.inputStreamFactory = streamFactory;
+    this.ecReconstructExecutorSupplier = ecReconstructExecutorSupplier;
   }
 
   /**
@@ -77,7 +84,7 @@ public final class ECBlockInputStreamFactoryImpl implements
           new ECBlockReconstructedStripeInputStream(
               (ECReplicationConfig)repConfig, blockInfo, verifyChecksum,
               xceiverFactory, refreshFunction, inputStreamFactory,
-              byteBufferPool);
+              byteBufferPool, ecReconstructExecutorSupplier.get());
       if (failedLocations != null) {
         sis.addFailedDatanodes(failedLocations);
       }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 30af56d..91c3797 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -48,9 +47,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 import java.util.function.Function;
 
 /**
@@ -117,23 +114,22 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
 
   private ExecutorService executor;
 
+  @SuppressWarnings("checkstyle:ParameterNumber")
   public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
       OmKeyLocationInfo blockInfo, boolean verifyChecksum,
-       XceiverClientFactory xceiverClientFactory, Function<BlockID,
+      XceiverClientFactory xceiverClientFactory, Function<BlockID,
       Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
-      ByteBufferPool byteBufferPool) {
+      ByteBufferPool byteBufferPool,
+      ExecutorService ecReconstructExecutor) {
     super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
         refreshFunction, streamFactory);
     this.byteBufferPool = byteBufferPool;
+    this.executor = ecReconstructExecutor;
     decoder = CodecUtil.createRawDecoderWithFallback(repConfig);
 
     // The EC decoder needs an array data+parity long, with missing or not
     // needed indexes set to null.
     decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
-
-    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
-        "ec-reader-for-" + blockInfo.getBlockID() + "-TID-%d").build();
-    executor = Executors.newFixedThreadPool(repConfig.getData(), threadFactory);
   }
 
   /**
@@ -579,7 +575,6 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   @Override
   public synchronized void close() {
     super.close();
-    executor.shutdownNow();
     // Inside this class, we only allocate buffers to read parity into. Data
     // is reconstructed or read into a set of buffers passed in from the calling
     // class. Therefore we only need to ensure we free the parity buffers here.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 5051b0d..d736e89 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -37,10 +37,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.crypto.CryptoOutputStream;
 import org.apache.hadoop.crypto.key.KeyProvider;
@@ -163,6 +167,11 @@ public class RpcClient implements ClientProtocol {
   private static final Logger LOG =
       LoggerFactory.getLogger(RpcClient.class);
 
+  // For the smallest recommended EC policy, rs-3-2-1024k,
+  // we should have at least 1 core thread for each chunk
+  // needed for reconstruction.
+  private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
+
   private final ConfigurationSource conf;
   private final OzoneManagerClientProtocol ozoneManagerClient;
   private final XceiverClientFactory xceiverClientManager;
@@ -181,6 +190,7 @@ public class RpcClient implements ClientProtocol {
   private final boolean getLatestVersionLocation;
   private final ByteBufferPool byteBufferPool;
   private final BlockInputStreamFactory blockInputStreamFactory;
+  private volatile ExecutorService ecReconstructExecutor;
 
   /**
    * Creates RpcClient instance with the given configuration.
@@ -295,7 +305,7 @@ public class RpcClient implements ClientProtocol {
         }).build();
     this.byteBufferPool = new ElasticByteBufferPool();
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
-        .getInstance(byteBufferPool);
+        .getInstance(byteBufferPool, this::getECReconstructExecutor);
   }
 
   public XceiverClientFactory getXceiverClientManager() {
@@ -1147,6 +1157,10 @@ public class RpcClient implements ClientProtocol {
 
   @Override
   public void close() throws IOException {
+    if (ecReconstructExecutor != null) {
+      ecReconstructExecutor.shutdownNow();
+      ecReconstructExecutor = null;
+    }
     IOUtils.cleanupWithLogger(LOG, ozoneManagerClient, xceiverClientManager);
     keyProviderCache.invalidateAll();
     keyProviderCache.cleanUp();
@@ -1707,4 +1721,27 @@ public class RpcClient implements ClientProtocol {
         .setOwnerName(owner);
     return ozoneManagerClient.setBucketOwner(builder.build());
   }
+
+  public ExecutorService getECReconstructExecutor() {
+    // Read the volatile field into a local reference to ensure access
+    // to a completely initialized object.
+    ExecutorService executor = ecReconstructExecutor;
+    if (executor == null) {
+      synchronized (this) {
+        executor = ecReconstructExecutor;
+        if (executor == null) {
+          ecReconstructExecutor = new ThreadPoolExecutor(
+              EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
+              clientConfig.getEcReconstructStripeReadPoolLimit(),
+              60, TimeUnit.SECONDS, new SynchronousQueue<>(),
+              new ThreadFactoryBuilder()
+                  .setNameFormat("ec-reconstruct-reader-TID-%d")
+                  .build(),
+              new ThreadPoolExecutor.CallerRunsPolicy());
+          executor = ecReconstructExecutor;
+        }
+      }
+    }
+    return executor;
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
index 5d73b94..d625b76 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.client.io.ECBlockReconstructedInputStream;
 import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.SplittableRandom;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.generateParity;
@@ -50,6 +53,8 @@ public class TestECBlockReconstructedInputStream {
   private ThreadLocalRandom random = ThreadLocalRandom.current();
   private SplittableRandom dataGenerator;
   private ByteBufferPool bufferPool = new ElasticByteBufferPool();
+  private ExecutorService ecReconstructExecutor =
+      Executors.newFixedThreadPool(3);
 
   @Before
   public void setup() throws IOException {
@@ -60,13 +65,18 @@ public class TestECBlockReconstructedInputStream {
     dataGenerator = new SplittableRandom(randomSeed);
   }
 
+  @After
+  public void teardown() {
+    ecReconstructExecutor.shutdownNow();
+  }
+
   private ECBlockReconstructedStripeInputStream createStripeInputStream(
       Map<DatanodeDetails, Integer> dnMap, long blockLength) {
     OmKeyLocationInfo keyInfo =
         ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
     streamFactory.setCurrentPipeline(keyInfo.getPipeline());
     return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
-        null, null, streamFactory, bufferPool);
+        null, null, streamFactory, bufferPool, ecReconstructExecutor);
   }
 
   @Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 3381ef5..a5ea3a4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputSt
 import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStream;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.SplittableRandom;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.generateParity;
@@ -55,6 +58,8 @@ public class TestECBlockReconstructedStripeInputStream {
   private ThreadLocalRandom random = ThreadLocalRandom.current();
   private SplittableRandom dataGen;
   private ByteBufferPool bufferPool = new ElasticByteBufferPool();
+  private ExecutorService ecReconstructExecutor =
+      Executors.newFixedThreadPool(3);
 
   @Before
   public void setup() {
@@ -66,6 +71,11 @@ public class TestECBlockReconstructedStripeInputStream {
     dataGen = new SplittableRandom(randomSeed);
   }
 
+  @After
+  public void teardown() {
+    ecReconstructExecutor.shutdownNow();
+  }
+
   @Test
   public void testSufficientLocations() throws IOException {
     // One chunk, only 1 location.
@@ -624,7 +634,7 @@ public class TestECBlockReconstructedStripeInputStream {
   private ECBlockReconstructedStripeInputStream createInputStream(
       OmKeyLocationInfo keyInfo) {
     return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
-        null, null, streamFactory, bufferPool);
+        null, null, streamFactory, bufferPool, ecReconstructExecutor);
   }
 
   private List<Integer> indexesToList(int... indexes) {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java
index 03de15c..1784ec3 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OzoneClientKeyValidator.java
@@ -93,6 +93,8 @@ public class OzoneClientKeyValidator extends BaseFreonGenerator
 
     runTests(this::validateKey);
 
+    rpcClient.close();
+
     return null;
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org