You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by il...@apache.org on 2018/04/09 14:56:15 UTC

[incubator-dubbo] branch master updated: Extension: Eager Thread Pool (#1568)

This is an automated email from the ASF dual-hosted git repository.

iluo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 38f45ee  Extension: Eager Thread Pool (#1568)
38f45ee is described below

commit 38f45eec3ad193a7b35ea0f9dfdce7cb30611705
Author: 时无两丶 <44...@qq.com>
AuthorDate: Mon Apr 9 22:55:56 2018 +0800

    Extension: Eager Thread Pool (#1568)
    
    * Extension: Enhanced Thread Pool
    A thread pool that can provide faster processing speeds when there are more tasks (of course it consumes more resources)
    * When the number of tasks exceeds the core size, a new thread is first started to execute the task instead of putting it into the queue.
    * When the number of tasks is lower than the core size for a long time, the core size threads are maintained and redundant threads are recycled.
    * Compared to the fixed pool: when there are more tasks, it provides more workers to handle them.
    * Compared to the cached pool: the task queue in the cached pool is actually a SynchronousQueue and does not have the ability to cache tasks.
    * Whether to fail fast or put the task into a queue when the threads run out: both are feasible, and which one to apply depends on the business scenario. If delays are not acceptable, fail-fast is more reasonable than queueing. However, if there is a certain tolerance for delays, queueing is more reasonable than fail-fast.
    
    * remove * in import
    
    * add license to fix ci failure
    
    * rename the thread pool to EagerThreadPool
    modify sth with the code review
    format the code file
    
    * remove '*' in import statement
    
    * throw NullPointerException if the param is null.
    
    * throw NullPointerException if the param is null.
    
    * catch throwable and decrease submitted task count anyway
---
 .../threadpool/support/eager/EagerThreadPool.java  | 56 +++++++++++++
 .../support/eager/EagerThreadPoolExecutor.java     | 84 +++++++++++++++++++
 .../common/threadpool/support/eager/TaskQueue.java | 79 ++++++++++++++++++
 .../com.alibaba.dubbo.common.threadpool.ThreadPool |  1 +
 .../support/eager/EagerThreadPoolExecutorTest.java | 93 ++++++++++++++++++++++
 .../dubbo/common/utils/ConfigUtilsTest.java        |  1 +
 6 files changed, 314 insertions(+)

diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java
new file mode 100644
index 0000000..eb4e1f3
--- /dev/null
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPool.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.dubbo.common.threadpool.support.eager;
+
+import com.alibaba.dubbo.common.Constants;
+import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.common.threadpool.ThreadPool;
+import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
+import com.alibaba.dubbo.common.utils.NamedThreadFactory;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link ThreadPool} extension that builds an {@link EagerThreadPoolExecutor}.
+ * Once all core threads are busy, the pool prefers creating a new thread
+ * over putting the task into the blocking queue.
+ */
+public class EagerThreadPool implements ThreadPool {
+
+    /**
+     * Creates the executor from the thread name, core size, maximum size, queue
+     * capacity (non-positive values become 1) and keep-alive millis read from the URL.
+     * @param url source of the pool parameters
+     * @return a new executor wired to its own {@link TaskQueue}
+     */
+    @Override
+    public Executor getExecutor(URL url) {
+        final String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
+        final int coreSize = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
+        final int maxSize = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
+        final int configuredQueues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
+        final int keepAliveMillis = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
+        final TaskQueue<Runnable> queue = new TaskQueue<Runnable>(configuredQueues > 0 ? configuredQueues : 1);
+        final EagerThreadPoolExecutor pool = new EagerThreadPoolExecutor(coreSize, maxSize, keepAliveMillis,
+                TimeUnit.MILLISECONDS, queue, new NamedThreadFactory(threadName, true),
+                new AbortPolicyWithReport(threadName, url));
+        queue.setExecutor(pool); // the queue consults the pool when deciding whether to enqueue
+        return pool;
+    }
+}
diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
new file mode 100644
index 0000000..47f84c5
--- /dev/null
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.dubbo.common.threadpool.support.eager;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * EagerThreadPoolExecutor
+ */
+public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
+
+    /**
+     * task count
+     */
+    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
+
+    public EagerThreadPoolExecutor(int corePoolSize,
+                                   int maximumPoolSize,
+                                   long keepAliveTime,
+                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
+                                   ThreadFactory threadFactory,
+                                   RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+    }
+
+    /**
+     * @return current tasks which are executed
+     */
+    public int getSubmittedTaskCount() {
+        return submittedTaskCount.get();
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        submittedTaskCount.decrementAndGet();
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        if (command == null) {
+            throw new NullPointerException();
+        }
+        // do not increment in method beforeExecute!
+        submittedTaskCount.incrementAndGet();
+        try {
+            super.execute(command);
+        } catch (RejectedExecutionException rx) {
+            // retry to offer the task into queue.
+            final TaskQueue queue = (TaskQueue) super.getQueue();
+            try {
+                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
+                    submittedTaskCount.decrementAndGet();
+                    throw new RejectedExecutionException("Queue capacity is full.");
+                }
+            } catch (InterruptedException x) {
+                submittedTaskCount.decrementAndGet();
+                throw new RejectedExecutionException(x);
+            }
+        } catch (Throwable t) {
+            // decrease any way
+            submittedTaskCount.decrementAndGet();
+        }
+    }
+}
diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/TaskQueue.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/TaskQueue.java
new file mode 100644
index 0000000..7e7d870
--- /dev/null
+++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/threadpool/support/eager/TaskQueue.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.dubbo.common.threadpool.support.eager;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskQueue in the EagerThreadPoolExecutor.
+ * <p>
+ * {@link #offer(Runnable)} accepts a task only when the executor's submittedTaskCount
+ * is less than the current pool size (a worker is free) or when the current pool size
+ * has reached the executor's maximumPoolSize. Otherwise it returns {@code false}.
+ * That can make the executor create a new worker
+ * when the task num is bigger than corePoolSize but less than maximumPoolSize.
+ */
+public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
+
+    private static final long serialVersionUID = -2635853580887179627L;
+
+    /** Owning executor; both offer methods reject tasks until it has been set. */
+    private EagerThreadPoolExecutor executor;
+
+    public TaskQueue(int capacity) {
+        super(capacity);
+    }
+
+    public void setExecutor(EagerThreadPoolExecutor exec) {
+        executor = exec;
+    }
+
+    /**
+     * Offers a task, declining it while the pool can still grow and has no free worker.
+     *
+     * @param runnable task
+     * @return {@code true} if the task was queued, {@code false} if it was declined
+     * @throws RejectedExecutionException if no executor has been set
+     */
+    @Override
+    public boolean offer(Runnable runnable) {
+        final EagerThreadPoolExecutor owner = requireExecutor();
+
+        int currentPoolThreadSize = owner.getPoolSize();
+        // have free worker. put task into queue to let the worker deal with task.
+        if (owner.getSubmittedTaskCount() < currentPoolThreadSize) {
+            return super.offer(runnable);
+        }
+
+        // return false to let executor create new worker.
+        if (currentPoolThreadSize < owner.getMaximumPoolSize()) {
+            return false;
+        }
+
+        // currentPoolThreadSize >= max
+        return super.offer(runnable);
+    }
+
+    /**
+     * retry offer task, going directly to the underlying queue
+     * without the pool-size check done by {@link #offer(Runnable)}
+     *
+     * @param o       task
+     * @param timeout how long to wait for space
+     * @param unit    unit of {@code timeout}
+     * @return offer success or not
+     * @throws RejectedExecutionException if no executor has been set or the executor is shutdown.
+     * @throws InterruptedException       if interrupted while waiting
+     */
+    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
+        // same guard as offer(Runnable): fail with a clear message instead of a NullPointerException
+        if (requireExecutor().isShutdown()) {
+            throw new RejectedExecutionException("Executor is shutdown!");
+        }
+        return super.offer(o, timeout, unit);
+    }
+
+    /** Returns the executor, or rejects when {@link #setExecutor} has not been called yet. */
+    private EagerThreadPoolExecutor requireExecutor() {
+        final EagerThreadPoolExecutor owner = executor;
+        if (owner == null) {
+            throw new RejectedExecutionException("The task queue does not have executor!");
+        }
+        return owner;
+    }
+}
diff --git a/dubbo-common/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.common.threadpool.ThreadPool b/dubbo-common/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.common.threadpool.ThreadPool
index eeb2e10..8d87177 100644
--- a/dubbo-common/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.common.threadpool.ThreadPool
+++ b/dubbo-common/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.common.threadpool.ThreadPool
@@ -1,3 +1,4 @@
 fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool
 cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool
 limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool
