You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/09/20 02:14:52 UTC
[skywalking] 01/01: Support timeout configuration in agent and
backend.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch timeout-config
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 85075a62b343475050242b5382b0e200cd2665ae
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Sep 20 10:14:37 2019 +0800
Support timeout configuration in agent and backend.
---
.../org/apache/skywalking/apm/agent/core/conf/Config.java | 4 ++++
.../org/apache/skywalking/apm/agent/core/jvm/JVMService.java | 4 +++-
.../agent/core/remote/ServiceAndEndpointRegisterClient.java | 12 +++++++-----
.../apm/agent/core/remote/TraceSegmentServiceClient.java | 3 ++-
docs/en/setup/service-agent/java-agent/README.md | 1 +
.../apache/skywalking/oap/server/core/CoreModuleConfig.java | 4 ++++
.../skywalking/oap/server/core/CoreModuleProvider.java | 2 +-
.../oap/server/core/remote/client/GRPCRemoteClient.java | 6 ++++--
.../oap/server/core/remote/client/RemoteClientManager.java | 11 +++++++++--
.../core/remote/client/RemoteClientManagerTestCase.java | 2 +-
10 files changed, 36 insertions(+), 13 deletions(-)
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 5a8eca4..def4cd5 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -132,6 +132,10 @@ public class Config {
* Collector skywalking trace receiver service addresses.
*/
public static String BACKEND_SERVICE = "";
+ /**
+ * How long grpc client will timeout in sending data to upstream.
+ */
+ public static int GRPC_UPSTREAM_TIMEOUT = 30;
}
public static class Jvm {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
index 040d733..598cd36 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
@@ -47,6 +47,8 @@ import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricCollection;
import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricReportServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
/**
* The <code>JVMService</code> represents a timer, which collectors JVM cpu, memory, memorypool and gc info, and send
* the collected info to Collector through the channel provided by {@link GRPCChannelManager}
@@ -140,7 +142,7 @@ public class JVMService implements BootService, Runnable {
if (buffer.size() > 0) {
builder.addAllMetrics(buffer);
builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
- Commands commands = stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build());
+ Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(builder.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
} catch (Throwable t) {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
index 92c22a5..9c68ac3 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
@@ -53,6 +53,8 @@ import org.apache.skywalking.apm.network.register.v2.Services;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;
+import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
/**
* @author wusheng
*/
@@ -138,7 +140,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
try {
if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
if (registerBlockingStub != null) {
- ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister(
+ ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).doServiceRegister(
Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());
if (serviceRegisterMapping != null) {
for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
@@ -153,7 +155,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
if (registerBlockingStub != null) {
if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
- ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+ ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.doServiceInstanceRegister(ServiceInstances.newBuilder()
.addInstances(
ServiceInstance.newBuilder()
@@ -173,15 +175,15 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
}
}
} else {
- final Commands commands = serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+ final Commands commands = serviceInstancePingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.doPing(ServiceInstancePingPkg.newBuilder()
.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
.setTime(System.currentTimeMillis())
.setServiceInstanceUUID(INSTANCE_UUID)
.build());
- NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
- EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
+ NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
+ EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 3ffacff..edea03b 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.commands.CommandService;
+import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.logging.api.*;
@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
- StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
+ StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
diff --git a/docs/en/setup/service-agent/java-agent/README.md b/docs/en/setup/service-agent/java-agent/README.md
index e3fa37d..bfa95e4 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -82,6 +82,7 @@ property key | Description | Default |
`collector.grpc_channel_check_interval`|grpc channel status check interval.|`30`|
`collector.app_and_service_register_check_interval`|application and service registry check interval.|`3`|
`collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`|
+`collector.grpc_upstream_timeout`|How long grpc client will timeout in sending data to upstream. Unit is second.|`30` seconds|
`logging.level`|The log level. Default is debug.|`DEBUG`|
`logging.file_name`|Log file name.|`skywalking-api.log`|
`logging.output`| Log output. Default is FILE. Use CONSOLE means output to stdout. |`FILE`|
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index c7c26e1..1858116 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -53,6 +53,10 @@ public class CoreModuleConfig extends ModuleConfig {
@Setter private int monthMetricsDataTTL;
@Setter private int gRPCThreadPoolSize;
@Setter private int gRPCThreadPoolQueueSize;
+ /**
+ * Timeout for cluster internal communication, in seconds.
+ */
+ @Setter private int remoteTimeout = 20;
CoreModuleConfig() {
this.downsampling = new ArrayList<>();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 86c611f..3bc4e9a 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -166,7 +166,7 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.registerListener(streamAnnotationListener);
- this.remoteClientManager = new RemoteClientManager(getManager());
+ this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index fd80c9c..e671ef4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -59,12 +59,14 @@ public class GRPCRemoteClient implements RemoteClient {
private boolean isConnect;
private CounterMetrics remoteOutCounter;
private CounterMetrics remoteOutErrorCounter;
+ private int remoteTimeout;
public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address, int channelSize,
- int bufferSize) {
+ int bufferSize, int remoteTimeout) {
this.address = address;
this.channelSize = channelSize;
this.bufferSize = bufferSize;
+ this.remoteTimeout = remoteTimeout;
remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
.createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
@@ -183,7 +185,7 @@ public class GRPCRemoteClient implements RemoteClient {
}
}
- return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
+ return getStub().withDeadlineAfter(remoteTimeout, TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 06a963f..e749c66 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -57,12 +57,19 @@ public class RemoteClientManager implements Service {
private final List<RemoteClient> clientsB;
private volatile List<RemoteClient> usingClients;
private GaugeMetrics gauge;
+ private int remoteTimeout;
- public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) {
+ /**
+ * Initial the manager for all remote communication clients.
+ * @param moduleDefineHolder for looking up other modules
+ * @param remoteTimeout for cluster internal communication, in seconds.
+ */
+ public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) {
this.moduleDefineHolder = moduleDefineHolder;
this.clientsA = new LinkedList<>();
this.clientsB = new LinkedList<>();
this.usingClients = clientsA;
+ this.remoteTimeout = remoteTimeout;
}
public void start() {
@@ -203,7 +210,7 @@ public class RemoteClientManager implements Service {
RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
getFreeClients().add(client);
} else {
- RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000);
+ RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
client.connect();
getFreeClients().add(client);
}
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
index e83aa63..c561273 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -80,7 +80,7 @@ public class RemoteClientManagerTestCase {
moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
- RemoteClientManager clientManager = new RemoteClientManager(moduleManager);
+ RemoteClientManager clientManager = new RemoteClientManager(moduleManager, 10);
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
clientManager.refresh();