You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/13 13:25:31 UTC
[incubator-skywalking] branch datacarrier-consumer-pool updated:
Change indicator aggregate worker and persistent worker to use default
consumer pool.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch datacarrier-consumer-pool
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/datacarrier-consumer-pool by this push:
new 566c873 Change indicator aggregate worker and persistent worker to use default consumer pool.
566c873 is described below
commit 566c87308a17dd5bb88a507582a238b18e07302d
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Feb 13 21:25:23 2019 +0800
Change indicator aggregate worker and persistent worker to use default consumer pool.
---
.../org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java | 1 +
.../oap/server/core/analysis/worker/IndicatorAggregateWorker.java | 5 +++--
.../oap/server/core/analysis/worker/IndicatorPersistentWorker.java | 5 +++--
3 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index 70a2366..2e2bb4c 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -150,6 +150,7 @@ public class DataCarrier<T> {
public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> consumer) {
driver = consumerPool;
consumerPool.add(this.name, channels, consumer);
+ driver.begin(channels);
return this;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 9d28c1c..f11d014 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -52,7 +52,8 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(new AggregatorConsumer(this), 1);
+
+ this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new AggregatorConsumer(this));
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 8763d8b..172874a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -23,7 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -56,7 +56,8 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(new IndicatorPersistentWorker.PersistentConsumer(this), 1);
+
+ this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new PersistentConsumer(this));
}
@Override void onWork(Indicator indicator) {