You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/02/24 07:11:12 UTC
[shardingsphere] branch master updated: Close useless executor if
put not success (#9485)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ea7e071 Close useless executor if put not success (#9485)
ea7e071 is described below
commit ea7e071f90f7e2a3024256ee93000237e6e0d58a
Author: Liang Zhang <te...@163.com>
AuthorDate: Wed Feb 24 15:10:49 2021 +0800
Close useless executor if put not success (#9485)
* Rename ScenarioExecutorService
* Close useless executor if put not success
---
.../parallel/ParallelRunnerExecutorFactory.java | 9 ++++--
.../impl/ScenarioParallelRunnerExecutor.java | 35 +++++++++++-----------
2 files changed, 24 insertions(+), 20 deletions(-)
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
index 479de64..6d6a234 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
@@ -23,15 +23,15 @@ import org.apache.shardingsphere.test.integration.engine.junit.parallel.impl.Cas
import org.apache.shardingsphere.test.integration.engine.junit.parallel.impl.ScenarioParallelRunnerExecutor;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
/**
* Parallel runner executor factory.
*/
public final class ParallelRunnerExecutorFactory {
- private final ConcurrentMap<DatabaseType, ParallelRunnerExecutor> executors = new ConcurrentHashMap<>();
+ private final Map<DatabaseType, ParallelRunnerExecutor> executors = new ConcurrentHashMap<>();
/**
* Get parallel runner executor.
@@ -44,7 +44,10 @@ public final class ParallelRunnerExecutorFactory {
if (executors.containsKey(databaseType)) {
return executors.get(databaseType);
}
- executors.putIfAbsent(databaseType, newInstance(parallelLevel));
+ ParallelRunnerExecutor newExecutor = newInstance(parallelLevel);
+ if (null != executors.putIfAbsent(databaseType, newExecutor)) {
+ newExecutor.finished();
+ }
return executors.get(databaseType);
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
index a906af9..0541f35 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.test.integration.engine.junit.parallel.impl;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.EqualsAndHashCode;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.test.integration.engine.junit.parallel.ParallelRunnerExecutor;
import org.apache.shardingsphere.test.integration.engine.param.model.ParameterizedArray;
@@ -27,7 +26,6 @@ import org.apache.shardingsphere.test.integration.engine.param.model.Parameteriz
import java.io.Closeable;
import java.util.Collection;
import java.util.LinkedList;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -45,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecutor {
- private final ConcurrentMap<ScenarioKey, ScenarioExecutorQueue> scenarioExecutorQueues = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ScenarioKey, ScenarioExecutorService> scenarioExecutorServices = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
@@ -56,19 +54,19 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
scenarioTaskResults.add(getScenarioExecutorQueue(new ScenarioKey(parameterizedArray)).submit(childStatement));
}
- private ScenarioExecutorQueue getScenarioExecutorQueue(final ScenarioKey scenarioKey) {
- ScenarioExecutorQueue scenarioExecutorQueue = scenarioExecutorQueues.get(scenarioKey);
+ private ScenarioExecutorService getScenarioExecutorQueue(final ScenarioKey scenarioKey) {
+ ScenarioExecutorService scenarioExecutorQueue = scenarioExecutorServices.get(scenarioKey);
if (null != scenarioExecutorQueue) {
return scenarioExecutorQueue;
}
try {
lock.lock();
- scenarioExecutorQueue = scenarioExecutorQueues.get(scenarioKey);
+ scenarioExecutorQueue = scenarioExecutorServices.get(scenarioKey);
if (null != scenarioExecutorQueue) {
return scenarioExecutorQueue;
}
- scenarioExecutorQueue = new ScenarioExecutorQueue(scenarioKey);
- scenarioExecutorQueues.put(scenarioKey, scenarioExecutorQueue);
+ scenarioExecutorQueue = new ScenarioExecutorService(scenarioKey);
+ scenarioExecutorServices.put(scenarioKey, scenarioExecutorQueue);
return scenarioExecutorQueue;
} finally {
lock.unlock();
@@ -83,7 +81,7 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
} catch (final InterruptedException | ExecutionException ignored) {
}
});
- scenarioExecutorQueues.values().forEach(ScenarioExecutorQueue::close);
+ scenarioExecutorServices.values().forEach(ScenarioExecutorService::close);
}
/**
@@ -111,20 +109,23 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
}
/**
- * Scenario executor queue.
+ * Scenario executor service.
*/
- @Setter
- private static final class ScenarioExecutorQueue implements Closeable {
-
- private final BlockingQueue<Runnable> executorQueue;
+ private static final class ScenarioExecutorService implements Closeable {
private final ExecutorService executorService;
- ScenarioExecutorQueue(final ScenarioKey scenarioKey) {
- executorQueue = new LinkedBlockingQueue<>();
- executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, executorQueue, new ThreadFactoryBuilder().setNameFormat("ScenarioExecutor-" + scenarioKey + "-pool-%d").build());
+ ScenarioExecutorService(final ScenarioKey scenarioKey) {
+ String threadPoolNameFormat = String.join("-", "ScenarioExecutorPool", scenarioKey.toString(), "%d");
+ executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat(threadPoolNameFormat).build());
}
+ /**
+ * Submit task.
+ *
+ * @param childStatement child statement
+ * @return task future
+ */
public Future<?> submit(final Runnable childStatement) {
return executorService.submit(childStatement);
}