You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/20 00:29:25 UTC

[incubator-skywalking] branch master updated: Fix mesh telemetry performance issue and adjust default thread number (#2261)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new c019535  Fix mesh telemetry performance issue and adjust default thread number (#2261)
c019535 is described below

commit c019535ee92affa79f713008fe046f2353b31033
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Feb 20 08:29:17 2019 +0800

    Fix mesh telemetry performance issue and adjust default thread number (#2261)
    
    * Try adjustment.
    
    * Remove heartbeat.
    
    * Make heartbeat work again, and make the default thread pool size more sensible.
    
    * Make L2 less than before.
    
    * Make instance heartbeat work.
    
    * Try L1 aggregation thread = core * 2 * 2.
---
 .../apm/commons/datacarrier/buffer/Channels.java   |  8 ++---
 .../datacarrier/consumer/BulkConsumePool.java      |  8 ++---
 .../consumer/MultipleChannelsConsumer.java         |  3 +-
 .../analysis/worker/IndicatorAggregateWorker.java  |  5 +--
 .../analysis/worker/IndicatorPersistentWorker.java |  2 +-
 .../register/worker/RegisterDistinctWorker.java    |  6 +++-
 .../register/worker/RegisterPersistentWorker.java  |  6 +++-
 .../server/library/server/jetty/JettyServer.java   | 19 ++---------
 .../receiver/mesh/TelemetryDataDispatcher.java     | 37 ++++++++++------------
 9 files changed, 38 insertions(+), 56 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
index 2a83ba0..e0f3026 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
@@ -29,6 +29,7 @@ public class Channels<T> {
     private final Buffer<T>[] bufferChannels;
     private IDataPartitioner<T> dataPartitioner;
     private BufferStrategy strategy;
+    private final long size;
 
     public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
         this.dataPartitioner = partitioner;
@@ -37,6 +38,7 @@ public class Channels<T> {
         for (int i = 0; i < channelSize; i++) {
             bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
         }
+        size = channelSize * bufferSize;
     }
 
     public boolean save(T data) {
@@ -81,12 +83,8 @@ public class Channels<T> {
         return this.bufferChannels.length;
     }
 
-    public int getBufferSize() {
-        return bufferChannels[0].getBufferSize();
-    }
-
     public long size() {
-        return (long)getChannelSize() * getBufferSize();
+        return size;
     }
 
     public Buffer<T> getBuffer(int index) {
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index 798a601..1b95c1b 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -66,7 +66,7 @@ public class BulkConsumePool implements ConsumerPool {
         for (int i = 1; i < allConsumers.size(); i++) {
             MultipleChannelsConsumer option = allConsumers.get(i);
             if (option.size() < winner.size()) {
-                return option;
+                winner = option;
             }
         }
         return winner;
@@ -115,11 +115,7 @@ public class BulkConsumePool implements ConsumerPool {
         }
 
         public static int recommendMaxSize() {
-            int processorNum = Runtime.getRuntime().availableProcessors();
-            if (processorNum > 1) {
-                processorNum -= 1;
-            }
-            return processorNum;
+            return Runtime.getRuntime().availableProcessors() * 2;
         }
     }
 }
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
index 1679302..1877446 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -74,9 +74,8 @@ public class MultipleChannelsConsumer extends Thread {
             Buffer buffer = target.channels.getBuffer(i);
             consumeList.addAll(buffer.obtain());
         }
-        hasData = consumeList.size() > 0;
 
-        if (consumeList.size() > 0) {
+        if (hasData = consumeList.size() > 0) {
             try {
                 target.consumer.consume(consumeList);
             } catch (Throwable t) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 148534a..f0c3886 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -47,7 +47,8 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
     private final String modelName;
     private CounterMetric aggregationCounter;
 
-    IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker, String modelName) {
+    IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker,
+        String modelName) {
         super(workerId);
         this.modelName = modelName;
         this.nextWorker = nextWorker;
@@ -55,7 +56,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
         this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
 
         String name = "INDICATOR_L1_AGGREGATION";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize(), 20);
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
         try {
             ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
         } catch (Exception e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index e90d98a..0e49e16 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -58,7 +58,7 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
         this.nextWorker = nextWorker;
 
         String name = "INDICATOR_L2_AGGREGATION";
-        int size = BulkConsumePool.Creator.recommendMaxSize() / 4;
+        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
         if (size == 0) {
             size = 1;
         }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index ae4bc67..158f01f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -45,7 +45,11 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
         this.sources = new HashMap<>();
         this.dataCarrier = new DataCarrier<>(1, 1000);
         String name = "REGISTER_L1";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+        if (size == 0) {
+            size = 1;
+        }
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
         try {
             ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
         } catch (Exception e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 1ccd21d..2bd11e5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -55,7 +55,11 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
         this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
 
         String name = "REGISTER_L2";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+        if (size == 0) {
+            size = 1;
+        }
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
         try {
             ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
         } catch (Exception e) {
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
index 97f4c85..af8c91e 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
@@ -18,12 +18,10 @@
 
 package org.apache.skywalking.oap.server.library.server.jetty;
 
+import java.net.InetSocketAddress;
 import java.util.Objects;
-import org.apache.skywalking.oap.server.library.server.Server;
 import org.apache.skywalking.oap.server.library.server.*;
-import org.eclipse.jetty.server.*;
 import org.eclipse.jetty.servlet.*;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.*;
 
 /**
@@ -63,20 +61,7 @@ public class JettyServer implements Server {
 
     @Override
     public void initialize() {
-        QueuedThreadPool threadPool = new QueuedThreadPool();
-        if (selectorNum > 0) {
-            threadPool.setMaxThreads(selectorNum * 2 + 2);
-        }
-
-        server = new org.eclipse.jetty.server.Server(threadPool);
-
-        HttpConfiguration httpConfig = new HttpConfiguration();
-        ServerConnector http = new ServerConnector(server, null, null, null,
-            1, selectorNum, new HttpConnectionFactory(httpConfig));
-        http.setPort(port);
-        http.setHost(host);
-
-        server.addConnector(http);
+        server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port));
 
         servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
         servletContextHandler.setContextPath(contextPath);
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index 4f3e54b..b0c1326 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -20,27 +20,15 @@ package org.apache.skywalking.aop.server.receiver.mesh;
 
 import java.util.Objects;
 import org.apache.logging.log4j.util.Strings;
-import org.apache.skywalking.apm.network.servicemesh.Protocol;
-import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.network.servicemesh.*;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.source.All;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.source.Endpoint;
-import org.apache.skywalking.oap.server.core.source.RequestType;
-import org.apache.skywalking.oap.server.core.source.Service;
-import org.apache.skywalking.oap.server.core.source.ServiceInstance;
-import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
-import org.apache.skywalking.oap.server.core.source.ServiceRelation;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source
@@ -103,22 +91,29 @@ public class TelemetryDataDispatcher {
     private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {
         ServiceMeshMetric metric = decorator.getMetric();
 
+        int heartbeatCycle = 10000;
         // source
-        SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
         int instanceId = metric.getSourceServiceInstanceId();
         ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
         if (Objects.nonNull(serviceInstanceInventory)) {
-            SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
+                // trigger heartbeat every 10s.
+                SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
+                SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            }
         } else {
             logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
         }
 
         // dest
-        SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(), metric.getEndTime());
         instanceId = metric.getDestServiceInstanceId();
         serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
         if (Objects.nonNull(serviceInstanceInventory)) {
-            SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
+                // trigger heartbeat every 10s.
+                SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(), metric.getEndTime());
+                SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            }
         } else {
             logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
         }