You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/06/25 15:40:25 UTC

[shardingsphere] branch master updated: Change ThreadPoolExecutor accept policy, can accept more than max thread number (#6186)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e599d  Change ThreadPoolExecutor accept policy, can accept more than max thread number (#6186)
e7e599d is described below

commit e7e599de97a0e7dd173753bf7999b6bdd83be9f0
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Jun 25 23:40:16 2020 +0800

    Change ThreadPoolExecutor accept policy, can accept more than max thread number (#6186)
    
    Co-authored-by: Lucas <qi...@jd.com>
---
 .../shardingsphere-scaling-core/pom.xml            |  5 ++
 .../engine/ShardingScalingExecuteEngine.java       | 13 ++---
 .../engine/ShardingScalingExecuteEngineTest.java   | 62 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 8 deletions(-)

diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index 15031f4..38afeb1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -40,6 +40,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-infra-executor</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.zaxxer</groupId>
             <artifactId>HikariCP</artifactId>
             <scope>compile</scope>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
index 66dfd55..d0e6c8e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
@@ -17,22 +17,19 @@
 
 package org.apache.shardingsphere.scaling.core.execute.engine;
 
-import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
-
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.executor.kernel.impl.ShardingSphereThreadFactoryBuilder;
+import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import lombok.RequiredArgsConstructor;
 
 /**
  * Sharding scaling executor engine.
@@ -43,7 +40,7 @@ public class ShardingScalingExecuteEngine {
     
     public ShardingScalingExecuteEngine(final int maxWorkerNumber) {
         this.executorService = MoreExecutors.listeningDecorator(
-            new ThreadPoolExecutor(maxWorkerNumber, maxWorkerNumber, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy()));
+                Executors.newFixedThreadPool(maxWorkerNumber, ShardingSphereThreadFactoryBuilder.build("ShardingScaling-execute-%d")));
     }
     
     /**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
new file mode 100644
index 0000000..a89299e
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.engine;
+
+import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.RejectedExecutionException;
+
+public final class ShardingScalingExecuteEngineTest {
+    // Submitting more tasks (5) than the engine's max worker number (2) must not be rejected.
+    @Test
+    public void assertSubmitMoreThanMaxWorkerNumber() {
+        ShardingScalingExecuteEngine executeEngine = new ShardingScalingExecuteEngine(2);
+        try {
+            for (int i = 0; i < 5; i++) {
+                executeEngine.submit(mockShardingScalingExecutor());
+            }
+        } catch (RejectedExecutionException ex) {
+            Assert.fail("Submitting more tasks than the max worker number must not be rejected");
+        }
+    }
+    
+    private ShardingScalingExecutor mockShardingScalingExecutor() {
+        return new ShardingScalingExecutor() {
+            @Override
+            public void run() {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+            
+            @Override
+            public void start() {
+                // Intentionally empty: the stub only needs run() to keep a worker busy.
+            }
+            
+            @Override
+            public void stop() {
+                // Intentionally empty: the stub holds nothing that needs stopping.
+            }
+        };
+    }
+}