You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by pe...@apache.org on 2019/02/13 15:19:54 UTC
[incubator-skywalking] branch master updated: Add consumer pool to Datacarrier (#2245)
This is an automated email from the ASF dual-hosted git repository.
pengys pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new b86426c Add consumer pool to Datacarrier (#2245)
b86426c is described below
commit b86426c9a211e80eb7dcbecb11e4fea57540b19f
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Feb 13 23:19:47 2019 +0800
Add consumer pool to Datacarrier (#2245)
* Extend the consumer pool to DataCarrier.
* Change indicator aggregate worker and persistent worker to use default consumer pool.
* Fix CI.
---
.../apm/commons/datacarrier/DataCarrier.java | 64 +++++-----
.../{ConsumerPool.java => ConsumeDriver.java} | 20 ++--
.../commons/datacarrier/consumer/ConsumerPool.java | 130 ++-------------------
.../datacarrier/consumer/ConsumerPoolFactory.java | 85 ++++++++++++++
.../apm/commons/datacarrier/consumer/IDriver.java | 32 +++++
...onsumerPoolTest.java => ConsumeDriverTest.java} | 20 ++--
.../commons/datacarrier/consumer/ConsumerTest.java | 4 +-
.../analysis/worker/IndicatorAggregateWorker.java | 5 +-
.../analysis/worker/IndicatorPersistentWorker.java | 5 +-
9 files changed, 191 insertions(+), 174 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index 76ad609..2e2bb4c 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -18,12 +18,9 @@
package org.apache.skywalking.apm.commons.datacarrier;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPool;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
-import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
-import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.apm.commons.datacarrier.partition.*;
/**
* DataCarrier main class. use this instance to set Producer/Consumer Model.
@@ -32,7 +29,7 @@ public class DataCarrier<T> {
private final int bufferSize;
private final int channelSize;
private Channels<T> channels;
- private ConsumerPool<T> consumerPool;
+ private IDriver driver;
private String name;
public DataCarrier(int channelSize, int bufferSize) {
@@ -47,7 +44,8 @@ public class DataCarrier<T> {
}
/**
- * set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link SimpleRollingPartitioner}
+ * set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link
+ * SimpleRollingPartitioner}
*
* @param dataPartitioner to partition data into different channel by some rules.
* @return DataCarrier instance for chain
@@ -79,8 +77,8 @@ public class DataCarrier<T> {
* @return false means produce data failure. The data will not be consumed.
*/
public boolean produce(T data) {
- if (consumerPool != null) {
- if (!consumerPool.isRunning()) {
+ if (driver != null) {
+ if (!driver.isRunning(channels)) {
return false;
}
}
@@ -89,22 +87,22 @@ public class DataCarrier<T> {
}
/**
- * set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
+ * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
*
* @param consumerClass class of consumer
* @param num number of consumer threads
*/
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
- if (consumerPool != null) {
- consumerPool.close();
+ if (driver != null) {
+ driver.close(channels);
}
- consumerPool = new ConsumerPool<T>(this.name, this.channels, consumerClass, num, consumeCycle);
- consumerPool.begin();
+ driver = new ConsumeDriver<T>(this.name, this.channels, consumerClass, num, consumeCycle);
+ driver.begin(channels);
return this;
}
/**
- * set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
+ * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
* millis consume cycle.
*
* @param consumerClass class of consumer
@@ -115,23 +113,23 @@ public class DataCarrier<T> {
}
/**
- * set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
+ * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
*
* @param consumer single instance of consumer, all consumer threads will all use this instance.
* @param num number of consumer threads
* @return
*/
public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
- if (consumerPool != null) {
- consumerPool.close();
+ if (driver != null) {
+ driver.close(channels);
}
- consumerPool = new ConsumerPool<T>(this.name, this.channels, consumer, num, consumeCycle);
- consumerPool.begin();
+ driver = new ConsumeDriver<T>(this.name, this.channels, consumer, num, consumeCycle);
+ driver.begin(channels);
return this;
}
/**
- * set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
+ * set consumeDriver to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work with 20
* millis consume cycle.
*
* @param consumer single instance of consumer, all consumer threads will all use this instance.
@@ -143,13 +141,27 @@ public class DataCarrier<T> {
}
/**
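+ * Set a consumer pool to manage the channels of this DataCarrier. The consumerPool can then use its own consuming
+ * model to adjust the consumer threads and throughput.
+ *
+ * @param consumerPool the pool that this carrier's channels and the given consumer are added to
+ * @return this DataCarrier instance, for chaining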
+ */
+ public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> consumer) {
+ driver = consumerPool;
+ consumerPool.add(this.name, channels, consumer);
+ driver.begin(channels);
+ return this;
+ }
+
+ /**
* shutdown all consumer threads, if consumer threads are running. Notice {@link BufferStrategy}: if {@link
- * BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumers maybe cause blocking when producing.
- * Better way to change consumers are use {@link DataCarrier#consume}
+ * BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutting down the consumeDriver may cause blocking when producing.
+ * A better way to change the consumeDriver is to use {@link DataCarrier#consume}.
*/
public void shutdownConsumers() {
- if (consumerPool != null) {
- consumerPool.close();
+ if (driver != null) {
+ driver.close(channels);
}
}
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
similarity index 89%
copy from apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
copy to apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
index d3ba23e..3dcc26b 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriver.java
@@ -20,19 +20,18 @@ package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
/**
* Pool of consumers <p> Created by wusheng on 2016/10/25.
*/
-public class ConsumerPool<T> {
+public class ConsumeDriver<T> implements IDriver {
private boolean running;
private ConsumerThread[] consumerThreads;
private Channels<T> channels;
private ReentrantLock lock;
- public ConsumerPool(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
+ public ConsumeDriver(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
long consumeCycle) {
this(channels, num);
for (int i = 0; i < num; i++) {
@@ -41,7 +40,7 @@ public class ConsumerPool<T> {
}
}
- public ConsumerPool(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
+ public ConsumeDriver(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
this(channels, num);
prototype.init();
for (int i = 0; i < num; i++) {
@@ -51,7 +50,7 @@ public class ConsumerPool<T> {
}
- private ConsumerPool(Channels<T> channels, int num) {
+ private ConsumeDriver(Channels<T> channels, int num) {
running = false;
this.channels = channels;
consumerThreads = new ConsumerThread[num];
@@ -70,7 +69,8 @@ public class ConsumerPool<T> {
}
}
- public void begin() {
+ @Override
+ public void begin(Channels channels) {
if (running) {
return;
}
@@ -86,7 +86,8 @@ public class ConsumerPool<T> {
}
}
- public boolean isRunning() {
+ @Override
+ public boolean isRunning(Channels channels) {
return running;
}
@@ -134,7 +135,8 @@ public class ConsumerPool<T> {
}
- public void close() {
+ @Override
+ public void close(Channels channels) {
try {
lock.lock();
this.running = false;
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
index d3ba23e..a098274 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.java
@@ -18,131 +18,15 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
-import java.util.ArrayList;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
- * Pool of consumers <p> Created by wusheng on 2016/10/25.
+ * The consumer pool can consume data from multiple {@link DataCarrier}s,
+ * using different consumer-thread management models.
+ *
+ * @author wusheng
*/
-public class ConsumerPool<T> {
- private boolean running;
- private ConsumerThread[] consumerThreads;
- private Channels<T> channels;
- private ReentrantLock lock;
-
- public ConsumerPool(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
- long consumeCycle) {
- this(channels, num);
- for (int i = 0; i < num; i++) {
- consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
- consumerThreads[i].setDaemon(true);
- }
- }
-
- public ConsumerPool(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
- this(channels, num);
- prototype.init();
- for (int i = 0; i < num; i++) {
- consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
- consumerThreads[i].setDaemon(true);
- }
-
- }
-
- private ConsumerPool(Channels<T> channels, int num) {
- running = false;
- this.channels = channels;
- consumerThreads = new ConsumerThread[num];
- lock = new ReentrantLock();
- }
-
- private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> consumerClass) {
- try {
- IConsumer<T> inst = consumerClass.newInstance();
- inst.init();
- return inst;
- } catch (InstantiationException e) {
- throw new ConsumerCannotBeCreatedException(e);
- } catch (IllegalAccessException e) {
- throw new ConsumerCannotBeCreatedException(e);
- }
- }
-
- public void begin() {
- if (running) {
- return;
- }
- try {
- lock.lock();
- this.allocateBuffer2Thread();
- for (ConsumerThread consumerThread : consumerThreads) {
- consumerThread.start();
- }
- running = true;
- } finally {
- lock.unlock();
- }
- }
-
- public boolean isRunning() {
- return running;
- }
-
- private void allocateBuffer2Thread() {
- int channelSize = this.channels.getChannelSize();
- if (channelSize < consumerThreads.length) {
- /**
- * if consumerThreads.length > channelSize
- * each channel will be process by several consumers.
- */
- ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
- for (int threadIndex = 0; threadIndex < consumerThreads.length; threadIndex++) {
- int index = threadIndex % channelSize;
- if (threadAllocation[index] == null) {
- threadAllocation[index] = new ArrayList<Integer>();
- }
- threadAllocation[index].add(threadIndex);
- }
-
- for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
- ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
- Buffer<T> channel = this.channels.getBuffer(channelIndex);
- int bufferSize = channel.getBufferSize();
- int step = bufferSize / threadAllocationPerChannel.size();
- for (int i = 0; i < threadAllocationPerChannel.size(); i++) {
- int threadIndex = threadAllocationPerChannel.get(i);
- int start = i * step;
- int end = i == threadAllocationPerChannel.size() - 1 ? bufferSize : (i + 1) * step;
- consumerThreads[threadIndex].addDataSource(channel, start, end);
- }
- }
- } else {
- /**
- * if consumerThreads.length < channelSize
- * each consumer will process several channels.
- *
- * if consumerThreads.length == channelSize
- * each consumer will process one channel.
- */
- for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
- int consumerIndex = channelIndex % consumerThreads.length;
- consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
- }
- }
-
- }
-
- public void close() {
- try {
- lock.lock();
- this.running = false;
- for (ConsumerThread consumerThread : consumerThreads) {
- consumerThread.shutdown();
- }
- } finally {
- lock.unlock();
- }
- }
+public interface ConsumerPool extends IDriver {
+ void add(String name, Channels channels, IConsumer consumer);
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
new file mode 100644
index 0000000..e95e18f
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+
+/**
+ * Consumer Pool Factory provides global management for all consumer pools.
+ *
+ * @author wusheng
+ */
+public enum ConsumerPoolFactory {
+ INSTANCE;
+
+ private Map<String, ConsumerPool> pools;
+
+ ConsumerPoolFactory() {
+ pools = new HashMap<String, ConsumerPool>();
+ }
+
+ public synchronized boolean createIfAbsent(String poolName, Callable<ConsumerPool> creator) throws Exception {
+ if (pools.containsKey(poolName)) {
+ return false;
+ } else {
+ pools.put(poolName, creator.call());
+ return true;
+ }
+ }
+
+ public ConsumerPool get(String poolName) {
+ return pools.get(poolName);
+ }
+
+ /**
+ * The default pool provides the same capabilities as DataCarrier#consume(IConsumer, 1): it allocates one thread for
+ * each DataCarrier.
+ */
+ public static final ConsumerPool DEFAULT_POOL = new ConsumerPool() {
+ private Map<Channels, ConsumeDriver> allDrivers = new HashMap<Channels, ConsumeDriver>();
+
+ @Override synchronized public void add(String name, Channels channels, IConsumer consumer) {
+ if (!allDrivers.containsKey(channels)) {
+ ConsumeDriver consumeDriver = new ConsumeDriver(name, channels, consumer, 1, 20);
+ allDrivers.put(channels, consumeDriver);
+ }
+ }
+
+ @Override public boolean isRunning(Channels channels) {
+ ConsumeDriver driver = allDrivers.get(channels);
+ return driver == null ? false : driver.isRunning(channels);
+ }
+
+ @Override public void close(Channels channels) {
+ ConsumeDriver driver = allDrivers.get(channels);
+ if (driver != null) {
+ driver.close(channels);
+ }
+ }
+
+ @Override public void begin(Channels channels) {
+ ConsumeDriver driver = allDrivers.get(channels);
+ if (driver != null) {
+ driver.begin(channels);
+ }
+ }
+ };
+}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IDriver.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IDriver.java
new file mode 100644
index 0000000..74368ce
--- /dev/null
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/IDriver.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.commons.datacarrier.consumer;
+
+import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
+
+/**
+ * The driver of consumer.
+ *
+ * @author wusheng
+ */
+public interface IDriver {
+ boolean isRunning(Channels channels);
+ void close(Channels channels);
+ void begin(Channels channels);
+}
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriverTest.java
similarity index 77%
rename from apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
rename to apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriverTest.java
index 885a5bf..ec59162 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPoolTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumeDriverTest.java
@@ -30,28 +30,28 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
/**
* Created by wusheng on 2016/10/26.
*/
-public class ConsumerPoolTest {
+public class ConsumeDriverTest {
@Test
- public void testBeginConsumerPool() throws IllegalAccessException {
+ public void testBeginConsumeDriver() throws IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
- ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
- pool.begin();
+ ConsumeDriver<SampleData> pool = new ConsumeDriver<SampleData>("default", channels, new SampleConsumer(), 2, 20);
+ pool.begin(channels);
- ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
+ ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumeDriver.class, "consumerThreads").get(pool);
Assert.assertEquals(2, threads.length);
Assert.assertTrue(threads[0].isAlive());
Assert.assertTrue(threads[1].isAlive());
}
@Test
- public void testCloseConsumerPool() throws InterruptedException, IllegalAccessException {
+ public void testCloseConsumeDriver() throws InterruptedException, IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
- ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>("default", channels, new SampleConsumer(), 2, 20);
- pool.begin();
+ ConsumeDriver<SampleData> pool = new ConsumeDriver<SampleData>("default", channels, new SampleConsumer(), 2, 20);
+ pool.begin(channels);
Thread.sleep(5000);
- pool.close();
- ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
+ pool.close(channels);
+ ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumeDriver.class, "consumerThreads").get(pool);
Assert.assertEquals(2, threads.length);
Assert.assertFalse((Boolean)MemberModifier.field(ConsumerThread.class, "running").get(threads[0]));
diff --git a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
index 3a5d583..4433b9c 100644
--- a/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
+++ b/apm-commons/apm-datacarrier/src/test/java/org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerTest.java
@@ -129,8 +129,8 @@ public class ConsumerTest {
}
private IConsumer getConsumer(DataCarrier<SampleData> carrier) throws IllegalAccessException {
- ConsumerPool pool = (ConsumerPool)MemberModifier.field(DataCarrier.class, "consumerPool").get(carrier);
- ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
+ ConsumeDriver pool = (ConsumeDriver)MemberModifier.field(DataCarrier.class, "driver").get(carrier);
+ ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumeDriver.class, "consumerThreads").get(pool);
return (IConsumer)MemberModifier.field(ConsumerThread.class, "consumer").get(threads[0]);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 9d28c1c..f11d014 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -52,7 +52,8 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(new AggregatorConsumer(this), 1);
+
+ this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new AggregatorConsumer(this));
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 8763d8b..172874a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -23,7 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -56,7 +56,8 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(new IndicatorPersistentWorker.PersistentConsumer(this), 1);
+
+ this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new PersistentConsumer(this));
}
@Override void onWork(Indicator indicator) {