You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2020/06/25 15:40:25 UTC
[shardingsphere] branch master updated: Change ThreadPoolExecutor
accept policy, can accept more than max thread number (#6186)
This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e7e599d Change ThreadPoolExecutor accept policy, can accept more than max thread number (#6186)
e7e599d is described below
commit e7e599de97a0e7dd173753bf7999b6bdd83be9f0
Author: 邱鹿 Lucas <lu...@163.com>
AuthorDate: Thu Jun 25 23:40:16 2020 +0800
Change ThreadPoolExecutor accept policy, can accept more than max thread number (#6186)
Co-authored-by: Lucas <qi...@jd.com>
---
.../shardingsphere-scaling-core/pom.xml | 5 ++
.../engine/ShardingScalingExecuteEngine.java | 13 ++---
.../engine/ShardingScalingExecuteEngineTest.java | 62 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 8 deletions(-)
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
index 15031f4..38afeb1 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/pom.xml
@@ -40,6 +40,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.shardingsphere</groupId>
+ <artifactId>shardingsphere-infra-executor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<scope>compile</scope>
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
index 66dfd55..d0e6c8e 100644
--- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngine.java
@@ -17,22 +17,19 @@
package org.apache.shardingsphere.scaling.core.execute.engine;
-import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
-
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.executor.kernel.impl.ShardingSphereThreadFactoryBuilder;
+import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import lombok.RequiredArgsConstructor;
/**
* Sharding scaling executor engine.
@@ -43,7 +40,7 @@ public class ShardingScalingExecuteEngine {
public ShardingScalingExecuteEngine(final int maxWorkerNumber) {
this.executorService = MoreExecutors.listeningDecorator(
- new ThreadPoolExecutor(maxWorkerNumber, maxWorkerNumber, 0, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy()));
+ Executors.newFixedThreadPool(maxWorkerNumber, ShardingSphereThreadFactoryBuilder.build("ShardingScaling-execute-%d")));
}
/**
diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
new file mode 100644
index 0000000..a89299e
--- /dev/null
+++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/engine/ShardingScalingExecuteEngineTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.engine;
+
+import org.apache.shardingsphere.scaling.core.execute.executor.ShardingScalingExecutor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.RejectedExecutionException;
+
+public final class ShardingScalingExecuteEngineTest {
+ // Submits more executors (5) than the engine's max worker number (2); a RejectedExecutionException fails the test.
+ @Test
+ public void assertSubmitMoreThanMaxWorkerNumber() {
+ ShardingScalingExecuteEngine executeEngine = new ShardingScalingExecuteEngine(2);
+ try {
+ for (int i = 0; i < 5; i++) {
+ executeEngine.submit(mockShardingScalingExecutor());
+ }
+ } catch (RejectedExecutionException ex) {
+ Assert.fail();
+ }
+ }
+
+ private ShardingScalingExecutor mockShardingScalingExecutor() {
+ return new ShardingScalingExecutor() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ // Interruption is deliberately ignored: the 100 ms sleep only keeps this mock executor busy briefly.
+ }
+ }
+
+ @Override
+ public void start() {
+ // Intentionally empty: this mock only needs run() to do anything.
+ }
+
+ @Override
+ public void stop() {
+ // Intentionally empty: this mock only needs run() to do anything.
+ }
+ };
+ }
+}