You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/23 16:49:30 UTC
[incubator-skywalking] branch exporter-impl updated: Support subscription from remote.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch exporter-impl
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/exporter-impl by this push:
new 6334e20 Support subscription from remote.
6334e20 is described below
commit 6334e20b62e9bb76198e2dc1922e70073f58a7b4
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Mar 23 09:49:14 2019 -0700
Support subscription from remote.
---
.../server/exporter/provider/grpc/GRPCExporter.java | 18 +++++++++++++++---
.../exporter/provider/grpc/GRPCExporterProvider.java | 2 ++
.../exporter/src/main/proto/metric-exporter.proto | 11 +++++++++++
.../exporter/provider/grpc/ExporterMockReceiver.java | 7 +++++++
.../server-starter/src/main/resources/application.yml | 8 ++++----
5 files changed, 39 insertions(+), 7 deletions(-)
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
index 3869ca9..730a3f1 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
@@ -39,8 +39,10 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);
private GRPCExporterSetting setting;
- private MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
+ private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
+ private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
private final DataCarrier exportBuffer;
+ private final Set<String> subscriptionSet;
public GRPCExporter(GRPCExporterSetting setting) {
this.setting = setting;
@@ -48,12 +50,22 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
client.connect();
ManagedChannel channel = client.getChannel();
exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
+ blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
exportBuffer.consume(this, 1, 200);
+ subscriptionSet = new HashSet<>();
}
@Override public void export(IndicatorMetaInfo meta, Indicator indicator) {
- exportBuffer.produce(new ExportData(meta, indicator));
+ if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getIndicatorName())) {
+ exportBuffer.produce(new ExportData(meta, indicator));
+ }
+ }
+
+ public void initSubscriptionList() {
+ SubscriptionsResp subscription = blockingStub.subscription(SubscriptionReq.newBuilder().build());
+ subscription.getMetricNamesList().forEach(subscriptionSet::add);
+ logger.debug("Get exporter subscription list, {}", subscriptionSet);
}
@Override public void init() {
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
index ac11bb3..84de719 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
@@ -56,6 +56,8 @@ public class GRPCExporterProvider extends ModuleProvider {
exporter.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class));
exporter.setServiceInstanceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class));
exporter.setEndpointInventoryCache(getManager().find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class));
+
+ exporter.initSubscriptionList();
}
@Override public String[] requiredModules() {
diff --git a/oap-server/exporter/src/main/proto/metric-exporter.proto b/oap-server/exporter/src/main/proto/metric-exporter.proto
index 180d7df..a612b28 100644
--- a/oap-server/exporter/src/main/proto/metric-exporter.proto
+++ b/oap-server/exporter/src/main/proto/metric-exporter.proto
@@ -25,6 +25,9 @@ option java_package = "org.apache.skywalking.oap.server.exporter.grpc";
service MetricExportService {
rpc export (stream ExportMetricValue) returns (ExportResponse) {
}
+
+ rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
+ }
}
message ExportMetricValue {
@@ -37,10 +40,18 @@ message ExportMetricValue {
double doubleValue = 7;
}
+message SubscriptionsResp {
+ repeated string metricNames = 1;
+}
+
enum ValueType {
LONG = 0;
DOUBLE = 1;
}
+message SubscriptionReq {
+
+}
+
message ExportResponse {
}
diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
index 2309755..57da559 100644
--- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
+++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
@@ -50,5 +50,12 @@ public class ExporterMockReceiver {
}
};
}
+
+ @Override
+ public void subscription(SubscriptionReq request, StreamObserver<SubscriptionsResp> responseObserver) {
+ responseObserver.onNext(SubscriptionsResp.newBuilder()
+ .addMetricNames("all_p99").addMetricNames("service_cpm").addMetricNames("endpoint_sla").build());
+ responseObserver.onCompleted();
+ }
}
}
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 172c9dc..fd21669 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -110,7 +110,7 @@ alarm:
default:
telemetry:
prometheus:
-#exporter:
-# grpc:
-# targetHost: 127.0.0.1
-# targetPort: 9870
\ No newline at end of file
+exporter:
+ grpc:
+ targetHost: 127.0.0.1
+ targetPort: 9870
\ No newline at end of file