You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/09/20 02:14:52 UTC

[skywalking] 01/01: Support timeout configuration in agent and backend.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch timeout-config
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 85075a62b343475050242b5382b0e200cd2665ae
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Fri Sep 20 10:14:37 2019 +0800

    Support timeout configuration in agent and backend.
---
 .../org/apache/skywalking/apm/agent/core/conf/Config.java    |  4 ++++
 .../org/apache/skywalking/apm/agent/core/jvm/JVMService.java |  4 +++-
 .../agent/core/remote/ServiceAndEndpointRegisterClient.java  | 12 +++++++-----
 .../apm/agent/core/remote/TraceSegmentServiceClient.java     |  3 ++-
 docs/en/setup/service-agent/java-agent/README.md             |  1 +
 .../apache/skywalking/oap/server/core/CoreModuleConfig.java  |  4 ++++
 .../skywalking/oap/server/core/CoreModuleProvider.java       |  2 +-
 .../oap/server/core/remote/client/GRPCRemoteClient.java      |  6 ++++--
 .../oap/server/core/remote/client/RemoteClientManager.java   | 11 +++++++++--
 .../core/remote/client/RemoteClientManagerTestCase.java      |  2 +-
 10 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 5a8eca4..def4cd5 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -132,6 +132,10 @@ public class Config {
          * Collector skywalking trace receiver service addresses.
          */
         public static String BACKEND_SERVICE = "";
+        /**
+         * How long grpc client will timeout in sending data to upstream.
+         */
+        public static int GRPC_UPSTREAM_TIMEOUT = 30;
     }
 
     public static class Jvm {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
index 040d733..598cd36 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
@@ -47,6 +47,8 @@ import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricCollection;
 import org.apache.skywalking.apm.network.language.agent.v2.JVMMetricReportServiceGrpc;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 
+import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
 /**
  * The <code>JVMService</code> represents a timer, which collectors JVM cpu, memory, memorypool and gc info, and send
  * the collected info to Collector through the channel provided by {@link GRPCChannelManager}
@@ -140,7 +142,7 @@ public class JVMService implements BootService, Runnable {
                         if (buffer.size() > 0) {
                             builder.addAllMetrics(buffer);
                             builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
-                            Commands commands = stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build());
+                            Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(builder.build());
                             ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                         }
                     } catch (Throwable t) {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
index 92c22a5..9c68ac3 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
@@ -53,6 +53,8 @@ import org.apache.skywalking.apm.network.register.v2.Services;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.apm.util.StringUtil;
 
+import static org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
 /**
  * @author wusheng
  */
@@ -138,7 +140,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
             try {
                 if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
                     if (registerBlockingStub != null) {
-                        ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister(
+                        ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).doServiceRegister(
                             Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());
                         if (serviceRegisterMapping != null) {
                             for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
@@ -153,7 +155,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
                     if (registerBlockingStub != null) {
                         if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
 
-                            ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+                            ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                     .doServiceInstanceRegister(ServiceInstances.newBuilder()
                                 .addInstances(
                                     ServiceInstance.newBuilder()
@@ -173,15 +175,15 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
                                 }
                             }
                         } else {
-                            final Commands commands = serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+                            final Commands commands = serviceInstancePingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                 .doPing(ServiceInstancePingPkg.newBuilder()
                                 .setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
                                 .setTime(System.currentTimeMillis())
                                 .setServiceInstanceUUID(INSTANCE_UUID)
                                 .build());
 
-                            NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
-                            EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
+                            NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
+                            EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS));
                             ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                         }
                     }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 3ffacff..edea03b 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.skywalking.apm.agent.core.boot.*;
 import org.apache.skywalking.apm.agent.core.commands.CommandService;
+import org.apache.skywalking.apm.agent.core.conf.Config;
 import org.apache.skywalking.apm.agent.core.context.*;
 import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
 import org.apache.skywalking.apm.agent.core.logging.api.*;
@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
     public void consume(List<TraceSegment> data) {
         if (CONNECTED.equals(status)) {
             final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
-            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
+            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
                 @Override
                 public void onNext(Commands commands) {
                     ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
diff --git a/docs/en/setup/service-agent/java-agent/README.md b/docs/en/setup/service-agent/java-agent/README.md
index e3fa37d..bfa95e4 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -82,6 +82,7 @@ property key | Description | Default |
 `collector.grpc_channel_check_interval`|grpc channel status check interval.|`30`|
 `collector.app_and_service_register_check_interval`|application and service registry check interval.|`3`|
 `collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`|
+`collector.grpc_upstream_timeout`|How long grpc client will timeout in sending data to upstream. Unit is second.|`30` seconds|
 `logging.level`|The log level. Default is debug.|`DEBUG`|
 `logging.file_name`|Log file name.|`skywalking-api.log`|
 `logging.output`| Log output. Default is FILE. Use CONSOLE means output to stdout. |`FILE`|
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index c7c26e1..1858116 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -53,6 +53,10 @@ public class CoreModuleConfig extends ModuleConfig {
     @Setter private int monthMetricsDataTTL;
     @Setter private int gRPCThreadPoolSize;
     @Setter private int gRPCThreadPoolQueueSize;
+    /**
+     * Timeout for cluster internal communication, in seconds.
+     */
+    @Setter private int remoteTimeout = 20;
 
     CoreModuleConfig() {
         this.downsampling = new ArrayList<>();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 86c611f..3bc4e9a 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -166,7 +166,7 @@ public class CoreModuleProvider extends ModuleProvider {
 
         annotationScan.registerListener(streamAnnotationListener);
 
-        this.remoteClientManager = new RemoteClientManager(getManager());
+        this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
         this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
 
         MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index fd80c9c..e671ef4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -59,12 +59,14 @@ public class GRPCRemoteClient implements RemoteClient {
     private boolean isConnect;
     private CounterMetrics remoteOutCounter;
     private CounterMetrics remoteOutErrorCounter;
+    private int remoteTimeout;
 
     public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address address, int channelSize,
-        int bufferSize) {
+        int bufferSize, int remoteTimeout) {
         this.address = address;
         this.channelSize = channelSize;
         this.bufferSize = bufferSize;
+        this.remoteTimeout = remoteTimeout;
 
         remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
             .createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
@@ -183,7 +185,7 @@ public class GRPCRemoteClient implements RemoteClient {
             }
         }
 
-        return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
+        return getStub().withDeadlineAfter(remoteTimeout, TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
             @Override public void onNext(Empty empty) {
             }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 06a963f..e749c66 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -57,12 +57,19 @@ public class RemoteClientManager implements Service {
     private final List<RemoteClient> clientsB;
     private volatile List<RemoteClient> usingClients;
     private GaugeMetrics gauge;
+    private int remoteTimeout;
 
-    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) {
+    /**
+     * Initial the manager for all remote communication clients.
+     * @param moduleDefineHolder for looking up other modules
+     * @param remoteTimeout for cluster internal communication, in seconds.
+     */
+    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) {
         this.moduleDefineHolder = moduleDefineHolder;
         this.clientsA = new LinkedList<>();
         this.clientsB = new LinkedList<>();
         this.usingClients = clientsA;
+        this.remoteTimeout = remoteTimeout;
     }
 
     public void start() {
@@ -203,7 +210,7 @@ public class RemoteClientManager implements Service {
                         RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
                         getFreeClients().add(client);
                     } else {
-                        RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000);
+                        RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
                         client.connect();
                         getFreeClients().add(client);
                     }
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
index e83aa63..c561273 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -80,7 +80,7 @@ public class RemoteClientManagerTestCase {
         moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
         telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class, metricsCreator);
 
-        RemoteClientManager clientManager = new RemoteClientManager(moduleManager);
+        RemoteClientManager clientManager = new RemoteClientManager(moduleManager, 10);
 
         when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
         clientManager.refresh();