+eager=com.alibaba.dubbo.common.threadpool.support.eager.EagerThreadPool
diff --git a/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
new file mode 100644
index 0000000..a9aeca2
--- /dev/null
+++ b/dubbo-common/src/test/java/com/alibaba/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutorTest.java
@@ -0,0 +1,93 @@
+package com.alibaba.dubbo.common.threadpool.support.eager;
+
+
+import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.common.extension.ExtensionLoader;
+import com.alibaba.dubbo.common.threadpool.ThreadPool;
+import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
+import com.alibaba.dubbo.common.utils.NamedThreadFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class EagerThreadPoolExecutorTest {
+
+    private static final URL URL = new URL("dubbo", "localhost", 8080);
+
+    /**
+     * Submits 15 one-second tasks, 50 ms apart, to a pool with 5 core threads,
+     * 10 maximum threads and a queue capacity of 5.
+     * It prints like this:
+     * thread number in current pool:1,  task number in task queue:0 executor size: 1
+     * thread number in current pool:2,  task number in task queue:0 executor size: 2
+     * thread number in current pool:3,  task number in task queue:0 executor size: 3
+     * thread number in current pool:4,  task number in task queue:0 executor size: 4
+     * thread number in current pool:5,  task number in task queue:0 executor size: 5
+     * thread number in current pool:6,  task number in task queue:0 executor size: 6
+     * thread number in current pool:7,  task number in task queue:0 executor size: 7
+     * thread number in current pool:8,  task number in task queue:0 executor size: 8
+     * thread number in current pool:9,  task number in task queue:0 executor size: 9
+     * thread number in current pool:10,  task number in task queue:0 executor size: 10
+     * thread number in current pool:10,  task number in task queue:4 executor size: 10
+     * thread number in current pool:10,  task number in task queue:3 executor size: 10
+     * thread number in current pool:10,  task number in task queue:2 executor size: 10
+     * thread number in current pool:10,  task number in task queue:1 executor size: 10
+     * thread number in current pool:10,  task number in task queue:0 executor size: 10
+     * <p>
+     * We can see that, when the core threads are busy, the thread pool creates new threads
+     * (never more than max) instead of putting tasks into the queue. Once the tasks are done
+     * and the one-second keep-alive has elapsed, only the core threads are expected to remain.
+     */
+    @Test
+    public void testEagerThreadPool() throws Exception {
+        String name = "eager-tf";
+        int queues = 5;
+        int cores = 5;
+        int threads = 10;
+        // alive 1 second
+        long alive = 1000;
+
+        //init queue and executor
+        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues);
+        final EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
+                threads,
+                alive,
+                TimeUnit.MILLISECONDS,
+                taskQueue,
+                new NamedThreadFactory(name, true),
+                new AbortPolicyWithReport(name, URL));
+        taskQueue.setExecutor(executor);
+
+        for (int i = 0; i < 15; i++) {
+            Thread.sleep(50);
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    System.out.println("thread number in current pool:"
+                            + executor.getPoolSize()
+                            + ",  task number in task queue:"
+                            + executor.getQueue().size()
+                            + " executor size: "
+                            + executor.getPoolSize());
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // keep the interrupt status instead of dumping a stack trace
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            });
+        }
+        // wait for every task to finish and for the extra threads to time out
+        Thread.sleep(5000);
+        // cores threads are all alive.
+        Assert.assertEquals("more than cores threads alive!", cores, executor.getPoolSize());
+    }
+
+    /**
+     * The "eager" extension name must resolve to a pool that returns an EagerThreadPoolExecutor.
+     */
+    @Test
+    public void testSPI() {
+        ExecutorService executorService = (ExecutorService) ExtensionLoader
+                .getExtensionLoader(ThreadPool.class)
+                .getExtension("eager")
+                .getExecutor(URL);
+        Assert.assertEquals("test spi fail!",
+                "EagerThreadPoolExecutor", executorService.getClass().getSimpleName());
+    }
+}
\ No newline at end of file
diff --git a/dubbo-common/src/test/java/com/alibaba/dubbo/common/utils/ConfigUtilsTest.java b/dubbo-common/src/test/java/com/alibaba/dubbo/common/utils/ConfigUtilsTest.java
index 8d32e7e..8b11281 100644
--- a/dubbo-common/src/test/java/com/alibaba/dubbo/common/utils/ConfigUtilsTest.java
+++ b/dubbo-common/src/test/java/com/alibaba/dubbo/common/utils/ConfigUtilsTest.java
@@ -116,6 +116,7 @@ public class ConfigUtilsTest {
         expected.put("fixed", "com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool");
         expected.put("cached", "com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool");
         expected.put("limited", "com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool");
+        expected.put("eager", "com.alibaba.dubbo.common.threadpool.support.eager.EagerThreadPool");
 
         Assert.assertEquals(expected, p);
     }

-- 
To stop receiving notification emails like this one, please contact
iluo@apache.org.