You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/02/24 07:11:12 UTC

[shardingsphere] branch master updated: Close useless executor if put not success (#9485)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ea7e071  Close useless executor if put not success (#9485)
ea7e071 is described below

commit ea7e071f90f7e2a3024256ee93000237e6e0d58a
Author: Liang Zhang <te...@163.com>
AuthorDate: Wed Feb 24 15:10:49 2021 +0800

    Close useless executor if put is not successful (#9485)
    
    * Rename ScenarioExecutorQueue to ScenarioExecutorService
    
    * Close useless executor if put is not successful
---
 .../parallel/ParallelRunnerExecutorFactory.java    |  9 ++++--
 .../impl/ScenarioParallelRunnerExecutor.java       | 35 +++++++++++-----------
 2 files changed, 24 insertions(+), 20 deletions(-)

diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
index 479de64..6d6a234 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
@@ -23,15 +23,15 @@ import org.apache.shardingsphere.test.integration.engine.junit.parallel.impl.Cas
 import org.apache.shardingsphere.test.integration.engine.junit.parallel.impl.ScenarioParallelRunnerExecutor;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 /**
  * Parallel runner executor factory.
  */
 public final class ParallelRunnerExecutorFactory {
     
-    private final ConcurrentMap<DatabaseType, ParallelRunnerExecutor> executors = new ConcurrentHashMap<>();
+    private final Map<DatabaseType, ParallelRunnerExecutor> executors = new ConcurrentHashMap<>();
     
     /**
      * Get parallel runner executor.
@@ -44,7 +44,10 @@ public final class ParallelRunnerExecutorFactory {
         if (executors.containsKey(databaseType)) {
             return executors.get(databaseType);
         }
-        executors.putIfAbsent(databaseType, newInstance(parallelLevel));
+        ParallelRunnerExecutor newExecutor = newInstance(parallelLevel);
+        if (null != executors.putIfAbsent(databaseType, newExecutor)) {
+            newExecutor.finished();
+        }
         return executors.get(databaseType);
     }
     
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
index a906af9..0541f35 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.test.integration.engine.junit.parallel.impl;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import lombok.EqualsAndHashCode;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.test.integration.engine.junit.parallel.ParallelRunnerExecutor;
 import org.apache.shardingsphere.test.integration.engine.param.model.ParameterizedArray;
@@ -27,7 +26,6 @@ import org.apache.shardingsphere.test.integration.engine.param.model.Parameteriz
 import java.io.Closeable;
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -45,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock;
 @Slf4j
 public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecutor {
     
-    private final ConcurrentMap<ScenarioKey, ScenarioExecutorQueue> scenarioExecutorQueues = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ScenarioKey, ScenarioExecutorService> scenarioExecutorServices = new ConcurrentHashMap<>();
     
     private final Lock lock = new ReentrantLock();
     
@@ -56,19 +54,19 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
         scenarioTaskResults.add(getScenarioExecutorQueue(new ScenarioKey(parameterizedArray)).submit(childStatement));
     }
     
-    private ScenarioExecutorQueue getScenarioExecutorQueue(final ScenarioKey scenarioKey) {
-        ScenarioExecutorQueue scenarioExecutorQueue = scenarioExecutorQueues.get(scenarioKey);
+    private ScenarioExecutorService getScenarioExecutorQueue(final ScenarioKey scenarioKey) {
+        ScenarioExecutorService scenarioExecutorQueue = scenarioExecutorServices.get(scenarioKey);
         if (null != scenarioExecutorQueue) {
             return scenarioExecutorQueue;
         }
         try {
             lock.lock();
-            scenarioExecutorQueue = scenarioExecutorQueues.get(scenarioKey);
+            scenarioExecutorQueue = scenarioExecutorServices.get(scenarioKey);
             if (null != scenarioExecutorQueue) {
                 return scenarioExecutorQueue;
             }
-            scenarioExecutorQueue = new ScenarioExecutorQueue(scenarioKey);
-            scenarioExecutorQueues.put(scenarioKey, scenarioExecutorQueue);
+            scenarioExecutorQueue = new ScenarioExecutorService(scenarioKey);
+            scenarioExecutorServices.put(scenarioKey, scenarioExecutorQueue);
             return scenarioExecutorQueue;
         } finally {
             lock.unlock();
@@ -83,7 +81,7 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
             } catch (final InterruptedException | ExecutionException ignored) {
             }
         });
-        scenarioExecutorQueues.values().forEach(ScenarioExecutorQueue::close);
+        scenarioExecutorServices.values().forEach(ScenarioExecutorService::close);
     }
     
     /**
@@ -111,20 +109,23 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
     }
     
     /**
-     * Scenario executor queue.
+     * Scenario executor service.
      */
-    @Setter
-    private static final class ScenarioExecutorQueue implements Closeable {
-        
-        private final BlockingQueue<Runnable> executorQueue;
+    private static final class ScenarioExecutorService implements Closeable {
         
         private final ExecutorService executorService;
         
-        ScenarioExecutorQueue(final ScenarioKey scenarioKey) {
-            executorQueue = new LinkedBlockingQueue<>();
-            executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, executorQueue, new ThreadFactoryBuilder().setNameFormat("ScenarioExecutor-" + scenarioKey + "-pool-%d").build());
+        ScenarioExecutorService(final ScenarioKey scenarioKey) {
+            String threadPoolNameFormat = String.join("-", "ScenarioExecutorPool", scenarioKey.toString(), "%d");
+            executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat(threadPoolNameFormat).build());
         }
         
+        /**
+         * Submit task.
+         * 
+         * @param childStatement child statement
+         * @return task future
+         */
         public Future<?> submit(final Runnable childStatement) {
             return executorService.submit(childStatement);
         }