You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/14 16:10:43 UTC
[incubator-skywalking] branch new-consumer-pool updated: Make L1 and L2 aggregation thread sharing works.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch new-consumer-pool
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/new-consumer-pool by this push:
new a4169c2 Make L1 and L2 aggregation thread sharing works.
a4169c2 is described below
commit a4169c26a3c3ca1e4d8932ca843ad1a637cb15cc
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Feb 15 00:10:33 2019 +0800
Make L1 and L2 aggregation thread sharing works.
---
.../datacarrier/consumer/BulkConsumePool.java | 36 ++++++++++++++++++++++
.../oap/server/core/UnexpectedException.java | 4 +++
.../analysis/worker/IndicatorAggregateWorker.java | 10 +++++-
.../analysis/worker/IndicatorPersistentWorker.java | 17 ++++++++--
4 files changed, 64 insertions(+), 3 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index 5b06ca6..ba5ace8 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.*;
+import java.util.concurrent.Callable;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
@@ -35,6 +36,14 @@ public class BulkConsumePool implements ConsumerPool {
public BulkConsumePool(String name, int size, long consumeCycle) {
allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
+ String threadNum = System.getenv(name + "_THREAD");
+ if (threadNum != null) {
+ try {
+ size = Integer.parseInt(threadNum);
+ } catch (NumberFormatException e) {
+
+ }
+ }
for (int i = 0; i < size; i++) {
MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
multipleChannelsConsumer.setDaemon(true);
@@ -86,4 +95,31 @@ public class BulkConsumePool implements ConsumerPool {
}
isStarted = true;
}
+
+ /**
+ * The creator for {@link BulkConsumePool}.
+ */
+ public static class Creator implements Callable<ConsumerPool> {
+ private String name;
+ private int size;
+ private long consumeCycle;
+
+ public Creator(String name, int size, long consumeCycle) {
+ this.name = name;
+ this.size = size;
+ this.consumeCycle = consumeCycle;
+ }
+
+ @Override public ConsumerPool call() {
+ return new BulkConsumePool(name, size, consumeCycle);
+ }
+
+ public static int recommendMaxSize() {
+ int processorNum = Runtime.getRuntime().availableProcessors();
+ if (processorNum > 1) {
+ processorNum -= 1;
+ }
+ return processorNum;
+ }
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
index f290fd0..f5dc51b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
@@ -25,4 +25,8 @@ public class UnexpectedException extends RuntimeException {
public UnexpectedException(String message) {
super(message);
}
+
+ public UnexpectedException(String message, Exception cause) {
+ super(message, cause);
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index f11d014..148534a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -53,7 +54,14 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new AggregatorConsumer(this));
+ String name = "INDICATOR_L1_AGGREGATION";
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize(), 20);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 172874a..e90d98a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Objects;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -55,9 +56,21 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
this.mergeDataCache = new MergeDataCache<>();
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
- this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new PersistentConsumer(this));
+ String name = "INDICATOR_L2_AGGREGATION";
+ int size = BulkConsumePool.Creator.recommendMaxSize() / 4;
+ if (size == 0) {
+ size = 1;
+ }
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 20);
+ try {
+ ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+ } catch (Exception e) {
+ throw new UnexpectedException(e.getMessage(), e);
+ }
+
+ this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 2000);
+ this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
}
@Override void onWork(Indicator indicator) {