You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/02/20 00:29:25 UTC
[incubator-skywalking] branch master updated: Fix mesh telemetry
performance issue and adjust default thread number (#2261)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new c019535 Fix mesh telemetry performance issue and adjust default thread number (#2261)
c019535 is described below
commit c019535ee92affa79f713008fe046f2353b31033
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Feb 20 08:29:17 2019 +0800
Fix mesh telemetry performance issue and adjust default thread number (#2261)
* Try adjustment.
* Remove heartbeat.
* Make heartbeat work again, and make the default thread pool size more sensible.
* Make L2 less than before.
* Make instance heartbeat work.
* Try L1 aggregation thread = core * 2 * 2.
---
.../apm/commons/datacarrier/buffer/Channels.java | 8 ++---
.../datacarrier/consumer/BulkConsumePool.java | 8 ++---
.../consumer/MultipleChannelsConsumer.java | 3 +-
.../analysis/worker/IndicatorAggregateWorker.java | 5 +--
.../analysis/worker/IndicatorPersistentWorker.java | 2 +-
.../register/worker/RegisterDistinctWorker.java | 6 +++-
.../register/worker/RegisterPersistentWorker.java | 6 +++-
.../server/library/server/jetty/JettyServer.java | 19 ++---------
.../receiver/mesh/TelemetryDataDispatcher.java | 37 ++++++++++------------
9 files changed, 38 insertions(+), 56 deletions(-)
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
index 2a83ba0..e0f3026 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/buffer/Channels.java
@@ -29,6 +29,7 @@ public class Channels<T> {
private final Buffer<T>[] bufferChannels;
private IDataPartitioner<T> dataPartitioner;
private BufferStrategy strategy;
+ private final long size;
public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
this.dataPartitioner = partitioner;
@@ -37,6 +38,7 @@ public class Channels<T> {
for (int i = 0; i < channelSize; i++) {
bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
}
+ size = channelSize * bufferSize;
}
public boolean save(T data) {
@@ -81,12 +83,8 @@ public class Channels<T> {
return this.bufferChannels.length;
}
- public int getBufferSize() {
- return bufferChannels[0].getBufferSize();
- }
-
public long size() {
- return (long)getChannelSize() * getBufferSize();
+ return size;
}
public Buffer<T> getBuffer(int index) {
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
index 798a601..1b95c1b 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/BulkConsumePool.java
@@ -66,7 +66,7 @@ public class BulkConsumePool implements ConsumerPool {
for (int i = 1; i < allConsumers.size(); i++) {
MultipleChannelsConsumer option = allConsumers.get(i);
if (option.size() < winner.size()) {
- return option;
+ winner = option;
}
}
return winner;
@@ -115,11 +115,7 @@ public class BulkConsumePool implements ConsumerPool {
}
public static int recommendMaxSize() {
- int processorNum = Runtime.getRuntime().availableProcessors();
- if (processorNum > 1) {
- processorNum -= 1;
- }
- return processorNum;
+ return Runtime.getRuntime().availableProcessors() * 2;
}
}
}
diff --git a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
index 1679302..1877446 100644
--- a/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
+++ b/apm-commons/apm-datacarrier/src/main/java/org/apache/skywalking/apm/commons/datacarrier/consumer/MultipleChannelsConsumer.java
@@ -74,9 +74,8 @@ public class MultipleChannelsConsumer extends Thread {
Buffer buffer = target.channels.getBuffer(i);
consumeList.addAll(buffer.obtain());
}
- hasData = consumeList.size() > 0;
- if (consumeList.size() > 0) {
+ if (hasData = consumeList.size() > 0) {
try {
target.consumer.consume(consumeList);
} catch (Throwable t) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 148534a..f0c3886 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -47,7 +47,8 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
private final String modelName;
private CounterMetric aggregationCounter;
- IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker, String modelName) {
+ IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker,
+ String modelName) {
super(workerId);
this.modelName = modelName;
this.nextWorker = nextWorker;
@@ -55,7 +56,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
String name = "INDICATOR_L1_AGGREGATION";
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize(), 20);
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index e90d98a..0e49e16 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -58,7 +58,7 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
this.nextWorker = nextWorker;
String name = "INDICATOR_L2_AGGREGATION";
- int size = BulkConsumePool.Creator.recommendMaxSize() / 4;
+ int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
if (size == 0) {
size = 1;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index ae4bc67..158f01f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -45,7 +45,11 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
this.sources = new HashMap<>();
this.dataCarrier = new DataCarrier<>(1, 1000);
String name = "REGISTER_L1";
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+ int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+ if (size == 0) {
+ size = 1;
+ }
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 1ccd21d..2bd11e5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -55,7 +55,11 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 1000);
String name = "REGISTER_L2";
- BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 200);
+ int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
+ if (size == 0) {
+ size = 1;
+ }
+ BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, size, 200);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
index 97f4c85..af8c91e 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/jetty/JettyServer.java
@@ -18,12 +18,10 @@
package org.apache.skywalking.oap.server.library.server.jetty;
+import java.net.InetSocketAddress;
import java.util.Objects;
-import org.apache.skywalking.oap.server.library.server.Server;
import org.apache.skywalking.oap.server.library.server.*;
-import org.eclipse.jetty.server.*;
import org.eclipse.jetty.servlet.*;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.*;
/**
@@ -63,20 +61,7 @@ public class JettyServer implements Server {
@Override
public void initialize() {
- QueuedThreadPool threadPool = new QueuedThreadPool();
- if (selectorNum > 0) {
- threadPool.setMaxThreads(selectorNum * 2 + 2);
- }
-
- server = new org.eclipse.jetty.server.Server(threadPool);
-
- HttpConfiguration httpConfig = new HttpConfiguration();
- ServerConnector http = new ServerConnector(server, null, null, null,
- 1, selectorNum, new HttpConnectionFactory(httpConfig));
- http.setPort(port);
- http.setHost(host);
-
- server.addConnector(http);
+ server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port));
servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath(contextPath);
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index 4f3e54b..b0c1326 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -20,27 +20,15 @@ package org.apache.skywalking.aop.server.receiver.mesh;
import java.util.Objects;
import org.apache.logging.log4j.util.Strings;
-import org.apache.skywalking.apm.network.servicemesh.Protocol;
-import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.network.servicemesh.*;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
-import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
-import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
-import org.apache.skywalking.oap.server.core.source.All;
-import org.apache.skywalking.oap.server.core.source.DetectPoint;
-import org.apache.skywalking.oap.server.core.source.Endpoint;
-import org.apache.skywalking.oap.server.core.source.RequestType;
-import org.apache.skywalking.oap.server.core.source.Service;
-import org.apache.skywalking.oap.server.core.source.ServiceInstance;
-import org.apache.skywalking.oap.server.core.source.ServiceInstanceRelation;
-import org.apache.skywalking.oap.server.core.source.ServiceRelation;
-import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.register.service.*;
+import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source
@@ -103,22 +91,29 @@ public class TelemetryDataDispatcher {
private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) {
ServiceMeshMetric metric = decorator.getMetric();
+ int heartbeatCycle = 10000;
// source
- SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
int instanceId = metric.getSourceServiceInstanceId();
ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
- SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
+ // trigger heartbeat every 10s.
+ SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime());
+ SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ }
} else {
logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
}
// dest
- SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(), metric.getEndTime());
instanceId = metric.getDestServiceInstanceId();
serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
if (Objects.nonNull(serviceInstanceInventory)) {
- SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ if (metric.getEndTime() - serviceInstanceInventory.getHeartbeatTime() > heartbeatCycle) {
+ // trigger heartbeat every 10s.
+ SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(), metric.getEndTime());
+ SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime());
+ }
} else {
logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
}