You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by yx...@apache.org on 2022/04/27 00:09:15 UTC

[shardingsphere] branch master updated: Refactor ExecutorEngine (#17133)

This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f4ee8911ec6 Refactor ExecutorEngine (#17133)
f4ee8911ec6 is described below

commit f4ee8911ec615061156e3814ae3ba6242f5f083b
Author: Liang Zhang <zh...@apache.org>
AuthorDate: Wed Apr 27 08:08:58 2022 +0800

    Refactor ExecutorEngine (#17133)
    
    * Refactor SnowflakeKeyGenerateAlgorithmTest
    
    * Refactor ExecutorEngine
    
    * Refactor ExecutorEngine
    
    * For checkstyle
    
    * For checkstyle
    
    * For checkstyle
    
    * Refactor DatabaseDiscoveryEngine
---
 .../algorithm/DatabaseDiscoveryEngine.java         | 12 ++------
 .../DatabaseDiscoveryExecutorCallback.java         | 19 +++++-------
 .../keygen/SnowflakeKeyGenerateAlgorithmTest.java  |  6 ++--
 .../type/CreateIndexStatementSchemaRefresher.java  |  2 +-
 .../infra/executor/kernel/ExecutorEngine.java      | 36 +++++++++++++++++++++-
 .../infra/executor/kernel/ExecutorEngineTest.java  |  2 +-
 .../driver/executor/AbstractBaseExecutorTest.java  |  5 ++-
 .../natived/builder/StoragePrivilegeBuilder.java   | 10 +++---
 .../mode/metadata/MetaDataContextsBuilder.java     |  2 +-
 .../backend/context/BackendExecutorContext.java    |  4 +--
 .../dataset/DataSetEnvironmentManager.java         |  4 ++-
 .../parallel/impl/CaseParallelRunnerExecutor.java  |  4 ++-
 12 files changed, 65 insertions(+), 41 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
index 21c88fdede9..2318d1441b8 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
@@ -21,12 +21,10 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
 import org.apache.shardingsphere.dbdiscovery.spi.ReplicaDataSourceStatus;
-import org.apache.shardingsphere.infra.config.exception.ShardingSphereConfigurationException;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
 import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
-import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
 import org.apache.shardingsphere.infra.metadata.schema.QualifiedDatabase;
 import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
 import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
@@ -51,8 +49,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class DatabaseDiscoveryEngine {
     
-    private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
-    
     private final DatabaseDiscoveryProviderAlgorithm databaseDiscoveryProviderAlgorithm;
     
     /**
@@ -63,12 +59,8 @@ public final class DatabaseDiscoveryEngine {
      * @throws SQLException SQL exception
      */
     public void checkEnvironment(final String databaseName, final Map<String, DataSource> dataSourceMap) throws SQLException {
-        ExecutorEngine executorEngine = new ExecutorEngine(Math.min(CPU_CORES * 2, dataSourceMap.isEmpty() ? 1 : dataSourceMap.entrySet().size()));
-        ExecutorDataMap.getValue().put(DatabaseDiscoveryExecutorCallback.DATABASE_NAME, databaseName);
-        Collection<String> result = executorEngine.execute(createExecutionGroupContext(dataSourceMap), new DatabaseDiscoveryExecutorCallback(databaseDiscoveryProviderAlgorithm));
-        if (!result.isEmpty()) {
-            throw new ShardingSphereConfigurationException(String.join(System.lineSeparator(), result));
-        }
+        ExecutorEngine executorEngine = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSourceMap.size());
+        executorEngine.execute(createExecutionGroupContext(dataSourceMap), new DatabaseDiscoveryExecutorCallback(databaseName, databaseDiscoveryProviderAlgorithm));
     }
     
     private ExecutionGroupContext<DataSource> createExecutionGroupContext(final Map<String, DataSource> dataSourceMap) {
diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
index 13c543f9c81..89a5394330b 100644
--- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
+++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryExecutorCallback.java
@@ -19,36 +19,31 @@ package org.apache.shardingsphere.dbdiscovery.algorithm;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProviderAlgorithm;
+import org.apache.shardingsphere.infra.config.exception.ShardingSphereConfigurationException;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
 import java.util.Map;
 
 @RequiredArgsConstructor
-public final class DatabaseDiscoveryExecutorCallback implements ExecutorCallback<DataSource, String> {
+public final class DatabaseDiscoveryExecutorCallback implements ExecutorCallback<DataSource, Void> {
     
-    public static final String DATABASE_NAME = "databaseName";
+    private final String databaseName;
     
     private final DatabaseDiscoveryProviderAlgorithm databaseDiscoveryProviderAlgorithm;
     
     @Override
-    public Collection<String> execute(final Collection<DataSource> inputs, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
-        Collection<String> result = new LinkedList<>();
-        String databaseName = (String) dataMap.get(DATABASE_NAME);
+    public Collection<Void> execute(final Collection<DataSource> inputs, final boolean isTrunkThread, final Map<String, Object> dataMap) {
         for (DataSource each : inputs) {
             try {
                 databaseDiscoveryProviderAlgorithm.checkEnvironment(databaseName, each);
             } catch (final SQLException ex) {
-                result.add(String.format("Error while loading highly available Status with %s", databaseName));
-                // CHECKSTYLE:OFF
-            } catch (final Exception ex) {
-                // CHECKSTYLE:ON
-                result.add(ex.getMessage());
+                throw new ShardingSphereConfigurationException("Check environment error with database `%s`", databaseName);
             }
         }
-        return result;
+        return Collections.emptyList();
     }
 }
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java
index 56eec40cb47..87b12961e82 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/SnowflakeKeyGenerateAlgorithmTest.java
@@ -50,13 +50,13 @@ public final class SnowflakeKeyGenerateAlgorithmTest {
     
     private static final int DEFAULT_KEY_AMOUNT = 10;
     
-    private SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
+    private final SnowflakeKeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
     
     @Test
     public void assertGenerateKeyWithMultipleThreads() throws ExecutionException, InterruptedException {
-        int threadNumber = Runtime.getRuntime().availableProcessors() << 1;
+        int threadNumber = Runtime.getRuntime().availableProcessors() * 2;
         ExecutorService executor = Executors.newFixedThreadPool(threadNumber);
-        int taskNumber = threadNumber << 2;
+        int taskNumber = threadNumber * 4;
         keyGenerateAlgorithm.setProps(new Properties());
         keyGenerateAlgorithm.init();
         Set<Comparable<?>> actual = new HashSet<>(taskNumber, 1);
diff --git a/shardingsphere-infra/shardingsphere-infra-context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateIndexStatementSchemaRefresher.java b/shardingsphere-infra/shardingsphere-infra-context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateIndexStatementSchemaRefresher.java
index 0ae3333d060..70235896a1e 100644
--- a/shardingsphere-infra/shardingsphere-infra-context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateIndexStatementSchemaRefresher.java
+++ b/shardingsphere-infra/shardingsphere-infra-context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/CreateIndexStatementSchemaRefresher.java
@@ -43,7 +43,7 @@ public final class CreateIndexStatementSchemaRefresher implements MetaDataRefres
     @Override
     public void refresh(final ShardingSphereMetaData metaData, final FederationDatabaseMetaData database, final Map<String, OptimizerPlannerContext> optimizerPlanners,
                         final Collection<String> logicDataSourceNames, final String schemaName, final CreateIndexStatement sqlStatement, final ConfigurationProperties props) throws SQLException {
-        String indexName = null != sqlStatement.getIndex() ? sqlStatement.getIndex().getIndexName().getIdentifier().getValue() 
+        String indexName = null != sqlStatement.getIndex() ? sqlStatement.getIndex().getIndexName().getIdentifier().getValue()
                 : IndexMetaDataUtil.getGeneratedLogicIndexName(sqlStatement.getColumns());
         if (Strings.isNullOrEmpty(indexName)) {
             return;
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
index a7cf3770c20..45cff96196c 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
@@ -41,12 +41,46 @@ import java.util.concurrent.ExecutionException;
 @Getter
 public final class ExecutorEngine implements AutoCloseable {
     
+    private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
+    
     private final ExecutorServiceManager executorServiceManager;
     
-    public ExecutorEngine(final int executorSize) {
+    private ExecutorEngine(final int executorSize) {
         executorServiceManager = new ExecutorServiceManager(executorSize);
     }
     
+    /**
+     * Create executor engine with executor size.
+     *
+     * @param executorSize executor size
+     * @return created executor engine
+     */
+    public static ExecutorEngine createExecutorEngineWithSize(final int executorSize) {
+        return new ExecutorEngine(executorSize);
+    }
+    
+    /**
+     * Create executor engine with CPU and resources.
+     * 
+     * @param resourceCount resource count
+     * @return created executor engine
+     */
+    public static ExecutorEngine createExecutorEngineWithCPUAndResources(final int resourceCount) {
+        int cpuThreadCount = CPU_CORES * 2 - 1;
+        int resourceThreadCount = Math.max(resourceCount, 1);
+        return new ExecutorEngine(Math.min(cpuThreadCount, resourceThreadCount));
+    }
+    
+    /**
+     * Create executor engine with CPU.
+     *
+     * @return created executor engine
+     */
+    public static ExecutorEngine createExecutorEngineWithCPU() {
+        int cpuThreadCount = CPU_CORES * 2 - 1;
+        return new ExecutorEngine(cpuThreadCount);
+    }
+    
     /**
      * Execute.
      *
diff --git a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java
index 0630120f9ff..bb1a59c34bf 100644
--- a/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java
+++ b/shardingsphere-infra/shardingsphere-infra-executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngineTest.java
@@ -37,7 +37,7 @@ import static org.mockito.Mockito.mock;
 
 public final class ExecutorEngineTest {
     
-    private final ExecutorEngine executorEngine = new ExecutorEngine(10);
+    private final ExecutorEngine executorEngine = ExecutorEngine.createExecutorEngineWithSize(10);
     
     private final CountDownLatch latch = new CountDownLatch(4);
     
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
index be4e57a8542..137bfd8d705 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/test/java/org/apache/shardingsphere/driver/executor/AbstractBaseExecutorTest.java
@@ -60,7 +60,7 @@ public abstract class AbstractBaseExecutorTest {
     @Before
     public void setUp() throws SQLException {
         SQLExecutorExceptionHandler.setExceptionThrown(true);
-        executorEngine = new ExecutorEngine(Runtime.getRuntime().availableProcessors());
+        executorEngine = ExecutorEngine.createExecutorEngineWithCPU();
         TransactionTypeHolder.set(TransactionType.LOCAL);
         connection = new ShardingSphereConnection(DefaultSchema.LOGIC_NAME, mockContextManager());
     }
@@ -93,8 +93,7 @@ public abstract class AbstractBaseExecutorTest {
     }
     
     private TransactionContexts mockTransactionContexts() {
-        TransactionContexts result = mock(TransactionContexts.class);
-        when(result.getEngines()).thenReturn(mock(Map.class));
+        TransactionContexts result = mock(TransactionContexts.class, RETURNS_DEEP_STUBS);
         when(result.getEngines().get(DefaultSchema.LOGIC_NAME)).thenReturn(new ShardingSphereTransactionManagerEngine());
         return result;
     }
diff --git a/shardingsphere-kernel/shardingsphere-authority/shardingsphere-authority-core/src/main/java/org/apache/shardingsphere/authority/provider/natived/builder/StoragePrivilegeBuilder.java b/shardingsphere-kernel/shardingsphere-authority/shardingsphere-authority-core/src/main/java/org/apache/shardingsphere/authority/provider/natived/builder/StoragePrivilegeBuilder.java
index d3b7ab94a1f..4608abcde3e 100644
--- a/shardingsphere-kernel/shardingsphere-authority/shardingsphere-authority-core/src/main/java/org/apache/shardingsphere/authority/provider/natived/builder/StoragePrivilegeBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-authority/shardingsphere-authority-core/src/main/java/org/apache/shardingsphere/authority/provider/natived/builder/StoragePrivilegeBuilder.java
@@ -24,6 +24,7 @@ import org.apache.shardingsphere.authority.provider.natived.model.privilege.Nati
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRecognizer;
 import org.apache.shardingsphere.infra.exception.ShardingSphereException;
+import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
@@ -40,7 +41,6 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -51,8 +51,6 @@ import java.util.concurrent.TimeoutException;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class StoragePrivilegeBuilder {
     
-    private static final int CPU_CORES = Runtime.getRuntime().availableProcessors();
-    
     private static final long FUTURE_GET_TIME_OUT_MILLISECONDS = 5000L;
     
     static {
@@ -98,7 +96,8 @@ public final class StoragePrivilegeBuilder {
     
     private static void save(final Collection<DataSource> dataSources,
                              final Collection<ShardingSphereUser> users, final StoragePrivilegeHandler handler) {
-        ExecutorService executorService = Executors.newFixedThreadPool(Math.min(CPU_CORES * 2, dataSources.isEmpty() ? 1 : dataSources.size()));
+        // TODO ExecutorEngine.execute and callback
+        ExecutorService executorService = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService();
         Collection<Future<?>> tasks = new HashSet<>();
         for (DataSource each : dataSources) {
             tasks.add(executorService.submit(() -> save(each, users, handler)));
@@ -128,7 +127,8 @@ public final class StoragePrivilegeBuilder {
     private static Map<ShardingSphereUser, Collection<NativePrivileges>> load(final Collection<DataSource> dataSources,
                                                                               final Collection<ShardingSphereUser> users, final StoragePrivilegeHandler handler) {
         Map<ShardingSphereUser, Collection<NativePrivileges>> result = new LinkedHashMap<>(users.size(), 1);
-        ExecutorService executorService = Executors.newFixedThreadPool(Math.min(CPU_CORES * 2, dataSources.isEmpty() ? 1 : dataSources.size()));
+        // TODO ExecutorEngine.execute and callback
+        ExecutorService executorService = ExecutorEngine.createExecutorEngineWithCPUAndResources(dataSources.size()).getExecutorServiceManager().getExecutorService();
         Collection<Future<Map<ShardingSphereUser, NativePrivileges>>> futures = new HashSet<>(dataSources.size(), 1);
         for (DataSource each : dataSources) {
             futures.add(executorService.submit(() -> handler.load(users, each)));
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
index 0e07d93e006..c9da32be9d4 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsBuilder.java
@@ -67,7 +67,7 @@ public final class MetaDataContextsBuilder {
     public MetaDataContextsBuilder(final Collection<RuleConfiguration> globalRuleConfigs, final Properties props) {
         this.globalRuleConfigs = globalRuleConfigs;
         this.props = new ConfigurationProperties(props);
-        executorEngine = new ExecutorEngine(this.props.<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
+        executorEngine = ExecutorEngine.createExecutorEngineWithSize(this.props.<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
     }
     
     /**
diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/BackendExecutorContext.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/BackendExecutorContext.java
index 9200bb709c2..83dffd90659 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/BackendExecutorContext.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/context/BackendExecutorContext.java
@@ -32,8 +32,8 @@ public final class BackendExecutorContext {
     
     private static final BackendExecutorContext INSTANCE = new BackendExecutorContext();
     
-    private final ExecutorEngine executorEngine =
-            new ExecutorEngine(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
+    private final ExecutorEngine executorEngine = ExecutorEngine.createExecutorEngineWithSize(
+            ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
     
     /**
      * Get executor context instance.
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/env/scenario/dataset/DataSetEnvironmentManager.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/env/scenario/dataset/DataSetEnvironmentManager.java
index 4aa1a4aabf5..8dc43f8b318 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/env/scenario/dataset/DataSetEnvironmentManager.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/env/scenario/dataset/DataSetEnvironmentManager.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
 import org.apache.shardingsphere.infra.expr.InlineExpressionParser;
 import org.apache.shardingsphere.test.integration.cases.dataset.DataSet;
@@ -53,7 +54,8 @@ import java.util.concurrent.Callable;
  */
 public final class DataSetEnvironmentManager {
     
-    private static final ExecutorServiceManager EXECUTOR_SERVICE_MANAGER = new ExecutorServiceManager(Runtime.getRuntime().availableProcessors() * 2 - 1);
+    // TODO ExecutorEngine.execute and callback
+    private static final ExecutorServiceManager EXECUTOR_SERVICE_MANAGER = ExecutorEngine.createExecutorEngineWithCPU().getExecutorServiceManager();
     
     private final DataSet dataSet;
     
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/framework/runner/parallel/impl/CaseParallelRunnerExecutor.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/framework/runner/parallel/impl/CaseParallelRunnerExecutor.java
index 259d4699e15..82edbeb27a1 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/framework/runner/parallel/impl/CaseParallelRunnerExecutor.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/framework/runner/parallel/impl/CaseParallelRunnerExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.test.integration.framework.runner.parallel.impl;
 
+import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
 import org.apache.shardingsphere.test.integration.framework.runner.parallel.ParallelRunnerExecutor;
 import org.apache.shardingsphere.test.integration.framework.param.model.ParameterizedArray;
@@ -31,7 +32,8 @@ import java.util.concurrent.Future;
  */
 public final class CaseParallelRunnerExecutor implements ParallelRunnerExecutor {
     
-    private final ExecutorServiceManager executorServiceManager = new ExecutorServiceManager(Runtime.getRuntime().availableProcessors() * 2 - 1);
+    // TODO ExecutorEngine.execute and callback
+    private final ExecutorServiceManager executorServiceManager = ExecutorEngine.createExecutorEngineWithCPU().getExecutorServiceManager();
     
     private final Collection<Future<?>> taskFeatures = new LinkedList<>();