You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/04/27 00:09:15 UTC
[shardingsphere] branch master updated: Refactor ExecutorEngine (#17133)
This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f4ee8911ec6 Refactor ExecutorEngine (#17133)
f4ee8911ec6 is described below
commit f4ee8911ec615061156e3814ae3ba6242f5f083b
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Wed Apr 27 08:08:58 2022 +0800
Refactor ExecutorEngine (#17133)
* Refactor SnowflakeKeyGenerateAlgorithmTest
* Refactor ExecutorEngine
* Refactor ExecutorEngine
* For checkstyle
* For checkstyle
* For checkstyle
* Refactor DatabaseDiscoveryEngine
---
.../algorithm/DatabaseDiscoveryEngine.java | 12 ++------
.../DatabaseDiscoveryExecutorCallback.java | 19 +++++-------
.../keygen/SnowflakeKeyGenerateAlgorithmTest.java | 6 ++--
.../type/CreateIndexStatementSchemaRefresher.java | 2 +-
.../infra/executor/kernel/ExecutorEngine.java | 36 +++++++++++++++++++++-
.../infra/executor/kernel/ExecutorEngineTest.java | 2 +-
.../driver/executor/AbstractBaseExecutorTest.java | 5 ++-
.../natived/builder/StoragePrivilegeBuilder.java | 10 +++---
.../mode/metadata/MetaDataContextsBuilder.java | 2 +-
.../backend/context/BackendExecutorContext.java | 4 +--
.../dataset/DataSetEnvironmentManager.java | 4 ++-
.../parallel/impl/CaseParallelRunnerExecutor.java | 4 ++-
12 files changed, 65 insertions(+), 41 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
index 21c88fdede9..2318d1441b8 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
@@ -21,12 +21,10 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
import org.apache.shardingsphere.dbdiscovery.spi.ReplicaDataSourceStatus;
-import org.apache.shardingsphere.infra.config.exception.ShardingSphereConfigurationException;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
import org.apache.shardingsphere.infra.metadata.schema.QualifiedDatabase;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
@@ -51,8 +49,6 @@ import java.util.stream.Collectors;
@Slf4j
public final class DatabaseDiscoveryEngine {
- private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
-
private final DatabaseDiscoveryProviderAlgorithm databaseDiscoveryProviderAlgorithm;
/**
@@ -63,12 +59,8 @@ public final class DatabaseDiscoveryEngine {
* @throws SQLException SQL exception
*/
public void checkEnvironment(final String databaseName, final Map<String, DataSource> dataSourceMap) throws SQLException {
- ExecutorEngine executorEngine = new ExecutorEngine(Math.min(CPU_CORES * 2, dataSourceMap.isEmpty() ? 1 : dataSourceMap.entrySet().size()));
- ExecutorDataMap.getValue().put(DatabaseDiscoveryExecutorCallback.DATABASE_NAME, databaseName);
- Collection<String> result = executorEngine.execute(createExecutionGroupContext(dataSourceMap), new DatabaseDiscoveryExecutorCallback(databaseDiscoveryProviderAlgorithm));
- if (!result.isEmpty()) {
- throw new ShardingSphereConfigurationException(String.join(System.lineSeparator(), result));
- }
+ ExecutorEngine executorEngine = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSourceMap.size());
+ executorEngine.execute(createExecutionGroupContext(dataSourceMap), new DatabaseDiscoveryExecutorCallback(databaseName, databaseDiscoveryProviderAlgorithm));
}
private ExecutionGroupContext<DataSource> createExecutionGroupContext(final Map<String, DataSource> dataSourceMap) {
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
index 13c543f9c81..89a5394330b 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
@@ -19,36 +19,31 @@ package org.apache.shardingsphere.dbdiscovery.algorithm;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
+import org.apache.shardingsphere.infra.config.exception.ShardingSphereConfigurationException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.Map;
@RequiredArgsConstructor
-public final class DatabaseDiscoveryExecutorCallback implements ExecutorCallback<DataSource, String> {
+public final class DatabaseDiscoveryExecutorCallback implements ExecutorCallback<DataSource, Void> {
- public static final String DATABASE_NAME = "databaseName";
+ private final String databaseName;
private final DatabaseDiscoveryProviderAlgorithm databaseDiscoveryProviderAlgorithm;
@Override
- public Collection<String> execute(final Collection<DataSource> inputs, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
- Collection<String> result = new LinkedList<>();
- String databaseName = (String) dataMap.get(DATABASE_NAME);
+ public Collection<Void> execute(final Collection<DataSource> inputs, final boolean isTrunkThread, final Map<String, Object> dataMap) {
for (DataSource each : inputs) {
try {
databaseDiscoveryProviderAlgorithm.checkEnvironment(databaseName, each);
} catch (final SQLException ex) {
- result.add(String.format("Error while loading highly available Status with %s", databaseName));
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- result.add(ex.getMessage());
+ throw new ShardingSphereConfigurationException("Check environment error with database `%s`", databaseName);
}
}
- return result;
+ return Collections.emptyList();
}
}
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java
index 56eec40cb47..87b12961e82 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java
@@ -50,13 +50,13 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
private static final int DEFAULT_KEY_AMOUNT = 10;
- private SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
+ private final SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
@Test
public void assertGenerateKeyWithMultipleThreads() throws ExecutionException, InterruptedException {
- int threadNumber = Runtime.getRuntime().availableProcessors() << 1;
+ int threadNumber = Runtime.getRuntime().availableProcessors() * 2;
ExecutorService executor = Executors.newFixedThreadPool(threadNumber);
- int taskNumber = threadNumber << 2;
+ int taskNumber = threadNumber * 4;
keyGenerateAlgorithm.setProps(new Properties());
keyGenerateAlgorithm.init();
Set<Comparable<?>> actual = new HashSet<>(taskNumber, 1);
diff --git a/shardingsphere-infra/shardingsphere-infra-context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateIndexStatementSchemaRefresher.java b/shardingsphere-infra/shardingsphere-infra-context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateIndexStatementSchemaRefresher.java
index 0ae3333d060..70235896a1e 100644
--- a/shardingsphere-infra/shardingsphere-infra-context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateIndexStatementSchemaRefresher.java
+++ b/shardingsphere-infra/shardingsphere-infra-context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateIndexStatementSchemaRefresher.java
@@ -43,7 +43,7 @@ public final class CreateIndexStatementSchemaRefresher implements MetaDataRefres
@Override
public void refresh(final ShardingSphereMetaData metaData, final FederationDatabaseMetaData database, final Map<String, OptimizerPlannerContext> optimizerPlanners,
final Collection<String> logicDataSourceNames, final String schemaName, final CreateIndexStatement sqlStatement, final ConfigurationProperties props) throws SQLException {
- String indexName = null != sqlStatement.getIndex() ? sqlStatement.getIndex().getIndexName().getIdentifier().getValue()
+ String indexName = null != sqlStatement.getIndex() ? sqlStatement.getIndex().getIndexName().getIdentifier().getValue()
: IndexMetaDataUtil.getGeneratedLogicIndexName(sqlStatement.getColumns());
if (Strings.isNullOrEmpty(indexName)) {
return;
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
index a7cf3770c20..45cff96196c 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
@@ -41,12 +41,46 @@ import java.util.concurrent.ExecutionException;
@Getter
public final class ExecutorEngine implements AutoCloseable {
+ private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
+
private final ExecutorServiceManager executorServiceManager;
- public ExecutorEngine(final int executorSize) {
+ private ExecutorEngine(final int executorSize) {
executorServiceManager = new ExecutorServiceManager(executorSize);
}
+ /**
+ * Create executor engine with executor size.
+ *
+ * @param executorSize executor size
+ * @return created executor engine
+ */
+ public static ExecutorEngine createExecutorEngineWithSize(final int executorSize) {
+ return new ExecutorEngine(executorSize);
+ }
+
+ /**
+ * Create executor engine with CPU and resources.
+ *
+ * @param resourceCount resource count
+ * @return created executor engine
+ */
+ public static ExecutorEngine createExecutorEngineWithCPUAndResources(final int resourceCount) {
+ int cpuThreadCount = CPU_CORES * 2 - 1;
+ int resourceThreadCount = Math.max(resourceCount, 1);
+ return new ExecutorEngine(Math.min(cpuThreadCount, resourceThreadCount));
+ }
+
+ /**
+ * Create executor engine with CPU.
+ *
+ * @return created executor engine
+ */
+ public static ExecutorEngine createExecutorEngineWithCPU() {
+ int cpuThreadCount = CPU_CORES * 2 - 1;
+ return new ExecutorEngine(cpuThreadCount);
+ }
+
/**
* Execute.
*
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java
index 0630120f9ff..bb1a59c34bf 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java
@@ -37,7 +37,7 @@ import static org.mockito.Mockito.mock;
public final class ExecutorEngineTest {
- private final ExecutorEngine executorEngine = new ExecutorEngine(10);
+ private final ExecutorEngine executorEngine = ExecutorEngine.createExecutorEngineWithSize(10);
private final CountDownLatch latch = new CountDownLatch(4);
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
index be4e57a8542..137bfd8d705 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
@@ -60,7 +60,7 @@ public abstract class AbstractBaseExecutorTest {
@Before
public void setUp() throws SQLException {
SQLExecutorExceptionHandler.setExceptionThrown(true);
- executorEngine = new ExecutorEngine(Runtime.getRuntime().availableProcessors());
+ executorEngine = ExecutorEngine.createExecutorEngineWithCPU();
TransactionTypeHolder.set(TransactionType.LOCAL);
connection = new ShardingSphereConnection(DefaultSchema.LOGIC_NAME, mockContextManager());
}
@@ -93,8 +93,7 @@ public abstract class AbstractBaseExecutorTest {
}
private TransactionContexts mockTransactionContexts() {
- TransactionContexts result = mock(TransactionContexts.class);
- when(result.getEngines()).thenReturn(mock(Map.class));
+ TransactionContexts result = mock(TransactionContexts.class, RETURNS_DEEP_STUBS);
when(result.getEngines().get(DefaultSchema.LOGIC_NAME)).thenReturn(new ShardingSphereTransactionManagerEngine());
return result;
}
diff --git a/shardingsphere-kernel/shardingsphere-authority/shardingsphere-authority-core/src/main/java/org/apache/shardingsphere/authority/provider/natived/builder/StoragePrivilegeBuilder.java b/shardingsphere-kernel/shardingsphere-authority/shardingsphere-authority-core/src/main/java/org/apache/shardingsphere/authority/provider/natived/builder/StoragePrivilegeBuilder.java
index d3b7ab94a1f..4608abcde3e 100644
--- a/shardingsphere-kernel/shardingsphere-authority/shardingsphere-authority-core/src/main/java/org/apache/shardingsphere/authority/provider/natived/builder/StoragePrivilegeBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-authority/shardingsphere-authority-core/src/main/java/org/apache/shardingsphere/authority/provider/natived/builder/StoragePrivilegeBuilder.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.authority.provider.natived.model.privilege.Nati
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRecognizer;
import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
@@ -40,7 +41,6 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -51,8 +51,6 @@ import java.util.concurrent.TimeoutException;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class StoragePrivilegeBuilder {
- private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
-
private static final long FUTURE_GET_TIME_OUT_MILLISECONDS = 5000L;
static {
@@ -98,7 +96,8 @@ public final class StoragePrivilegeBuilder {
private static void save(final Collection<DataSource> dataSources,
final Collection<ShardingSphereUser> users, final StoragePrivilegeHandler handler) {
- ExecutorService executorService = Executors.newFixedThreadPool(Math.min(CPU_CORES * 2, dataSources.isEmpty() ? 1 : dataSources.size()));
+ // TODO ExecutorEngine.execute and callback
+ ExecutorService executorService = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService();
Collection<Future<?>> tasks = new HashSet<>();
for (DataSource each : dataSources) {
tasks.add(executorService.submit(() -> save(each, users, handler)));
@@ -128,7 +127,8 @@ public final class StoragePrivilegeBuilder {
private static Map<ShardingSphereUser, Collection<NativePrivileges>> load(final Collection<DataSource> dataSources,
final Collection<ShardingSphereUser> users, final StoragePrivilegeHandler handler) {
Map<ShardingSphereUser, Collection<NativePrivileges>> result = new LinkedHashMap<>(users.size(), 1);
- ExecutorService executorService = Executors.newFixedThreadPool(Math.min(CPU_CORES * 2, dataSources.isEmpty() ? 1 : dataSources.size()));
+ // TODO ExecutorEngine.execute and callback
+ ExecutorService executorService = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService();
Collection<Future<Map<ShardingSphereUser, NativePrivileges>>> futures = new HashSet<>(dataSources.size(), 1);
for (DataSource each : dataSources) {
futures.add(executorService.submit(() -> handler.load(users, each)));
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
index 0e07d93e006..c9da32be9d4 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
@@ -67,7 +67,7 @@ public final class MetaDataContextsBuilder {
public MetaDataContextsBuilder(final Collection<RuleConfiguration> globalRuleConfigs, final Properties props) {
this.globalRuleConfigs = globalRuleConfigs;
this.props = new ConfigurationProperties(props);
- executorEngine = new ExecutorEngine(this.props.<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
+ executorEngine = ExecutorEngine.createExecutorEngineWithSize(this.props.<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
}
/**
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/BackendExecutorContext.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/BackendExecutorContext.java
index 9200bb709c2..83dffd90659 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/BackendExecutorContext.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/BackendExecutorContext.java
@@ -32,8 +32,8 @@ public final class BackendExecutorContext {
private static final BackendExecutorContext INSTANCE = new BackendExecutorContext();
- private final ExecutorEngine executorEngine =
- new ExecutorEngine(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
+ private final ExecutorEngine executorEngine = ExecutorEngine.createExecutorEngineWithSize(
+ ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
/**
* Get executor context instance.
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/env/scenario/dataset/DataSetEnvironmentManager.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/env/scenario/dataset/DataSetEnvironmentManager.java
index 4aa1a4aabf5..8dc43f8b318 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/env/scenario/dataset/DataSetEnvironmentManager.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/env/scenario/dataset/DataSetEnvironmentManager.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
import org.apache.shardingsphere.infra.expr.InlineExpressionParser;
import org.apache.shardingsphere.test.integration.cases.dataset.DataSet;
@@ -53,7 +54,8 @@ import java.util.concurrent.Callable;
*/
public final class DataSetEnvironmentManager {
- private static final ExecutorServiceManager EXECUTOR_SERVICE_MANAGER = new ExecutorServiceManager(Runtime.getRuntime().availableProcessors() * 2 - 1);
+ // TODO ExecutorEngine.execute and callback
+ private static final ExecutorServiceManager EXECUTOR_SERVICE_MANAGER = ExecutorEngine.createExecutorEngineWithCPU().getExecutorServiceManager();
private final DataSet dataSet;
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/framework/runner/parallel/impl/CaseParallelRunnerExecutor.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/framework/runner/parallel/impl/CaseParallelRunnerExecutor.java
index 259d4699e15..82edbeb27a1 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/framework/runner/parallel/impl/CaseParallelRunnerExecutor.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/framework/runner/parallel/impl/CaseParallelRunnerExecutor.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.test.integration.framework.runner.parallel.impl;
+import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
import org.apache.shardingsphere.test.integration.framework.runner.parallel.ParallelRunnerExecutor;
import org.apache.shardingsphere.test.integration.framework.param.model.ParameterizedArray;
@@ -31,7 +32,8 @@ import java.util.concurrent.Future;
*/
public final class CaseParallelRunnerExecutor implements ParallelRunnerExecutor {
- private final ExecutorServiceManager executorServiceManager = new ExecutorServiceManager(Runtime.getRuntime().availableProcessors() * 2 - 1);
+ // TODO ExecutorEngine.execute and callback
+ private final ExecutorServiceManager executorServiceManager = ExecutorEngine.createExecutorEngineWithCPU().getExecutorServiceManager();
private final Collection<Future<?>> taskFeatures = new LinkedList<>();