You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by me...@apache.org on 2021/02/24 06:38:57 UTC

[shardingsphere] branch master updated: Refactor ParallelRunnerExecutorFactory (#9484)

This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dabb911  Refactor ParallelRunnerExecutorFactory (#9484)
dabb911 is described below

commit dabb91164f74d66170600884d63bc16b09671242
Author: Liang Zhang <te...@163.com>
AuthorDate: Wed Feb 24 14:38:23 2021 +0800

    Refactor ParallelRunnerExecutorFactory (#9484)
    
    * Refactor CaseParallelRunnerExecutor
    
    * Code cleanup for ScenarioParallelRunnerExecutor
    
    * Refactor ParallelRunnerExecutorFactory
---
 .../parallel/ParallelRunnerExecutorFactory.java    | 21 +++---
 .../junit/parallel/ParallelRunnerScheduler.java    |  6 +-
 .../parallel/impl/CaseParallelRunnerExecutor.java  | 17 ++---
 .../impl/ScenarioParallelRunnerExecutor.java       | 79 +++++++---------------
 4 files changed, 44 insertions(+), 79 deletions(-)

diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
index 3cdb646..479de64 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
@@ -17,8 +17,6 @@
 
 package org.apache.shardingsphere.test.integration.engine.junit.parallel;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.test.integration.engine.junit.parallel.annotaion.ParallelLevel;
 import org.apache.shardingsphere.test.integration.engine.junit.parallel.impl.CaseParallelRunnerExecutor;
@@ -31,10 +29,9 @@ import java.util.concurrent.ConcurrentMap;
 /**
  * Parallel runner executor factory.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class ParallelRunnerExecutorFactory {
     
-    private static final ConcurrentMap<DatabaseType, ParallelRunnerExecutor> EXECUTORS = new ConcurrentHashMap<>();
+    private final ConcurrentMap<DatabaseType, ParallelRunnerExecutor> executors = new ConcurrentHashMap<>();
     
     /**
      * Get parallel runner executor.
@@ -43,15 +40,15 @@ public final class ParallelRunnerExecutorFactory {
      * @param parallelLevel parallel level
      * @return parallel runner executor
      */
-    public static ParallelRunnerExecutor getExecutor(final DatabaseType databaseType, final ParallelLevel parallelLevel) {
-        if (EXECUTORS.containsKey(databaseType)) {
-            return EXECUTORS.get(databaseType);
+    public ParallelRunnerExecutor getExecutor(final DatabaseType databaseType, final ParallelLevel parallelLevel) {
+        if (executors.containsKey(databaseType)) {
+            return executors.get(databaseType);
         }
-        EXECUTORS.putIfAbsent(databaseType, newInstance(parallelLevel));
-        return EXECUTORS.get(databaseType);
+        executors.putIfAbsent(databaseType, newInstance(parallelLevel));
+        return executors.get(databaseType);
     }
     
-    private static ParallelRunnerExecutor newInstance(final ParallelLevel parallelLevel) {
+    private ParallelRunnerExecutor newInstance(final ParallelLevel parallelLevel) {
         switch (parallelLevel) {
             case CASE:
                 return new CaseParallelRunnerExecutor();
@@ -67,7 +64,7 @@ public final class ParallelRunnerExecutorFactory {
      * 
      * @return all executors
      */
-    public static Collection<ParallelRunnerExecutor> getAllExecutors() {
-        return EXECUTORS.values();
+    public Collection<ParallelRunnerExecutor> getAllExecutors() {
+        return executors.values();
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerScheduler.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerScheduler.java
index d8c8321..7135665 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerScheduler.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerScheduler.java
@@ -31,14 +31,16 @@ public final class ParallelRunnerScheduler implements RunnerScheduler {
     
     private final ParallelLevel parallelLevel;
     
+    private final ParallelRunnerExecutorFactory executorFactory = new ParallelRunnerExecutorFactory();
+    
     @Override
     public void schedule(final Runnable childStatement) {
         ParameterizedArray parameterizedArray = new RunnerParameters(childStatement).getParameterizedArray();
-        ParallelRunnerExecutorFactory.getExecutor(parameterizedArray.getDatabaseType(), parallelLevel).execute(parameterizedArray, childStatement);
+        executorFactory.getExecutor(parameterizedArray.getDatabaseType(), parallelLevel).execute(parameterizedArray, childStatement);
     }
     
     @Override
     public void finished() {
-        ParallelRunnerExecutorFactory.getAllExecutors().forEach(ParallelRunnerExecutor::finished);
+        executorFactory.getAllExecutors().forEach(ParallelRunnerExecutor::finished);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/CaseParallelRunnerExecutor.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/CaseParallelRunnerExecutor.java
index eeabd10..8db9b19 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/CaseParallelRunnerExecutor.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/CaseParallelRunnerExecutor.java
@@ -17,40 +17,35 @@
 
 package org.apache.shardingsphere.test.integration.engine.junit.parallel.impl;
 
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
 import org.apache.shardingsphere.test.integration.engine.junit.parallel.ParallelRunnerExecutor;
 import org.apache.shardingsphere.test.integration.engine.param.model.ParameterizedArray;
 
+import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 /**
  * Parallel runner executor with case.
  */
-@RequiredArgsConstructor
 public final class CaseParallelRunnerExecutor implements ParallelRunnerExecutor {
     
     private final ExecutorServiceManager executorServiceManager = new ExecutorServiceManager(Runtime.getRuntime().availableProcessors() * 2 - 1);
     
-    private final List<Future<?>> caseTaskResults = new LinkedList<>();
+    private final Collection<Future<?>> caseTaskResults = new LinkedList<>();
     
     @Override
     public void execute(final ParameterizedArray parameterizedArray, final Runnable childStatement) {
-        caseTaskResults.add(executorServiceManager.getExecutorService().submit(() -> childStatement.run()));
+        caseTaskResults.add(executorServiceManager.getExecutorService().submit(childStatement));
     }
     
-    @SneakyThrows
     @Override
     public void finished() {
-        caseTaskResults.forEach(future -> {
+        caseTaskResults.forEach(each -> {
             try {
-                future.get();
-            } catch (InterruptedException ignore) {
-            } catch (ExecutionException ignore) {
+                each.get();
+            } catch (final InterruptedException | ExecutionException ignored) {
             }
         });
         executorServiceManager.getExecutorService().shutdownNow();
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
index 31b950d..a906af9 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
@@ -18,16 +18,15 @@
 package org.apache.shardingsphere.test.integration.engine.junit.parallel.impl;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import lombok.RequiredArgsConstructor;
+import lombok.EqualsAndHashCode;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.test.integration.engine.junit.parallel.ParallelRunnerExecutor;
 import org.apache.shardingsphere.test.integration.engine.param.model.ParameterizedArray;
 
 import java.io.Closeable;
-import java.io.IOException;
+import java.util.Collection;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -50,27 +49,26 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
     
     private final Lock lock = new ReentrantLock();
     
-    private final List<Future<?>> scenarioTaskResults = new LinkedList<>();
+    private final Collection<Future<?>> scenarioTaskResults = new LinkedList<>();
     
     @Override
     public void execute(final ParameterizedArray parameterizedArray, final Runnable childStatement) {
-        scenarioTaskResults.add(getScenarioExecutorQueue(new ScenarioKey(parameterizedArray.getAdapter(), parameterizedArray.getScenario(), parameterizedArray.getDatabaseType().getName()))
-                .submit(childStatement));
+        scenarioTaskResults.add(getScenarioExecutorQueue(new ScenarioKey(parameterizedArray)).submit(childStatement));
     }
     
     private ScenarioExecutorQueue getScenarioExecutorQueue(final ScenarioKey scenarioKey) {
-        ScenarioExecutorQueue scenarioExecutorQueue = this.scenarioExecutorQueues.get(scenarioKey);
+        ScenarioExecutorQueue scenarioExecutorQueue = scenarioExecutorQueues.get(scenarioKey);
         if (null != scenarioExecutorQueue) {
             return scenarioExecutorQueue;
         }
         try {
             lock.lock();
-            scenarioExecutorQueue = this.scenarioExecutorQueues.get(scenarioKey);
+            scenarioExecutorQueue = scenarioExecutorQueues.get(scenarioKey);
             if (null != scenarioExecutorQueue) {
                 return scenarioExecutorQueue;
             }
             scenarioExecutorQueue = new ScenarioExecutorQueue(scenarioKey);
-            this.scenarioExecutorQueues.put(scenarioKey, scenarioExecutorQueue);
+            scenarioExecutorQueues.put(scenarioKey, scenarioExecutorQueue);
             return scenarioExecutorQueue;
         } finally {
             lock.unlock();
@@ -82,22 +80,16 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
         scenarioTaskResults.forEach(future -> {
             try {
                 future.get();
-            } catch (InterruptedException ignore) {
-            } catch (ExecutionException ignore) {
-            }
-        });
-        scenarioExecutorQueues.values().forEach(scenarioExecutorQueue -> {
-            try {
-                scenarioExecutorQueue.close();
-            } catch (IOException ignore) {
+            } catch (final InterruptedException | ExecutionException ignored) {
             }
         });
+        scenarioExecutorQueues.values().forEach(ScenarioExecutorQueue::close);
     }
     
     /**
      * Scenario key.
      */
-    @RequiredArgsConstructor
+    @EqualsAndHashCode
     private static final class ScenarioKey {
         
         private final String adapter;
@@ -105,33 +97,13 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
         private final String scenario;
         
         private final String databaseTypeName;
-    
-        @Override
-        public boolean equals(final Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            ScenarioKey that = (ScenarioKey) o;
-            if (!adapter.equals(that.adapter)) {
-                return false;
-            }
-            if (!scenario.equals(that.scenario)) {
-                return false;
-            }
-            return databaseTypeName.equals(that.databaseTypeName);
-        }
-    
-        @Override
-        public int hashCode() {
-            int result = adapter.hashCode();
-            result = 31 * result + scenario.hashCode();
-            result = 31 * result + databaseTypeName.hashCode();
-            return result;
+        
+        private ScenarioKey(final ParameterizedArray parameterizedArray) {
+            adapter = parameterizedArray.getAdapter();
+            scenario = parameterizedArray.getScenario();
+            databaseTypeName = parameterizedArray.getDatabaseType().getName();
         }
-    
+        
         @Override
         public String toString() {
             return String.join("-", adapter, scenario, databaseTypeName);
@@ -145,22 +117,21 @@ public final class ScenarioParallelRunnerExecutor implements ParallelRunnerExecu
     private static final class ScenarioExecutorQueue implements Closeable {
         
         private final BlockingQueue<Runnable> executorQueue;
-    
+        
         private final ExecutorService executorService;
-    
+        
         ScenarioExecutorQueue(final ScenarioKey scenarioKey) {
-            this.executorQueue = new LinkedBlockingQueue<>();
-            this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
-                    this.executorQueue, new ThreadFactoryBuilder().setNameFormat("ScenarioExecutor-" + scenarioKey + "-pool-%d").build());
+            executorQueue = new LinkedBlockingQueue<>();
+            executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, executorQueue, new ThreadFactoryBuilder().setNameFormat("ScenarioExecutor-" + scenarioKey + "-pool-%d").build());
         }
-    
+        
         public Future<?> submit(final Runnable childStatement) {
-            return this.executorService.submit(childStatement);
+            return executorService.submit(childStatement);
         }
-    
+        
         @Override
-        public void close() throws IOException {
-            this.executorService.shutdownNow();
+        public void close() {
+            executorService.shutdownNow();
         }
     }
 }