You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2019/07/02 23:32:37 UTC
[skywalking] 01/01: Adding deadline to gRPC client
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch grpc/deadline
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 4bcb651d8eaafb1f0e7adff7c4d245542ccb62d3
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Jul 3 07:23:58 2019 +0800
Adding deadline to gRPC client
* Set up 10 seconds deadline after gRPC client sending
* The duration of deadline contains three segments: connecting,
request and response
For blocking stub, I just set deadline before invoke service. For
bi-streaming stub, I found all of them are used as a blocking style,
that after getting streaming response client stub just close current
streaming. Base on above reality, I pick the same way as blocking one.
---
.../org/apache/skywalking/apm/agent/core/jvm/JVMService.java | 2 +-
.../agent/core/remote/ServiceAndEndpointRegisterClient.java | 12 +++++++-----
.../apm/agent/core/remote/TraceSegmentServiceClient.java | 4 +++-
.../oap/server/exporter/provider/grpc/GRPCExporter.java | 5 +++--
.../oap/server/core/remote/client/GRPCRemoteClient.java | 3 ++-
5 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
index 791375f..6294b8b 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
@@ -138,7 +138,7 @@ public class JVMService implements BootService, Runnable {
if (buffer.size() > 0) {
builder.addAllMetrics(buffer);
builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
- stub.collect(builder.build());
+ stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build());
}
} catch (Throwable t) {
logger.error(t, "send JVM metrics to Collector fail.");
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
index b3e29de..a112b56 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
@@ -112,7 +113,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
try {
if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) {
if (registerBlockingStub != null) {
- ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.doServiceRegister(
+ ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister(
Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());
if (serviceRegisterMapping != null) {
for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) {
@@ -127,7 +128,8 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
if (registerBlockingStub != null) {
if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) {
- ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.doServiceInstanceRegister(ServiceInstances.newBuilder()
+ ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+ .doServiceInstanceRegister(ServiceInstances.newBuilder()
.addInstances(
ServiceInstance.newBuilder()
.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID)
@@ -144,14 +146,14 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable,
}
}
} else {
- serviceInstancePingStub.doPing(ServiceInstancePingPkg.newBuilder()
+ serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doPing(ServiceInstancePingPkg.newBuilder()
.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
.setTime(System.currentTimeMillis())
.setServiceInstanceUUID(INSTANCE_UUID)
.build());
- NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub);
- EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub);
+ NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
+ EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS));
}
}
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 221d982..a58f6c8 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.apache.skywalking.apm.agent.core.boot.*;
import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
@@ -85,7 +87,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
- StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Commands>() {
+ StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
index 2e28322..136c2aa 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
@@ -69,7 +70,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
}
public void initSubscriptionList() {
- SubscriptionsResp subscription = blockingStub.subscription(SubscriptionReq.newBuilder().build());
+ SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build());
subscription.getMetricNamesList().forEach(subscriptionSet::add);
logger.debug("Get exporter subscription list, {}", subscriptionSet);
}
@@ -84,7 +85,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
}
ExportStatus status = new ExportStatus();
- StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.export(
+ StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export(
new StreamObserver<ExportResponse>() {
@Override public void onNext(ExportResponse response) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 6a920a5..4927b97 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.remote.client;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
@@ -183,7 +184,7 @@ public class GRPCRemoteClient implements RemoteClient {
}
}
- return getStub().call(new StreamObserver<Empty>() {
+ return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
@Override public void onNext(Empty empty) {
}