You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/21 03:48:43 UTC

[incubator-skywalking] branch cluster-performance updated: Make sure bulk consume consume cycle consist with the config.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch cluster-performance
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/cluster-performance by this push:
     new eb31d26  Make sure bulk consume consume cycle consist with the config.
     new 99bc0ec  Merge branch 'cluster-performance' of https://github.com/apache/incubator-skywalking into cluster-performance
eb31d26 is described below

commit eb31d26472a99a49a0f5063b0210188129716496
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Thu Feb 21 11:47:50 2019 +0800

    Make sure bulk consume consume cycle consist with the config.
---
 .../apm/commons/datacarrier/EnvUtil.java           | 13 ++++++++++
 .../datacarrier/consumer/BulkConsumePool.java      |  1 +
 .../consumer/MultipleChannelsConsumer.java         | 29 ++++++++++++++--------
 .../analysis/worker/IndicatorAggregateWorker.java  |  4 +--
 .../analysis/worker/IndicatorPersistentWorker.java |  2 +-
 5 files changed, 35 insertions(+), 14 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/EnvUtil.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/EnvUtil.java
index b748ce6..15e8743 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/EnvUtil.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/EnvUtil.java
@@ -36,4 +36,17 @@ public class EnvUtil {
         }
         return value;
     }
+
+    /** Reads env var {@code envName} as a long; returns {@code defaultValue} if it is unset or not a valid long. */
+    public static long getLong(String envName, long defaultValue) {
+        String envValue = System.getenv(envName);
+        if (envValue != null) {
+            try {
+                return Long.parseLong(envValue); // parse the full long range, not just int
+            } catch (NumberFormatException ignored) {
+                // Malformed value: deliberately fall through to the default.
+            }
+        }
+        return defaultValue;
+    }
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index 0043f09..87e7074 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -37,6 +37,7 @@ public class BulkConsumePool implements ConsumerPool {
 
     public BulkConsumePool(String name, int size, long consumeCycle) {
         size = EnvUtil.getInt(name + "_THREAD", size);
+        consumeCycle = EnvUtil.getLong(name + "_CONSUME_CYCLE", consumeCycle);
         allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
         for (int i = 0; i < size; i++) {
             MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
index 1877446..b52dc9f 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -28,10 +28,11 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
  * @author wusheng
  */
 public class MultipleChannelsConsumer extends Thread {
+    private final long consumeCycle;
     private volatile boolean running;
     private volatile ArrayList<Group> consumeTargets;
     private volatile long size;
-    private final long consumeCycle;
+    private volatile long lastConsumeTimestamp = 0;
 
     public MultipleChannelsConsumer(String threadName, long consumeCycle) {
         super(threadName);
@@ -44,18 +45,11 @@ public class MultipleChannelsConsumer extends Thread {
         running = true;
 
         while (running) {
-            boolean hasData = false;
-            for (Group target : consumeTargets) {
-                hasData = hasData || consume(target);
-            }
+            waitUntil();
 
-            if (!hasData) {
-                try {
-                    Thread.sleep(consumeCycle);
-                } catch (InterruptedException e) {
-                }
+            for (Group target : consumeTargets) {
+                consume(target);
             }
-
         }
 
         // consumer thread is going to stop
@@ -67,6 +61,19 @@ public class MultipleChannelsConsumer extends Thread {
         }
     }
 
+    private void waitUntil() {
+        long now = System.currentTimeMillis();
+        // Sleep only for the part of consumeCycle not yet elapsed since lastConsumeTimestamp.
+        long waitTime = consumeCycle - (now - lastConsumeTimestamp);
+        if (waitTime > 0) {
+            try {
+                Thread.sleep(waitTime);
+            } catch (InterruptedException e) { // interrupt is ignored; the wait simply ends early
+            }
+        }
+        lastConsumeTimestamp = System.currentTimeMillis(); // baseline for the next call's wait
+    }
+
     private boolean consume(Group target) {
         boolean hasData;
         LinkedList consumeList = new LinkedList();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index ee90a0f..8b17e5c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -53,9 +53,9 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
         this.nextWorker = nextWorker;
         this.mergeDataCache = new MergeDataCache<>();
         String name = "INDICATOR_L1_AGGREGATION";
-        this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, name, 2, 10000);
+        this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, name, 4, 10000);
 
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 1000);
         try {
             ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
         } catch (Exception e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 53134cf..a3f676e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -62,7 +62,7 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
         if (size == 0) {
             size = 1;
         }
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 20);
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 100);
         try {
             ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
         } catch (Exception e) {