You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by pe...@apache.org on 2019/02/13 15:19:54 UTC

[incubator-skywalking] branch master updated: Add consumer pool to Datacarrier (#2245)

This is an automated email from the ASF dual-hosted git repository.

pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new b86426c  Add consumer pool to Datacarrier (#2245)
b86426c is described below

commit b86426c9a211e80eb7dcbecb11e4fea57540b19f
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Feb 13 23:19:47 2019 +0800

    Add consumer pool to Datacarrier (#2245)
    
    * Extend consume pool to data carrier.
    
    * Change indicator aggregate worker and persistent worker to use default consumer pool.
    
    * Fix CI.
---
 .../apm/commons/datacarrier/DataCarrier.java       |  64 +++++-----
 .../{ConsumerPool.java => ConsumeDriver.java}      |  20 ++--
 .../commons/datacarrier/consumer/ConsumerPool.java | 130 ++-------------------
 .../datacarrier/consumer/ConsumerPoolFactory.java  |  85 ++++++++++++++
 .../apm/commons/datacarrier/consumer/IDriver.java  |  32 +++++
 ...onsumerPoolTest.java => ConsumeDriverTest.java} |  20 ++--
 .../commons/datacarrier/consumer/ConsumerTest.java |   4 +-
 .../analysis/worker/IndicatorAggregateWorker.java  |   5 +-
 .../analysis/worker/IndicatorPersistentWorker.java |   5 +-
 9 files changed, 191 insertions(+), 174 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index 76ad609..2e2bb4c 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -18,12 +18,9 @@
 
 package org.apache.skywalking.apm.commons.datacarrier;
 
-import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
-import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.apm.commons.datacarrier.partition.*;
 
 /**
  * DataCarrier main class. use this instance to set Producer/Consumer Model.
@@ -32,7 +29,7 @@ public class DataCarrier<T> {
     private final int bufferSize;
     private final int channelSize;
     private Channels<T> channels;
-    private ConsumerPool<T> consumerPool;
+    private IDriver driver;
     private String name;
 
     public DataCarrier(int channelSize, int bufferSize) {
@@ -47,7 +44,8 @@ public class DataCarrier<T> {
     }
 
     /**
-     * set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link SimpleRollingPartitioner}
+     * set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link
+     * SimpleRollingPartitioner}
      *
      * @param dataPartitioner to partition data into different channel by some rules.
      * @return DataCarrier instance for chain
@@ -79,8 +77,8 @@ public class DataCarrier<T> {
      * @return false means produce data failure. The data will not be consumed.
      */
     public boolean produce(T data) {
-        if (consumerPool != null) {
-            if (!consumerPool.isRunning()) {
+        if (driver != null) {
+            if (!driver.isRunning(channels)) {
                 return false;
             }
         }
@@ -89,22 +87,22 @@ public class DataCarrier<T> {
     }
 
     /**
-     * set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
+     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
      *
      * @param consumerClass class of consumer
      * @param num number of consumer threads
      */
     public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
-        if (consumerPool != null) {
-            consumerPool.close();
+        if (driver != null) {
+            driver.close(channels);
         }
-        consumerPool = new ConsumerPool<T>(this.name, this.channels, consumerClass, num, consumeCycle);
-        consumerPool.begin();
+        driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle);
+        driver.begin(channels);
         return this;
     }
 
     /**
-     * set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
+     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
      * millis consume cycle.
      *
      * @param consumerClass class of consumer
@@ -115,23 +113,23 @@ public class DataCarrier<T> {
     }
 
     /**
-     * set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
+     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
      *
      * @param consumer single instance of consumer, all consumer threads will all use this instance.
      * @param num number of consumer threads
      * @return
      */
     public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
-        if (consumerPool != null) {
-            consumerPool.close();
+        if (driver != null) {
+            driver.close(channels);
         }
-        consumerPool = new ConsumerPool<T>(this.name, this.channels, consumer, num, consumeCycle);
-        consumerPool.begin();
+        driver = new ConsumeDriver<T>(this.name, this.channels, consumer, num, consumeCycle);
+        driver.begin(channels);
         return this;
     }
 
     /**
-     * set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
+     * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
      * millis consume cycle.
      *
      * @param consumer single instance of consumer, all consumer threads will all use this instance.
@@ -143,13 +141,27 @@ public class DataCarrier<T> {
     }
 
     /**
+     * Set a consumer pool to manage the channels of this DataCarrier. The consumerPool can then use its own consuming
+     * model to adjust the consumer threads and throughput.
+     *
+     * @param consumerPool
+     * @return
+     */
+    public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> consumer) {
+        driver = consumerPool;
+        consumerPool.add(this.name, channels, consumer);
+        driver.begin(channels);
+        return this;
+    }
+
+    /**
      * shutdown all consumer threads, if consumer threads are running. Notice {@link BufferStrategy}: if {@link
-     * BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumers maybe cause blocking when producing.
-     * Better way to change consumers are use {@link DataCarrier#consume}
+     * BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutting down the consumeDriver may cause blocking when
+     * producing. A better way to change the consumeDriver is to use {@link DataCarrier#consume}.
      */
     public void shutdownConsumers() {
-        if (consumerPool != null) {
-            consumerPool.close();
+        if (driver != null) {
+            driver.close(channels);
         }
     }
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
similarity index 89%
copy from apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
copy to apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
index d3ba23e..3dcc26b 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
@@ -20,19 +20,18 @@ package org.apache.skywalking.apm.commons.datacarrier.consumer;
 
 import java.util.ArrayList;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
 
 /**
  * Pool of consumers <p> Created by wusheng on 2016/10/25.
  */
-public class ConsumerPool<T> {
+public class ConsumeDriver<T> implements IDriver {
     private boolean running;
     private ConsumerThread[] consumerThreads;
     private Channels<T> channels;
     private ReentrantLock lock;
 
-    public ConsumerPool(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
+    public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
         long consumeCycle) {
         this(channels, num);
         for (int i = 0; i < num; i++) {
@@ -41,7 +40,7 @@ public class ConsumerPool<T> {
         }
     }
 
-    public ConsumerPool(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
+    public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
         this(channels, num);
         prototype.init();
         for (int i = 0; i < num; i++) {
@@ -51,7 +50,7 @@ public class ConsumerPool<T> {
 
     }
 
-    private ConsumerPool(Channels<T> channels, int num) {
+    private ConsumeDriver(Channels<T> channels, int num) {
         running = false;
         this.channels = channels;
         consumerThreads = new ConsumerThread[num];
@@ -70,7 +69,8 @@ public class ConsumerPool<T> {
         }
     }
 
-    public void begin() {
+    @Override
+    public void begin(Channels channels) {
         if (running) {
             return;
         }
@@ -86,7 +86,8 @@ public class ConsumerPool<T> {
         }
     }
 
-    public boolean isRunning() {
+    @Override
+    public boolean isRunning(Channels channels) {
         return running;
     }
 
@@ -134,7 +135,8 @@ public class ConsumerPool<T> {
 
     }
 
-    public void close() {
+    @Override
+    public void close(Channels channels) {
         try {
             lock.lock();
             this.running = false;
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
index d3ba23e..a098274 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
@@ -18,131 +18,15 @@
 
 package org.apache.skywalking.apm.commons.datacarrier.consumer;
 
-import java.util.ArrayList;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
 
 /**
- * Pool of consumers <p> Created by wusheng on 2016/10/25.
+ * The consumer pool can consume data from multiple {@link DataCarrier}s,
+ * using different consumer-thread management models.
+ *
+ * @author wusheng
  */
-public class ConsumerPool<T> {
-    private boolean running;
-    private ConsumerThread[] consumerThreads;
-    private Channels<T> channels;
-    private ReentrantLock lock;
-
-    public ConsumerPool(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
-        long consumeCycle) {
-        this(channels, num);
-        for (int i = 0; i < num; i++) {
-            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
-            consumerThreads[i].setDaemon(true);
-        }
-    }
-
-    public ConsumerPool(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
-        this(channels, num);
-        prototype.init();
-        for (int i = 0; i < num; i++) {
-            consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
-            consumerThreads[i].setDaemon(true);
-        }
-
-    }
-
-    private ConsumerPool(Channels<T> channels, int num) {
-        running = false;
-        this.channels = channels;
-        consumerThreads = new ConsumerThread[num];
-        lock = new ReentrantLock();
-    }
-
-    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {
-        try {
-            IConsumer<T> inst = consumerClass.newInstance();
-            inst.init();
-            return inst;
-        } catch (InstantiationException e) {
-            throw new ConsumerCannotBeCreatedException(e);
-        } catch (IllegalAccessException e) {
-            throw new ConsumerCannotBeCreatedException(e);
-        }
-    }
-
-    public void begin() {
-        if (running) {
-            return;
-        }
-        try {
-            lock.lock();
-            this.allocateBuffer2Thread();
-            for (ConsumerThread consumerThread : consumerThreads) {
-                consumerThread.start();
-            }
-            running = true;
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    public boolean isRunning() {
-        return running;
-    }
-
-    private void allocateBuffer2Thread() {
-        int channelSize = this.channels.getChannelSize();
-        if (channelSize < consumerThreads.length) {
-            /**
-             * if consumerThreads.length > channelSize
-             * each channel will be process by several consumers.
-             */
-            ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
-            for (int threadIndex = 0; threadIndex < consumerThreads.length; threadIndex++) {
-                int index = threadIndex % channelSize;
-                if (threadAllocation[index] == null) {
-                    threadAllocation[index] = new ArrayList<Integer>();
-                }
-                threadAllocation[index].add(threadIndex);
-            }
-
-            for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
-                ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
-                Buffer<T> channel = this.channels.getBuffer(channelIndex);
-                int bufferSize = channel.getBufferSize();
-                int step = bufferSize / threadAllocationPerChannel.size();
-                for (int i = 0; i < threadAllocationPerChannel.size(); i++) {
-                    int threadIndex = threadAllocationPerChannel.get(i);
-                    int start = i * step;
-                    int end = i == threadAllocationPerChannel.size() - 1 ? bufferSize : (i + 1) * step;
-                    consumerThreads[threadIndex].addDataSource(channel, start, end);
-                }
-            }
-        } else {
-            /**
-             * if consumerThreads.length < channelSize
-             * each consumer will process several channels.
-             *
-             * if consumerThreads.length == channelSize
-             * each consumer will process one channel.
-             */
-            for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
-                int consumerIndex = channelIndex % consumerThreads.length;
-                consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
-            }
-        }
-
-    }
-
-    public void close() {
-        try {
-            lock.lock();
-            this.running = false;
-            for (ConsumerThread consumerThread : consumerThreads) {
-                consumerThread.shutdown();
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
+public interface ConsumerPool extends IDriver {
+    void add(String name, Channels channels, IConsumer consumer);
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
new file mode 100644
index 0000000..e95e18f
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+
+/**
+ * Consumer Pool Factory provides global management for all consumer pools.
+ *
+ * @author wusheng
+ */
+public enum ConsumerPoolFactory {
+    INSTANCE;
+
+    private Map<String, ConsumerPool> pools;
+
+    ConsumerPoolFactory() {
+        pools = new HashMap<String, ConsumerPool>();
+    }
+
+    public synchronized boolean createIfAbsent(String poolName, Callable<ConsumerPool> creator) throws Exception {
+        if (pools.containsKey(poolName)) {
+            return false;
+        } else {
+            pools.put(poolName, creator.call());
+            return true;
+        }
+    }
+
+    public ConsumerPool get(String poolName) {
+        return pools.get(poolName);
+    }
+
+    /**
+     * The default pool provides the same capabilities as DataCarrier#consume(IConsumer, 1), allocating one thread
+     * per DataCarrier.
+     */
+    public static final ConsumerPool DEFAULT_POOL = new ConsumerPool() {
+        private Map<Channels, ConsumeDriver> allDrivers = new HashMap<Channels, ConsumeDriver>();
+
+        @Override synchronized public void add(String name, Channels channels, IConsumer consumer) {
+            if (!allDrivers.containsKey(channels)) {
+                ConsumeDriver consumeDriver = new ConsumeDriver(name, channels, consumer, 1, 20);
+                allDrivers.put(channels, consumeDriver);
+            }
+        }
+
+        @Override public boolean isRunning(Channels channels) {
+            ConsumeDriver driver = allDrivers.get(channels);
+            return driver == null ? false : driver.isRunning(channels);
+        }
+
+        @Override public void close(Channels channels) {
+            ConsumeDriver driver = allDrivers.get(channels);
+            if (driver != null) {
+                driver.close(channels);
+            }
+        }
+
+        @Override public void begin(Channels channels) {
+            ConsumeDriver driver = allDrivers.get(channels);
+            if (driver != null) {
+                driver.begin(channels);
+            }
+        }
+    };
+}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IDriver.java
new file mode 100644
index 0000000..74368ce
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IDriver.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+
+/**
+ * The driver of consumer.
+ *
+ * @author wusheng
+ */
+public interface IDriver {
+    boolean isRunning(Channels channels);
+    void close(Channels channels);
+    void begin(Channels channels);
+}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriverTest.java
similarity index 77%
rename from apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
rename to apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriverTest.java
index 885a5bf..ec59162 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriverTest.java
@@ -30,28 +30,28 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
 /**
  * Created by wusheng on 2016/10/26.
  */
-public class ConsumerPoolTest {
+public class ConsumeDriverTest {
     @Test
-    public void testBeginConsumerPool() throws IllegalAccessException {
+    public void testBeginConsumeDriver() throws IllegalAccessException {
         Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
-        ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
-        pool.begin();
+        ConsumeDriver<SampleData> pool = new ConsumeDriver<SampleData>("default", channels, new SampleConsumer(), 2, 20);
+        pool.begin(channels);
 
-        ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
+        ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumeDriver.class, "consumerThreads").get(pool);
         Assert.assertEquals(2, threads.length);
         Assert.assertTrue(threads[0].isAlive());
         Assert.assertTrue(threads[1].isAlive());
     }
 
     @Test
-    public void testCloseConsumerPool() throws InterruptedException, IllegalAccessException {
+    public void testCloseConsumeDriver() throws InterruptedException, IllegalAccessException {
         Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
-        ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
-        pool.begin();
+        ConsumeDriver<SampleData> pool = new ConsumeDriver<SampleData>("default", channels, new SampleConsumer(), 2, 20);
+        pool.begin(channels);
 
         Thread.sleep(5000);
-        pool.close();
-        ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
+        pool.close(channels);
+        ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumeDriver.class, "consumerThreads").get(pool);
 
         Assert.assertEquals(2, threads.length);
         Assert.assertFalse((Boolean)MemberModifier.field(ConsumerThread.class, "running").get(threads[0]));
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
index 3a5d583..4433b9c 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
@@ -129,8 +129,8 @@ public class ConsumerTest {
     }
 
     private IConsumer getConsumer(DataCarrier<SampleData> carrier) throws IllegalAccessException {
-        ConsumerPool pool = (ConsumerPool)MemberModifier.field(DataCarrier.class, "consumerPool").get(carrier);
-        ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
+        ConsumeDriver pool = (ConsumeDriver)MemberModifier.field(DataCarrier.class, "driver").get(carrier);
+        ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumeDriver.class, "consumerThreads").get(pool);
 
         return (IConsumer)MemberModifier.field(ConsumerThread.class, "consumer").get(threads[0]);
     }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 9d28c1c..f11d014 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -52,7 +52,8 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
         this.nextWorker = nextWorker;
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
-        this.dataCarrier.consume(new AggregatorConsumer(this), 1);
+
+        this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new AggregatorConsumer(this));
 
         MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
         aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 8763d8b..172874a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -23,7 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -56,7 +56,8 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
         this.indicatorDAO = indicatorDAO;
         this.nextWorker = nextWorker;
         this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
-        this.dataCarrier.consume(new IndicatorPersistentWorker.PersistentConsumer(this), 1);
+
+        this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new PersistentConsumer(this));
     }
 
     @Override void onWork(Indicator indicator) {