You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/03/23 16:49:30 UTC

[incubator-skywalking] branch exporter-impl updated: Support subscription from remote.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch exporter-impl
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/exporter-impl by this push:
     new 6334e20  Support subscription from remote.
6334e20 is described below

commit 6334e20b62e9bb76198e2dc1922e70073f58a7b4
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Mar 23 09:49:14 2019 -0700

    Support subscription from remote.
---
 .../server/exporter/provider/grpc/GRPCExporter.java    | 18 +++++++++++++++---
 .../exporter/provider/grpc/GRPCExporterProvider.java   |  2 ++
 .../exporter/src/main/proto/metric-exporter.proto      | 11 +++++++++++
 .../exporter/provider/grpc/ExporterMockReceiver.java   |  7 +++++++
 .../server-starter/src/main/resources/application.yml  |  8 ++++----
 5 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
index 3869ca9..730a3f1 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc;
 
 import io.grpc.ManagedChannel;
 import io.grpc.stub.StreamObserver;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.*;
 import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
@@ -39,8 +39,10 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
     private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);
 
     private GRPCExporterSetting setting;
-    private MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
+    private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
+    private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
     private final DataCarrier exportBuffer;
+    private final Set<String> subscriptionSet;
 
     public GRPCExporter(GRPCExporterSetting setting) {
         this.setting = setting;
@@ -48,12 +50,22 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS
         client.connect();
         ManagedChannel channel = client.getChannel();
         exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
+        blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
         exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
         exportBuffer.consume(this, 1, 200);
+        subscriptionSet = new HashSet<>();
     }
 
     @Override public void export(IndicatorMetaInfo meta, Indicator indicator) {
-        exportBuffer.produce(new ExportData(meta, indicator));
+        if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getIndicatorName())) {
+            exportBuffer.produce(new ExportData(meta, indicator));
+        }
+    }
+
+    public void initSubscriptionList() {
+        SubscriptionsResp subscription = blockingStub.subscription(SubscriptionReq.newBuilder().build());
+        subscription.getMetricNamesList().forEach(subscriptionSet::add);
+        logger.debug("Get exporter subscription list, {}", subscriptionSet);
     }
 
     @Override public void init() {
diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
index ac11bb3..84de719 100644
--- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
+++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
@@ -56,6 +56,8 @@ public class GRPCExporterProvider extends ModuleProvider {
         exporter.setServiceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class));
         exporter.setServiceInstanceInventoryCache(getManager().find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class));
         exporter.setEndpointInventoryCache(getManager().find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class));
+
+        exporter.initSubscriptionList();
     }
 
     @Override public String[] requiredModules() {
diff --git a/oap-server/exporter/src/main/proto/metric-exporter.proto b/oap-server/exporter/src/main/proto/metric-exporter.proto
index 180d7df..a612b28 100644
--- a/oap-server/exporter/src/main/proto/metric-exporter.proto
+++ b/oap-server/exporter/src/main/proto/metric-exporter.proto
@@ -25,6 +25,9 @@ option java_package = "org.apache.skywalking.oap.server.exporter.grpc";
 service MetricExportService {
     rpc export (stream ExportMetricValue) returns (ExportResponse) {
     }
+
+    rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
+    }
 }
 
 message ExportMetricValue {
@@ -37,10 +40,18 @@ message ExportMetricValue {
     double doubleValue = 7;
 }
 
+message SubscriptionsResp {
+    repeated string metricNames = 1;
+}
+
 enum ValueType {
     LONG = 0;
     DOUBLE = 1;
 }
 
+message SubscriptionReq {
+
+}
+
 message ExportResponse {
 }
diff --git a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
index 2309755..57da559 100644
--- a/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
+++ b/oap-server/exporter/src/test/java/org/apache/skywalking/oap/server/exporter/provider/grpc/ExporterMockReceiver.java
@@ -50,5 +50,12 @@ public class ExporterMockReceiver {
                 }
             };
         }
+
+        @Override
+        public void subscription(SubscriptionReq request, StreamObserver<SubscriptionsResp> responseObserver) {
+            responseObserver.onNext(SubscriptionsResp.newBuilder()
+                .addMetricNames("all_p99").addMetricNames("service_cpm").addMetricNames("endpoint_sla").build());
+            responseObserver.onCompleted();
+        }
     }
 }
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index 172c9dc..fd21669 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -110,7 +110,7 @@ alarm:
   default:
 telemetry:
   prometheus:
-#exporter:
-#  grpc:
-#    targetHost: 127.0.0.1
-#    targetPort: 9870
\ No newline at end of file
+exporter:
+  grpc:
+    targetHost: 127.0.0.1
+    targetPort: 9870
\ No newline at end of file