You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/02/24 06:38:57 UTC
[shardingsphere] branch master updated: Refactor ParallelRunnerExecutorFactory (#9484)
This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dabb911 Refactor ParallelRunnerExecutorFactory (#9484)
dabb911 is described below
commit dabb91164f74d66170600884d63bc16b09671242
Author: Liang Zhang <te...@163.com>
AuthorDate: Wed Feb 24 14:38:23 2021 +0800
Refactor ParallelRunnerExecutorFactory (#9484)
* Refactor CaseParallelRunnerExecutor
* Code cleanup for ScenarioParallelRunnerExecutor
* Refactor ParallelRunnerExecutorFactory
---
.../parallel/ParallelRunnerExecutorFactory.java | 21 +++---
.../junit/parallel/ParallelRunnerScheduler.java | 6 +-
.../parallel/impl/CaseParallelRunnerExecutor.java | 17 ++---
.../impl/ScenarioParallelRunnerExecutor.java | 79 +++++++---------------
4 files changed, 44 insertions(+), 79 deletions(-)
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
index 3cdb646..479de64 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
@@ -17,8 +17,6 @@
package org.apache.shardingsphere.test.integration.engine.junit.parallel;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.test.integration.engine.junit.parallel.annotaion.ParallelLevel;
import org.apache.shardingsphere.test.integration.engine.junit.parallel.impl.CaseParallelRunnerExecutor;
@@ -31,10 +29,9 @@ import java.util.concurrent.ConcurrentMap;
/**
* Parallel runner executor factory.
*/
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ParallelRunnerExecutorFactory {
- private static final ConcurrentMap<DatabaseType, ParallelRunnerExecutor> EXECUTORS = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DatabaseType, ParallelRunnerExecutor> executors = new ConcurrentHashMap<>();
/**
* Get parallel runner executor.
@@ -43,15 +40,15 @@ public final class ParallelRunnerExecutorFactory {
* @param parallelLevel parallel level
* @return parallel runner executor
*/
- public static ParallelRunnerExecutor getExecutor(final DatabaseType databaseType, final ParallelLevel parallelLevel) {
- if (EXECUTORS.containsKey(databaseType)) {
- return EXECUTORS.get(databaseType);
+ public ParallelRunnerExecutor getExecutor(final DatabaseType databaseType, final ParallelLevel parallelLevel) {
+ if (executors.containsKey(databaseType)) {
+ return executors.get(databaseType);
}
- EXECUTORS.putIfAbsent(databaseType, newInstance(parallelLevel));
- return EXECUTORS.get(databaseType);
+ executors.putIfAbsent(databaseType, newInstance(parallelLevel));
+ return executors.get(databaseType);
}
- private static ParallelRunnerExecutor newInstance(final ParallelLevel parallelLevel) {
+ private ParallelRunnerExecutor newInstance(final ParallelLevel parallelLevel) {
switch (parallelLevel) {
case CASE:
return new CaseParallelRunnerExecutor();
@@ -67,7 +64,7 @@ public final class ParallelRunnerExecutorFactory {
*
* @return all executors
*/
- public static Collection<ParallelRunnerExecutor> getAllExecutors() {
- return EXECUTORS.values();
+ public Collection<ParallelRunnerExecutor> getAllExecutors() {
+ return executors.values();
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerScheduler.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerScheduler.java
index d8c8321..7135665 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerScheduler.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerScheduler.java
@@ -31,14 +31,16 @@ public final class ParallelRunnerScheduler implements RunnerScheduler {
private final ParallelLevel parallelLevel;
+ private final ParallelRunnerExecutorFactory executorFactory = new ParallelRunnerExecutorFactory();
+
@Override
public void schedule(final Runnable childStatement) {
ParameterizedArray parameterizedArray = new RunnerParameters(childStatement).getParameterizedArray();
- ParallelRunnerExecutorFactory.getExecutor(parameterizedArray.getDatabaseType(), parallelLevel).execute(parameterizedArray, childStatement);
+ executorFactory.getExecutor(parameterizedArray.getDatabaseType(), parallelLevel).execute(parameterizedArray, childStatement);
}
@Override
public void finished() {
- ParallelRunnerExecutorFactory.getAllExecutors().forEach(ParallelRunnerExecutor::finished);
+ executorFactory.getAllExecutors().forEach(ParallelRunnerExecutor::finished);
}
}
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/CaseParallelRunnerExecutor.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/CaseParallelRunnerExecutor.java
index eeabd10..8db9b19 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/CaseParallelRunnerExecutor.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/CaseParallelRunnerExecutor.java
@@ -17,40 +17,35 @@
package org.apache.shardingsphere.test.integration.engine.junit.parallel.impl;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
import org.apache.shardingsphere.test.integration.engine.junit.parallel.ParallelRunnerExecutor;
import org.apache.shardingsphere.test.integration.engine.param.model.ParameterizedArray;
+import java.util.Collection;
import java.util.LinkedList;
-import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* Parallel runner executor with case.
*/
-@RequiredArgsConstructor
public final class CaseParallelRunnerExecutor implements ParallelRunnerExecutor {
private final ExecutorServiceManager executorServiceManager = new ExecutorServiceManager(Runtime.getRuntime().availableProcessors() * 2 - 1);
- private final List<Future<?>> caseTaskResults = new LinkedList<>();
+ private final Collection<Future<?>> caseTaskResults = new LinkedList<>();
@Override
public void execute(final ParameterizedArray parameterizedArray, final Runnable childStatement) {
- caseTaskResults.add(executorServiceManager.getExecutorService().submit(() -> childStatement.run()));
+ caseTaskResults.add(executorServiceManager.getExecutorService().submit(childStatement));
}
- @SneakyThrows
@Override
public void finished() {
- caseTaskResults.forEach(future -> {
+ caseTaskResults.forEach(each -> {
try {
- future.get();
- } catch (InterruptedException ignore) {
- } catch (ExecutionException ignore) {
+ each.get();
+ } catch (final InterruptedException | ExecutionException ignored) {
}
});
executorServiceManager.getExecutorService().shutdownNow();
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
index 31b950d..a906af9 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
@@ -18,16 +18,15 @@
package org.apache.shardingsphere.test.integration.engine.junit.parallel.impl;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import lombok.RequiredArgsConstructor;
+import lombok.EqualsAndHashCode;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.test.integration.engine.junit.parallel.ParallelRunnerExecutor;
import org.apache.shardingsphere.test.integration.engine.param.model.ParameterizedArray;
import java.io.Closeable;
-import java.io.IOException;
+import java.util.Collection;
import java.util.LinkedList;
-import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -50,27 +49,26 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
private final Lock lock = new ReentrantLock();
- private final List<Future<?>> scenarioTaskResults = new LinkedList<>();
+ private final Collection<Future<?>> scenarioTaskResults = new LinkedList<>();
@Override
public void execute(final ParameterizedArray parameterizedArray, final Runnable childStatement) {
- scenarioTaskResults.add(getScenarioExecutorQueue(new ScenarioKey(parameterizedArray.getAdapter(), parameterizedArray.getScenario(), parameterizedArray.getDatabaseType().getName()))
- .submit(childStatement));
+ scenarioTaskResults.add(getScenarioExecutorQueue(new ScenarioKey(parameterizedArray)).submit(childStatement));
}
private ScenarioExecutorQueue getScenarioExecutorQueue(final ScenarioKey scenarioKey) {
- ScenarioExecutorQueue scenarioExecutorQueue = this.scenarioExecutorQueues.get(scenarioKey);
+ ScenarioExecutorQueue scenarioExecutorQueue = scenarioExecutorQueues.get(scenarioKey);
if (null != scenarioExecutorQueue) {
return scenarioExecutorQueue;
}
try {
lock.lock();
- scenarioExecutorQueue = this.scenarioExecutorQueues.get(scenarioKey);
+ scenarioExecutorQueue = scenarioExecutorQueues.get(scenarioKey);
if (null != scenarioExecutorQueue) {
return scenarioExecutorQueue;
}
scenarioExecutorQueue = new ScenarioExecutorQueue(scenarioKey);
- this.scenarioExecutorQueues.put(scenarioKey, scenarioExecutorQueue);
+ scenarioExecutorQueues.put(scenarioKey, scenarioExecutorQueue);
return scenarioExecutorQueue;
} finally {
lock.unlock();
@@ -82,22 +80,16 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
scenarioTaskResults.forEach(future -> {
try {
future.get();
- } catch (InterruptedException ignore) {
- } catch (ExecutionException ignore) {
- }
- });
- scenarioExecutorQueues.values().forEach(scenarioExecutorQueue -> {
- try {
- scenarioExecutorQueue.close();
- } catch (IOException ignore) {
+ } catch (final InterruptedException | ExecutionException ignored) {
}
});
+ scenarioExecutorQueues.values().forEach(ScenarioExecutorQueue::close);
}
/**
* Scenario key.
*/
- @RequiredArgsConstructor
+ @EqualsAndHashCode
private static final class ScenarioKey {
private final String adapter;
@@ -105,33 +97,13 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
private final String scenario;
private final String databaseTypeName;
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ScenarioKey that = (ScenarioKey) o;
- if (!adapter.equals(that.adapter)) {
- return false;
- }
- if (!scenario.equals(that.scenario)) {
- return false;
- }
- return databaseTypeName.equals(that.databaseTypeName);
- }
-
- @Override
- public int hashCode() {
- int result = adapter.hashCode();
- result = 31 * result + scenario.hashCode();
- result = 31 * result + databaseTypeName.hashCode();
- return result;
+
+ private ScenarioKey(final ParameterizedArray parameterizedArray) {
+ adapter = parameterizedArray.getAdapter();
+ scenario = parameterizedArray.getScenario();
+ databaseTypeName = parameterizedArray.getDatabaseType().getName();
}
-
+
@Override
public String toString() {
return String.join("-", adapter, scenario, databaseTypeName);
@@ -145,22 +117,21 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
private static final class ScenarioExecutorQueue implements Closeable {
private final BlockingQueue<Runnable> executorQueue;
-
+
private final ExecutorService executorService;
-
+
ScenarioExecutorQueue(final ScenarioKey scenarioKey) {
- this.executorQueue = new LinkedBlockingQueue<>();
- this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
- this.executorQueue, new ThreadFactoryBuilder().setNameFormat("ScenarioExecutor-" + scenarioKey + "-pool-%d").build());
+ executorQueue = new LinkedBlockingQueue<>();
+ executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, executorQueue, new ThreadFactoryBuilder().setNameFormat("ScenarioExecutor-" + scenarioKey + "-pool-%d").build());
}
-
+
public Future<?> submit(final Runnable childStatement) {
- return this.executorService.submit(childStatement);
+ return executorService.submit(childStatement);
}
-
+
@Override
- public void close() throws IOException {
- this.executorService.shutdownNow();
+ public void close() {
+ executorService.shutdownNow();
}
}
}