You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/13 13:25:31 UTC

[incubator-skywalking] branch datacarrier-consumer-pool updated: Change indicator aggregate worker and persistent worker to use default consumer pool.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch datacarrier-consumer-pool
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/datacarrier-consumer-pool by this push:
     new 566c873  Change indicator aggregate worker and persistent worker to use default consumer pool.
566c873 is described below

commit 566c87308a17dd5bb88a507582a238b18e07302d
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Feb 13 21:25:23 2019 +0800

    Change indicator aggregate worker and persistent worker to use default consumer pool.
---
 .../org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java   | 1 +
 .../oap/server/core/analysis/worker/IndicatorAggregateWorker.java    | 5 +++--
 .../oap/server/core/analysis/worker/IndicatorPersistentWorker.java   | 5 +++--
 3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
index 70a2366..2e2bb4c 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/DataCarrier.java
@@ -150,6 +150,7 @@ public class DataCarrier<T> {
     public DataCarrier consume(ConsumerPool consumerPool, IConsumer<T> consumer) {
         driver = consumerPool;
         consumerPool.add(this.name, channels, consumer);
+        driver.begin(channels);
         return this;
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 9d28c1c..f11d014 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -52,7 +52,8 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
         this.nextWorker = nextWorker;
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
-        this.dataCarrier.consume(new AggregatorConsumer(this), 1);
+
+        this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new AggregatorConsumer(this));
 
         MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
         aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 8763d8b..172874a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -23,7 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -56,7 +56,8 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
         this.indicatorDAO = indicatorDAO;
         this.nextWorker = nextWorker;
         this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
-        this.dataCarrier.consume(new IndicatorPersistentWorker.PersistentConsumer(this), 1);
+
+        this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new PersistentConsumer(this));
     }
 
     @Override void onWork(Indicator indicator) {