You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/14 16:10:43 UTC

[incubator-skywalking] branch new-consumer-pool updated: Make L1 and L2 aggregation thread sharing works.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch new-consumer-pool
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/new-consumer-pool by this push:
     new a4169c2  Make L1 and L2 aggregation thread sharing works.
a4169c2 is described below

commit a4169c26a3c3ca1e4d8932ca843ad1a637cb15cc
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Feb 15 00:10:33 2019 +0800

    Make L1 and L2 aggregation thread sharing works.
---
 .../datacarrier/consumer/BulkConsumePool.java      | 36 ++++++++++++++++++++++
 .../oap/server/core/UnexpectedException.java       |  4 +++
 .../analysis/worker/IndicatorAggregateWorker.java  | 10 +++++-
 .../analysis/worker/IndicatorPersistentWorker.java | 17 ++++++++--
 4 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index 5b06ca6..ba5ace8 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.commons.datacarrier.consumer;
 
 import java.util.*;
+import java.util.concurrent.Callable;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
 
 /**
@@ -35,6 +36,14 @@ public class BulkConsumePool implements ConsumerPool {
 
     public BulkConsumePool(String name, int size, long consumeCycle) {
         allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
+        String threadNum = System.getenv(name + "_THREAD");
+        if (threadNum != null) {
+            try {
+                size = Integer.parseInt(threadNum);
+            } catch (NumberFormatException e) {
+
+            }
+        }
         for (int i = 0; i < size; i++) {
             MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
             multipleChannelsConsumer.setDaemon(true);
@@ -86,4 +95,31 @@ public class BulkConsumePool implements ConsumerPool {
         }
         isStarted = true;
     }
+
+    /**
+     * The creator for {@link BulkConsumePool}.
+     */
+    public static class Creator implements Callable<ConsumerPool> {
+        private String name;
+        private int size;
+        private long consumeCycle;
+
+        public Creator(String name, int size, long consumeCycle) {
+            this.name = name;
+            this.size = size;
+            this.consumeCycle = consumeCycle;
+        }
+
+        @Override public ConsumerPool call() {
+            return new BulkConsumePool(name, size, consumeCycle);
+        }
+
+        public static int recommendMaxSize() {
+            int processorNum = Runtime.getRuntime().availableProcessors();
+            if (processorNum > 1) {
+                processorNum -= 1;
+            }
+            return processorNum;
+        }
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
index f290fd0..f5dc51b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
@@ -25,4 +25,8 @@ public class UnexpectedException extends RuntimeException {
     public UnexpectedException(String message) {
         super(message);
     }
+
+    public UnexpectedException(String message, Exception cause) {
+        super(message, cause);
+    }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index f11d014..148534a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -53,7 +54,14 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
         this.mergeDataCache = new MergeDataCache<>();
         this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
 
-        this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new AggregatorConsumer(this));
+        String name = "INDICATOR_L1_AGGREGATION";
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize(), 20);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer(this));
 
         MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
         aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index 172874a..e90d98a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Objects;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
 import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
 import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
@@ -55,9 +56,21 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
         this.mergeDataCache = new MergeDataCache<>();
         this.indicatorDAO = indicatorDAO;
         this.nextWorker = nextWorker;
-        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
 
-        this.dataCarrier.consume(ConsumerPoolFactory.DEFAULT_POOL, new PersistentConsumer(this));
+        String name = "INDICATOR_L2_AGGREGATION";
+        int size = BulkConsumePool.Creator.recommendMaxSize() / 4;
+        if (size == 0) {
+            size = 1;
+        }
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 20);
+        try {
+            ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
+        } catch (Exception e) {
+            throw new UnexpectedException(e.getMessage(), e);
+        }
+
+        this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 2000);
+        this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
     }
 
     @Override void onWork(Indicator indicator) {