You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/19 15:19:08 UTC
[incubator-skywalking] branch thread-model updated: Make heartbeat
back to work. And default threadpool size more sense.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch thread-model
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/thread-model by this push:
new f12f07f Make heartbeat back to work. And default threadpool size more sense.
f12f07f is described below
commit f12f07f6412bf50ca953b830a7a73a036ad87ee2
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Feb 19 23:14:16 2019 +0800
Make heartbeat back to work. And default threadpool size more sense.
---
.../datacarrier/consumer/BulkConsumePool.java | 6 +---
.../register/worker/RegisterDistinctWorker.java | 6 +++-
.../register/worker/RegisterPersistentWorker.java | 6 +++-
.../server/library/server/jetty/JettyServer.java | 3 --
.../receiver/mesh/TelemetryDataDispatcher.java | 34 +++++++++-------------
5 files changed, 25 insertions(+), 30 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index 6befe2c..1b95c1b 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -115,11 +115,7 @@ public class BulkConsumePool implements ConsumerPool {
}
public static int recommendMaxSize() {
- int processorNum = Runtime.getRuntime().availableProcessors();
- if (processorNum > 1) {
- processorNum -= 1;
- }
- return processorNum;
+ return Runtime.getRuntime().availableProcessors() * 2;
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index ae4bc67..158f01f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -45,7 +45,11 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
this.sources = new HashMap<>();
this.dataCarrier = new DataCarrier<>(1, 1000);
String name = "REGISTER_L1";
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+ int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+ if (size == 0) {
+ size = 1;
+ }
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 1ccd21d..2bd11e5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -55,7 +55,11 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
String name = "REGISTER_L2";
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+ int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+ if (size == 0) {
+ size = 1;
+ }
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
index dcd66d0..af8c91e 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
@@ -20,11 +20,8 @@ package org.apache.skywalking.oap.server.library.server.jetty;
import java.net.InetSocketAddress;
import java.util.Objects;
-import org.apache.skywalking.oap.server.library.server.Server;
import org.apache.skywalking.oap.server.library.server.*;
-import org.eclipse.jetty.server.*;
import org.eclipse.jetty.servlet.*;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.*;
/**
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index a74afac..e0452f8 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -20,27 +20,15 @@ package org.apache.skywalking.aop.server.receiver.mesh;
import java.util.Objects;
import org.apache.logging.log4j.util.Strings;
-import org.apache.skywalking.apm.network.servicemesh.Protocol;
-import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.network.servicemesh.*;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.source.All;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.source.Endpoint;
-import org.apache.skywalking.oap.server.core.source.RequestType;
-import org.apache.skywalking.oap.server.core.source.Service;
-import org.apache.skywalking.oap.server.core.source.ServiceInstance;
-import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
-import org.apache.skywalking.oap.server.core.source.ServiceRelation;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source
@@ -89,7 +77,7 @@ public class TelemetryDataDispatcher {
ServiceMeshMetric metric = decorator.getMetric();
long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(metric.getStartTime());
- //heartbeat(decorator, minuteTimeBucket);
+ heartbeat(decorator, minuteTimeBucket);
if (org.apache.skywalking.apm.network.common.DetectPoint.server.equals(metric.getDetectPoint())) {
toAll(decorator, minuteTimeBucket);
toService(decorator, minuteTimeBucket);
@@ -108,7 +96,10 @@ public class TelemetryDataDispatcher {
int instanceId = metric.getSourceServiceInstanceId();
ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
- SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > 10 * 1000L) {
+ // trigger heartbeat every 10s.
+ SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ }
} else {
logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
}
@@ -118,7 +109,10 @@ public class TelemetryDataDispatcher {
instanceId = metric.getDestServiceInstanceId();
serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
- SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > 10 * 1000L) {
+ // trigger heartbeat every 10s.
+ SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ }
} else {
logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
}