You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/19 15:19:08 UTC

[incubator-skywalking] branch thread-model updated: Make heartbeat work again, and make the default thread pool size more sensible.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch thread-model
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/thread-model by this push:
     new f12f07f  Make heartbeat work again, and make the default thread pool size more sensible.
f12f07f is described below

commit f12f07f6412bf50ca953b830a7a73a036ad87ee2
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Feb 19 23:14:16 2019 +0800

    Make heartbeat work again, and make the default thread pool size more sensible.
---
 .../datacarrier/consumer/BulkConsumePool.java      |  6 +---
 .../register/worker/RegisterDistinctWorker.java    |  6 +++-
 .../register/worker/RegisterPersistentWorker.java  |  6 +++-
 .../server/library/server/jetty/JettyServer.java   |  3 --
 .../receiver/mesh/TelemetryDataDispatcher.java     | 34 +++++++++-------------
 5 files changed, 25 insertions(+), 30 deletions(-)

diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index 6befe2c..1b95c1b 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -115,11 +115,7 @@ public class BulkConsumePool implements ConsumerPool {
         }
 
         public static int recommendMaxSize() {
-            int processorNum = Runtime.getRuntime().availableProcessors();
-            if (processorNum > 1) {
-                processorNum -= 1;
-            }
-            return processorNum;
+            return Runtime.getRuntime().availableProcessors() * 2;
         }
     }
 }
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index ae4bc67..158f01f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -45,7 +45,11 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
         this.sources = new HashMap<>();
         this.dataCarrier = new DataCarrier<>(1, 1000);
         String name = "REGISTER_L1";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+        if (size == 0) {
+            size = 1;
+        }
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
         try {
             ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
         } catch (Exception e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 1ccd21d..2bd11e5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -55,7 +55,11 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
         this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
 
         String name = "REGISTER_L2";
-        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+        int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+        if (size == 0) {
+            size = 1;
+        }
+        BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
         try {
             ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
         } catch (Exception e) {
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
index dcd66d0..af8c91e 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
@@ -20,11 +20,8 @@ package org.apache.skywalking.oap.server.library.server.jetty;
 
 import java.net.InetSocketAddress;
 import java.util.Objects;
-import org.apache.skywalking.oap.server.library.server.Server;
 import org.apache.skywalking.oap.server.library.server.*;
-import org.eclipse.jetty.server.*;
 import org.eclipse.jetty.servlet.*;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.*;
 
 /**
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index a74afac..e0452f8 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -20,27 +20,15 @@ package org.apache.skywalking.aop.server.receiver.mesh;
 
 import java.util.Objects;
 import org.apache.logging.log4j.util.Strings;
-import org.apache.skywalking.apm.network.servicemesh.Protocol;
-import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.network.servicemesh.*;
 import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.*;
 import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.source.All;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.source.Endpoint;
-import org.apache.skywalking.oap.server.core.source.RequestType;
-import org.apache.skywalking.oap.server.core.source.Service;
-import org.apache.skywalking.oap.server.core.source.ServiceInstance;
-import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
-import org.apache.skywalking.oap.server.core.source.ServiceRelation;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.source.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source
@@ -89,7 +77,7 @@ public class TelemetryDataDispatcher {
         ServiceMeshMetric metric = decorator.getMetric();
         long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(metric.getStartTime());
 
-        //heartbeat(decorator, minuteTimeBucket);
+        heartbeat(decorator, minuteTimeBucket);
         if (org.apache.skywalking.apm.network.common.DetectPoint.server.equals(metric.getDetectPoint())) {
             toAll(decorator, minuteTimeBucket);
             toService(decorator, minuteTimeBucket);
@@ -108,7 +96,10 @@ public class TelemetryDataDispatcher {
         int instanceId = metric.getSourceServiceInstanceId();
         ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
         if (Objects.nonNull(serviceInstanceInventory)) {
-            SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > 10 * 1000L) {
+                // trigger heartbeat every 10s.
+                SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            }
         } else {
             logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
         }
@@ -118,7 +109,10 @@ public class TelemetryDataDispatcher {
         instanceId = metric.getDestServiceInstanceId();
         serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
         if (Objects.nonNull(serviceInstanceInventory)) {
-            SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > 10 * 1000L) {
+                // trigger heartbeat every 10s.
+                SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+            }
         } else {
             logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
         }