You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/01/08 06:50:57 UTC
[incubator-skywalking] branch prometheus updated: Make telemetry works.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch prometheus
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/prometheus by this push:
new c84db10 Make telemetry works.
c84db10 is described below
commit c84db10a452961cc2144308991d316b7bdbdce50
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Tue Jan 8 14:50:47 2019 +0800
Make telemetry works.
---
.../analysis/worker/IndicatorAggregateWorker.java | 2 +-
.../core/remote/client/GRPCRemoteClient.java | 4 +--
.../provider/IstioTelemetryGRPCHandler.java | 36 ++++++++++++++++------
.../server/telemetry/prometheus/BaseMetric.java | 24 ++++++++++++---
.../prometheus/PrometheusCounterMetric.java | 6 ++--
.../prometheus/PrometheusGaugeMetric.java | 6 ++--
.../prometheus/PrometheusHistogramMetric.java | 13 +++++---
7 files changed, 62 insertions(+), 29 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index ed199a7..9d28c1c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -56,7 +56,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
- new MetricTag.Keys("metricName", "level"), new MetricTag.Values(modelName, "1"));
+ new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "1", "min"));
}
@Override public final void in(Indicator indicator) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index 6a1105c..6b3cf4f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -65,10 +65,10 @@ public class GRPCRemoteClient implements RemoteClient {
remoteOutCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createCounter("remote_out_count", "The number(client side) of inside remote inside aggregate rpc.",
- new MetricTag.Keys("dest"), new MetricTag.Values(address.toString()));
+ new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "N"));
remoteOutErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricCreator.class)
.createCounter("remote_out_error_count", "The error number(client side) of inside remote inside aggregate rpc.",
- new MetricTag.Keys("dest"), new MetricTag.Values(address.toString()));
+ new MetricTag.Keys("dest", "self"), new MetricTag.Values(address.toString(), "N"));
}
@Override public void connect() {
diff --git a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/istio/telemetry/provider/IstioTelemetryGRPCHandler.java b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/istio/telemetry/provider/IstioTelemetryGRPCHandler.java
index 587b35e..bb725c6 100644
--- a/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/istio/telemetry/provider/IstioTelemetryGRPCHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-istio-telemetry-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/istio/telemetry/provider/IstioTelemetryGRPCHandler.java
@@ -21,22 +21,18 @@ package org.apache.skywalking.oap.server.receiver.istio.telemetry.provider;
import com.google.common.base.Joiner;
import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
-import io.istio.HandleMetricServiceGrpc;
-import io.istio.IstioMetricProto;
+import io.istio.*;
import io.istio.api.mixer.adapter.model.v1beta1.ReportProto;
import io.istio.api.policy.v1beta1.TypeProto;
-import java.time.Duration;
-import java.time.Instant;
+import java.time.*;
import java.util.Map;
import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.common.DetectPoint;
-import org.apache.skywalking.apm.network.servicemesh.Protocol;
-import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
+import org.apache.skywalking.apm.network.servicemesh.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* Handle istio telemetry data.
@@ -99,9 +95,24 @@ public class IstioTelemetryGRPCHandler extends HandleMetricServiceGrpc.HandleMet
} else {
detectPoint = DetectPoint.server;
}
+
+ String sourceServiceName;
+ if (has(i, "sourceNamespace")) {
+ sourceServiceName = JOINER.join(string(i, "sourceService"), string(i, "sourceNamespace"));
+ } else {
+ sourceServiceName = string(i, "sourceService");
+ }
+
+ String destServiceName;
+ if (has(i, "destinationNamespace")) {
+ destServiceName = JOINER.join(string(i, "destinationService"), string(i, "destinationNamespace"));
+ } else {
+ destServiceName = string(i, "destinationService");
+ }
+
ServiceMeshMetric metric = ServiceMeshMetric.newBuilder().setStartTime(requestTime.toEpochMilli())
- .setEndTime(responseTime.toEpochMilli()).setSourceServiceName(JOINER.join(string(i, "sourceService"), string(i, "sourceNamespace")))
- .setSourceServiceInstance(string(i, "sourceUID")).setDestServiceName(JOINER.join(string(i, "destinationService"), string(i, "destinationNamespace")))
+ .setEndTime(responseTime.toEpochMilli()).setSourceServiceName(sourceServiceName)
+ .setSourceServiceInstance(string(i, "sourceUID")).setDestServiceName(destServiceName)
.setDestServiceInstance(string(i, "destinationUID")).setEndpoint(endpoint).setLatency(latency)
.setResponseCode(Math.toIntExact(responseCode)).setStatus(status).setProtocol(netProtocol).setDetectPoint(detectPoint).build();
logger.debug("Transformed metric {}", metric);
@@ -139,4 +150,9 @@ public class IstioTelemetryGRPCHandler extends HandleMetricServiceGrpc.HandleMet
throw new IllegalArgumentException(String.format("Lack dimension %s", key));
}
}
+
+ private boolean has(final IstioMetricProto.InstanceMsg instanceMsg, final String key) {
+ Map<String, TypeProto.Value> map = instanceMsg.getDimensionsMap();
+ return map.containsKey(key);
+ }
}
diff --git a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/BaseMetric.java b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/BaseMetric.java
index dff3cc7..536154d 100644
--- a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/BaseMetric.java
+++ b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/BaseMetric.java
@@ -18,6 +18,8 @@
package org.apache.skywalking.oap.server.telemetry.prometheus;
+import io.prometheus.client.SimpleCollector;
+import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.oap.server.telemetry.api.*;
@@ -26,8 +28,10 @@ import org.apache.skywalking.oap.server.telemetry.api.*;
*
* @author wusheng
*/
-public abstract class BaseMetric<T> {
- private volatile T metricInstance;
+public abstract class BaseMetric<T extends SimpleCollector, C> {
+ private static Map<String, Object> ALL_METRICS = new HashMap<>();
+
+ private volatile C metricInstance;
protected final String name;
protected final String tips;
protected final MetricTag.Keys labels;
@@ -46,7 +50,7 @@ public abstract class BaseMetric<T> {
return TelemetryRelatedContext.INSTANCE.getId() != null;
}
- protected T getMetric() {
+ protected C getMetric() {
if (metricInstance == null) {
if (isIDReady()) {
lock.lock();
@@ -64,7 +68,17 @@ public abstract class BaseMetric<T> {
labelValues[i + 1] = values.getValues()[i];
}
- metricInstance = create(labelNames, labelValues);
+ if (!ALL_METRICS.containsKey(name)) {
+ synchronized (ALL_METRICS) {
+ if (!ALL_METRICS.containsKey(name)) {
+ ALL_METRICS.put(name, create(labelNames));
+ }
+ }
+ }
+
+ T metric = (T)ALL_METRICS.get(name);
+
+ metricInstance = (C)metric.labels(labelValues);
}
} finally {
lock.unlock();
@@ -75,5 +89,5 @@ public abstract class BaseMetric<T> {
return metricInstance;
}
- protected abstract T create(String[] labelNames, String[] labelValues);
+ protected abstract T create(String[] labelNames);
}
diff --git a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusCounterMetric.java b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusCounterMetric.java
index beb0e88..cae0edb 100644
--- a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusCounterMetric.java
+++ b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusCounterMetric.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.telemetry.api.*;
*
* @author wusheng
*/
-public class PrometheusCounterMetric extends BaseMetric<Counter.Child> implements CounterMetric {
+public class PrometheusCounterMetric extends BaseMetric<Counter, Counter.Child> implements CounterMetric {
public PrometheusCounterMetric(String name, String tips,
MetricTag.Keys labels, MetricTag.Values values) {
@@ -47,8 +47,8 @@ public class PrometheusCounterMetric extends BaseMetric<Counter.Child> implement
}
}
- @Override protected Counter.Child create(String[] labelNames, String[] labelValues) {
+ @Override protected Counter create(String[] labelNames) {
return Counter.build()
- .name(name).help(tips).labelNames(labelNames).register().labels(labelValues);
+ .name(name).help(tips).labelNames(labelNames).register();
}
}
diff --git a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusGaugeMetric.java b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusGaugeMetric.java
index ad0f947..c651725 100644
--- a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusGaugeMetric.java
+++ b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusGaugeMetric.java
@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.telemetry.api.*;
*
* @author wusheng
*/
-public class PrometheusGaugeMetric extends BaseMetric<Gauge.Child> implements GaugeMetric {
+public class PrometheusGaugeMetric extends BaseMetric<Gauge, Gauge.Child> implements GaugeMetric {
public PrometheusGaugeMetric(String name, String tips,
MetricTag.Keys labels,
MetricTag.Values values) {
@@ -68,8 +68,8 @@ public class PrometheusGaugeMetric extends BaseMetric<Gauge.Child> implements Ga
}
}
- @Override protected Gauge.Child create(String[] labelNames, String[] labelValues) {
+ @Override protected Gauge create(String[] labelNames) {
return Gauge.build()
- .name(name).help(tips).labelNames(labelNames).register().labels(labelValues);
+ .name(name).help(tips).labelNames(labelNames).register();
}
}
diff --git a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusHistogramMetric.java b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusHistogramMetric.java
index 9dc116b..a2ad2ec 100644
--- a/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusHistogramMetric.java
+++ b/oap-server/server-telemetry/telemetry-prometheus/src/main/java/org/apache/skywalking/oap/server/telemetry/prometheus/PrometheusHistogramMetric.java
@@ -37,22 +37,25 @@ public class PrometheusHistogramMetric extends HistogramMetric {
}
@Override public void observe(double value) {
-
+ Histogram.Child metric = inner.getMetric();
+ if (metric != null) {
+ metric.observe(value);
+ }
}
- class InnerMetricObject extends BaseMetric<Histogram.Child> {
+ class InnerMetricObject extends BaseMetric<Histogram, Histogram.Child> {
public InnerMetricObject(String name, String tips, MetricTag.Keys labels,
MetricTag.Values values) {
super(name, tips, labels, values);
}
- @Override protected Histogram.Child create(String[] labelNames, String[] labelValues) {
+ @Override protected Histogram create(String[] labelNames) {
Histogram.Builder builder = Histogram.build()
.name(name).help(tips);
- if (builder != null) {
+ if (builder != null && buckets.length > 0) {
builder = builder.buckets(buckets);
}
- return builder.labelNames(labelNames).register().labels(labelValues);
+ return builder.labelNames(labelNames).register();
}
}
}