You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/21 03:48:43 UTC
[incubator-skywalking] branch cluster-performance updated: Make
sure bulk consume consume cycle consist with the config.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch cluster-performance
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/cluster-performance by this push:
new eb31d26 Make sure bulk consume consume cycle consist with the config.
new 99bc0ec Merge branch 'cluster-performance' of https://github.com/apache/incubator-skywalking into cluster-performance
eb31d26 is described below
commit eb31d26472a99a49a0f5063b0210188129716496
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Feb 21 11:47:50 2019 +0800
Make sure bulk consume consume cycle consist with the config.
---
.../apm/commons/datacarrier/EnvUtil.java | 13 ++++++++++
.../datacarrier/consumer/BulkConsumePool.java | 1 +
.../consumer/MultipleChannelsConsumer.java | 29 ++++++++++++++--------
.../analysis/worker/IndicatorAggregateWorker.java | 4 +--
.../analysis/worker/IndicatorPersistentWorker.java | 2 +-
5 files changed, 35 insertions(+), 14 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/EnvUtil.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/EnvUtil.java
index b748ce6..15e8743 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/EnvUtil.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/EnvUtil.java
@@ -36,4 +36,17 @@ public class EnvUtil {
}
return value;
}
+
+ public static long getLong(String envName, long defaultValue) {
+ long value = defaultValue;
+ String envValue = System.getenv(envName);
+ if (envValue != null) {
+ try {
+ value = Integer.parseInt(envValue);
+ } catch (NumberFormatException e) {
+
+ }
+ }
+ return value;
+ }
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index 0043f09..87e7074 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -37,6 +37,7 @@ public class BulkConsumePool implements ConsumerPool {
public BulkConsumePool(String name, int size, long consumeCycle) {
size = EnvUtil.getInt(name + "_THREAD", size);
+ consumeCycle = EnvUtil.getLong(name + "_CONSUME_CYCLE", consumeCycle);
allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
for (int i = 0; i < size; i++) {
MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
index 1877446..b52dc9f 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -28,10 +28,11 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
* @author wusheng
*/
public class MultipleChannelsConsumer extends Thread {
+ private final long consumeCycle;
private volatile boolean running;
private volatile ArrayList<Group> consumeTargets;
private volatile long size;
- private final long consumeCycle;
+ private volatile long lastConsumeTimestamp = 0;
public MultipleChannelsConsumer(String threadName, long consumeCycle) {
super(threadName);
@@ -44,18 +45,11 @@ public class MultipleChannelsConsumer extends Thread {
running = true;
while (running) {
- boolean hasData = false;
- for (Group target : consumeTargets) {
- hasData = hasData || consume(target);
- }
+ waitUntil();
- if (!hasData) {
- try {
- Thread.sleep(consumeCycle);
- } catch (InterruptedException e) {
- }
+ for (Group target : consumeTargets) {
+ consume(target);
}
-
}
// consumer thread is going to stop
@@ -67,6 +61,19 @@ public class MultipleChannelsConsumer extends Thread {
}
}
+ private void waitUntil() {
+ long now = System.currentTimeMillis();
+
+ long waitTime = consumeCycle - (now - lastConsumeTimestamp);
+ if (waitTime > 0) {
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ }
+ }
+ lastConsumeTimestamp = System.currentTimeMillis();
+ }
+
private boolean consume(Group target) {
boolean hasData;
LinkedList consumeList = new LinkedList();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index ee90a0f..8b17e5c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -53,9 +53,9 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
String name = "INDICATOR_L1_AGGREGATION";
- this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, name, 2, 10000);
+ this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, name, 4, 10000);
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 1000);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 53134cf..a3f676e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -62,7 +62,7 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
if (size == 0) {
size = 1;
}
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 20);
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 100);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {