You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/01/08 06:50:57 UTC

[incubator-skywalking] branch prometheus updated: Make telemetry works.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch prometheus
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/prometheus by this push:
     new c84db10  Make telemetry works.
c84db10 is described below

commit c84db10a452961cc2144308991d316b7bdbdce50
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Jan 8 14:50:47 2019 +0800

    Make telemetry works.
---
 .../analysis/worker/IndicatorAggregateWorker.java  |  2 +-
 .../core/remote/client/GRPCRemoteClient.java       |  4 +--
 .../provider/IstioTelemetryGRPCHandler.java        | 36 ++++++++++++++++------
 .../server/telemetry/prometheus/BaseMetric.java    | 24 ++++++++++++---
 .../prometheus/PrometheusCounterMetric.java        |  6 ++--
 .../prometheus/PrometheusGaugeMetric.java          |  6 ++--
 .../prometheus/PrometheusHistogramMetric.java      | 13 +++++---
 7 files changed, 62 insertions(+), 29 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index ed199a7..9d28c1c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -56,7 +56,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
 
         MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
         aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
-            new MetricTag.Keys("metricName", "level"), new MetricTag.Values(modelName, "1"));
+            new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "1", "min"));
     }
 
     @Override public final void in(Indicator indicator) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 6a1105c..6b3cf4f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -65,10 +65,10 @@ public class GRPCRemoteClient implements RemoteClient {
 
         remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
             .createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
-                new MetricTag.Keys("dest"), new MetricTag.Values(address.toString()));
+                new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "N"));
         remoteOutErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
             .createCounter("remote_out_error_count", "The error number(client side) of inside remote inside aggregate rpc.",
-                new MetricTag.Keys("dest"), new MetricTag.Values(address.toString()));
+                new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "N"));
     }
 
     @Override public void connect() {
diff --git a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/istio/telemetry/provider/IstioTelemetryGRPCHandler.java b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/istio/telemetry/provider/IstioTelemetryGRPCHandler.java
index 587b35e..bb725c6 100644
--- a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/istio/telemetry/provider/IstioTelemetryGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/istio/telemetry/provider/IstioTelemetryGRPCHandler.java
@@ -21,22 +21,18 @@ package org.apache.skywalking.oap.server.receiver.istio.telemetry.provider;
 import com.google.common.base.Joiner;
 import com.google.protobuf.Timestamp;
 import io.grpc.stub.StreamObserver;
-import io.istio.HandleMetricServiceGrpc;
-import io.istio.IstioMetricProto;
+import io.istio.*;
 import io.istio.api.mixer.adapter.model.v1beta1.ReportProto;
 import io.istio.api.policy.v1beta1.TypeProto;
-import java.time.Duration;
-import java.time.Instant;
+import java.time.*;
 import java.util.Map;
 import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
 import org.apache.skywalking.apm.network.common.DetectPoint;
-import org.apache.skywalking.apm.network.servicemesh.Protocol;
-import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.network.servicemesh.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
  * Handle istio telemetry data.
@@ -99,9 +95,24 @@ public class IstioTelemetryGRPCHandler extends HandleMetricServiceGrpc.HandleMet
                 } else {
                     detectPoint = DetectPoint.server;
                 }
+
+                String sourceServiceName;
+                if (has(i, "sourceNamespace")) {
+                    sourceServiceName = JOINER.join(string(i, "sourceService"), string(i, "sourceNamespace"));
+                } else {
+                    sourceServiceName = string(i, "sourceService");
+                }
+
+                String destServiceName;
+                if (has(i, "destinationNamespace")) {
+                    destServiceName = JOINER.join(string(i, "destinationService"), string(i, "destinationNamespace"));
+                } else {
+                    destServiceName = string(i, "destinationService");
+                }
+
                 ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(requestTime.toEpochMilli())
-                    .setEndTime(responseTime.toEpochMilli()).setSourceServiceName(JOINER.join(string(i, "sourceService"), string(i, "sourceNamespace")))
-                    .setSourceServiceInstance(string(i, "sourceUID")).setDestServiceName(JOINER.join(string(i, "destinationService"), string(i, "destinationNamespace")))
+                    .setEndTime(responseTime.toEpochMilli()).setSourceServiceName(sourceServiceName)
+                    .setSourceServiceInstance(string(i, "sourceUID")).setDestServiceName(destServiceName)
                     .setDestServiceInstance(string(i, "destinationUID")).setEndpoint(endpoint).setLatency(latency)
                     .setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(netProtocol).setDetectPoint(detectPoint).build();
                 logger.debug("Transformed metric {}", metric);
@@ -139,4 +150,9 @@ public class IstioTelemetryGRPCHandler extends HandleMetricServiceGrpc.HandleMet
             throw new IllegalArgumentException(String.format("Lack dimension %s", key));
         }
     }
+
+    private boolean has(final IstioMetricProto.InstanceMsg instanceMsg, final String key) {
+        Map<String, TypeProto.Value> map = instanceMsg.getDimensionsMap();
+        return map.containsKey(key);
+    }
 }
diff --git a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/BaseMetric.java b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/BaseMetric.java
index dff3cc7..536154d 100644
--- a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/BaseMetric.java
+++ b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/BaseMetric.java
@@ -18,6 +18,8 @@
 
 package org.apache.skywalking.oap.server.telemetry.prometheus;
 
