You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2019/07/02 23:32:37 UTC

[skywalking] 01/01: Adding deadline to gRPC client

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch grpc/deadline
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 4bcb651d8eaafb1f0e7adff7c4d245542ccb62d3
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Jul 3 07:23:58 2019 +0800

    Adding deadline to gRPC client
    
     * Set up 10 seconds deadline after gRPC client sending
     * The duration of deadline contains three segments: connecting,
       request and response
    
    For blocking stub, I just set deadline before invoke service. For
    bi-streaming stub, I found all of them are used as a blocking style,
    that after getting streaming response client stub just close current
    streaming. Base on above reality, I pick the same way as blocking one.
---
 .../org/apache/skywalking/apm/agent/core/jvm/JVMService.java |  2 +-
 .../agent/core/remote/ServiceAndEndpointRegisterClient.java  | 12 +++++++-----
 .../apm/agent/core/remote/TraceSegmentServiceClient.java     |  4 +++-
 .../oap/server/exporter/provider/grpc/GRPCExporter.java      |  5 +++--
 .../oap/server/core/remote/client/GRPCRemoteClient.java      |  3 ++-
 5 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
index 791375f..6294b8b 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
@@ -138,7 +138,7 @@ public class JVMService implements BootService, Runnable {
                         if (buffer.size() > 0) {
                             builder.addAllMetrics(buffer);
                             builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
-                            stub.collect(builder.build());
+                            stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build());
                         }
                     } catch (Throwable t) {
                         logger.error(t, "send JVM metrics to Collector fail.");
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
index b3e29de..a112b56 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.skywalking.apm.agent.core.boot.BootService;
 import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
 import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
@@ -112,7 +113,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
             try {
                 if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
                     if (registerBlockingStub != null) {
-                        ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.doServiceRegister(
+                        ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister(
                             Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());
                         if (serviceRegisterMapping != null) {
                             for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
@@ -127,7 +128,8 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
                     if (registerBlockingStub != null) {
                         if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
 
-                            ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.doServiceInstanceRegister(ServiceInstances.newBuilder()
+                            ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+                                    .doServiceInstanceRegister(ServiceInstances.newBuilder()
                                 .addInstances(
                                     ServiceInstance.newBuilder()
                                         .setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID)
@@ -144,14 +146,14 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
                                 }
                             }
                         } else {
-                            serviceInstancePingStub.doPing(ServiceInstancePingPkg.newBuilder()
+                            serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doPing(ServiceInstancePingPkg.newBuilder()
                                 .setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
                                 .setTime(System.currentTimeMillis())
                                 .setServiceInstanceUUID(INSTANCE_UUID)
                                 .build());
 
-                            NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub);
-                            EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub);
+                            NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
+                            EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
                         }
                     }
                 }
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 221d982..a58f6c8 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.apm.agent.core.remote;
 import io.grpc.Channel;
 import io.grpc.stub.StreamObserver;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.skywalking.apm.agent.core.boot.*;
 import org.apache.skywalking.apm.agent.core.context.*;
 import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
@@ -85,7 +87,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
     public void consume(List<TraceSegment> data) {
         if (CONNECTED.equals(status)) {
             final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
-            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Commands>() {
+            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
                 @Override
                 public void onNext(Commands commands) {
 
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
index 2e28322..136c2aa 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc;
 import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
@@ -69,7 +70,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
     }
 
     public void initSubscriptionList() {
-        SubscriptionsResp subscription = blockingStub.subscription(SubscriptionReq.newBuilder().build());
+        SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build());
         subscription.getMetricNamesList().forEach(subscriptionSet::add);
         logger.debug("Get exporter subscription list, {}", subscriptionSet);
     }
@@ -84,7 +85,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
         }
 
         ExportStatus status = new ExportStatus();
-        StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.export(
+        StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export(
             new StreamObserver<ExportResponse>() {
                 @Override public void onNext(ExportResponse response) {
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 6a920a5..4927b97 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.remote.client;
 import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
 import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
@@ -183,7 +184,7 @@ public class GRPCRemoteClient implements RemoteClient {
             }
         }
 
-        return getStub().call(new StreamObserver<Empty>() {
+        return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
             @Override public void onNext(Empty empty) {
             }