+import io.prometheus.client.SimpleCollector;
+import java.util.*;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.skywalking.oap.server.telemetry.api.*;
 
@@ -26,8 +28,10 @@ import org.apache.skywalking.oap.server.telemetry.api.*;
  *
  * @author wusheng
  */
-public abstract class BaseMetric<T> {
-    private volatile T metricInstance;
+public abstract class BaseMetric<T extends SimpleCollector, C> {
+    private static Map<String, Object> ALL_METRICS = new HashMap<>();
+
+    private volatile C metricInstance;
     protected final String name;
     protected final String tips;
     protected final MetricTag.Keys labels;
@@ -46,7 +50,7 @@ public abstract class BaseMetric<T> {
         return TelemetryRelatedContext.INSTANCE.getId() != null;
     }
 
-    protected T getMetric() {
+    protected C getMetric() {
         if (metricInstance == null) {
             if (isIDReady()) {
                 lock.lock();
@@ -64,7 +68,17 @@ public abstract class BaseMetric<T> {
                             labelValues[i + 1] = values.getValues()[i];
                         }
 
-                        metricInstance = create(labelNames, labelValues);
+                        if (!ALL_METRICS.containsKey(name)) {
+                            synchronized (ALL_METRICS) {
+                                if (!ALL_METRICS.containsKey(name)) {
+                                    ALL_METRICS.put(name, create(labelNames));
+                                }
+                            }
+                        }
+
+                        T metric = (T)ALL_METRICS.get(name);
+
+                        metricInstance = (C)metric.labels(labelValues);
                     }
                 } finally {
                     lock.unlock();
@@ -75,5 +89,5 @@ public abstract class BaseMetric<T> {
         return metricInstance;
     }
 
-    protected abstract T create(String[] labelNames, String[] labelValues);
+    protected abstract T create(String[] labelNames);
 }
diff --git a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusCounterMetric.java b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusCounterMetric.java
index beb0e88..cae0edb 100644
--- a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusCounterMetric.java
+++ b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusCounterMetric.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.telemetry.api.*;
  *
  * @author wusheng
  */
-public class PrometheusCounterMetric extends BaseMetric<Counter.Child> implements CounterMetric {
+public class PrometheusCounterMetric extends BaseMetric<Counter, Counter.Child> implements CounterMetric {
 
     public PrometheusCounterMetric(String name, String tips,
         MetricTag.Keys labels, MetricTag.Values values) {
@@ -47,8 +47,8 @@ public class PrometheusCounterMetric extends BaseMetric<Counter.Child> implement
         }
     }
 
-    @Override protected Counter.Child create(String[] labelNames, String[] labelValues) {
+    @Override protected Counter create(String[] labelNames) {
         return Counter.build()
-            .name(name).help(tips).labelNames(labelNames).register().labels(labelValues);
+            .name(name).help(tips).labelNames(labelNames).register();
     }
 }
diff --git a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusGaugeMetric.java b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusGaugeMetric.java
index ad0f947..c651725 100644
--- a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusGaugeMetric.java
+++ b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusGaugeMetric.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.telemetry.api.*;
  *
  * @author wusheng
  */
-public class PrometheusGaugeMetric extends BaseMetric<Gauge.Child> implements GaugeMetric {
+public class PrometheusGaugeMetric extends BaseMetric<Gauge, Gauge.Child> implements GaugeMetric {
     public PrometheusGaugeMetric(String name, String tips,
         MetricTag.Keys labels,
         MetricTag.Values values) {
@@ -68,8 +68,8 @@ public class PrometheusGaugeMetric extends BaseMetric<Gauge.Child> implements Ga
         }
     }
 
-    @Override protected Gauge.Child create(String[] labelNames, String[] labelValues) {
+    @Override protected Gauge create(String[] labelNames) {
         return Gauge.build()
-            .name(name).help(tips).labelNames(labelNames).register().labels(labelValues);
+            .name(name).help(tips).labelNames(labelNames).register();
     }
 }
diff --git a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusHistogramMetric.java b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusHistogramMetric.java
index 9dc116b..a2ad2ec 100644
--- a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusHistogramMetric.java
+++ b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusHistogramMetric.java
@@ -37,22 +37,25 @@ public class PrometheusHistogramMetric extends HistogramMetric {
     }
 
     @Override public void observe(double value) {
-
+        Histogram.Child metric = inner.getMetric();
+        if (metric != null) {
+            metric.observe(value);
+        }
     }
 
-    class InnerMetricObject extends BaseMetric<Histogram.Child> {
+    class InnerMetricObject extends BaseMetric<Histogram, Histogram.Child> {
         public InnerMetricObject(String name, String tips, MetricTag.Keys labels,
             MetricTag.Values values) {
             super(name, tips, labels, values);
         }
 
-        @Override protected Histogram.Child create(String[] labelNames, String[] labelValues) {
+        @Override protected Histogram create(String[] labelNames) {
             Histogram.Builder builder = Histogram.build()
                 .name(name).help(tips);
-            if (builder != null) {
+            if (builder != null && buckets.length > 0) {
                 builder = builder.buckets(buckets);
             }
-            return builder.labelNames(labelNames).register().labels(labelValues);
+            return builder.labelNames(labelNames).register();
         }
     }
 }