You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2021/03/08 12:35:58 UTC

[GitHub] [skywalking] muse-dev[bot] commented on a change in pull request #6516: Add telemetry data about metrics in, metrics scratching and trace in metrics to zipkin receiver.

muse-dev[bot] commented on a change in pull request #6516:
URL: https://github.com/apache/skywalking/pull/6516#discussion_r589389360



##########
File path: oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java
##########
@@ -35,19 +40,41 @@
 public class MeterServiceHandler implements KafkaHandler {
     private KafkaFetcherConfig config;
     private IMeterProcessService processService;
+    private final HistogramMetrics histogram;
+    private final CounterMetrics errorCounter;
 
     public MeterServiceHandler(ModuleManager manager, KafkaFetcherConfig config) {
         this.config = config;
         this.processService = manager.find(AnalyzerModule.NAME).provider().getService(IMeterProcessService.class);
+        MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
+                .provider()
+                .getService(MetricsCreator.class);
+        histogram = metricsCreator.createHistogramMetric(
+                "meter_in_latency", "The process latency of meter",
+                new MetricsTag.Keys("protocol"), new MetricsTag.Values("kafka-fetcher")
+        );
+        errorCounter = metricsCreator.createCounter("meter_analysis_error_count", "The error number of meter analysis",
+                new MetricsTag.Keys("protocol"),
+                new MetricsTag.Values("kafka-fetcher")
+        );
     }
 
     @Override
     public void handle(final ConsumerRecord<String, Bytes> record) {
         try {
             MeterDataCollection meterDataCollection = MeterDataCollection.parseFrom(record.value().get());
-
             MeterProcessor processor = processService.createProcessor();
-            meterDataCollection.getMeterDataList().forEach(meterData -> processor.read(meterData));
+            meterDataCollection.getMeterDataList().forEach(meterData -> {
+                HistogramMetrics.Timer timer = histogram.createTimer();
+                try {
+                    processor.read(meterData);
+                } catch (Exception e) {
+                    errorCounter.inc();
+                    log.error(e.getMessage(), e);
+                } finally {
+                    timer.finish();

Review comment:
       *RESOURCE_LEAK:*  resource of type `org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics$Timer` acquired by call to `createTimer()` at line 68 is not released after line 75.
   (at-me [in a reply](https://docs.muse.dev/docs/talk-to-muse/) with `help` or `ignore`)

##########
File path: oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java
##########
@@ -103,44 +122,52 @@ public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleSta
                 private final PrometheusMetricConverter converter = new PrometheusMetricConverter(r, service);
 
                 @Override public void run() {
-                    if (Objects.isNull(r.getStaticConfig())) {
-                        return;
-                    }
-                    StaticConfig sc = r.getStaticConfig();
-                    long now = System.currentTimeMillis();
-                    converter.toMeter(sc.getTargets().stream()
-                        .map(CheckedFunction1.liftTry(target -> {
-                            URI url = new URI(target.getUrl());
-                            URI targetURL = url.resolve(r.getMetricsPath());
-                            String content = HttpClient.builder().url(targetURL.toString()).caFilePath(target.getSslCaFilePath()).build().request();
-                            List<Metric> result = new ArrayList<>();
-                            try (InputStream targetStream = new ByteArrayInputStream(content.getBytes(Charsets.UTF_8))) {
-                                Parser p = Parsers.text(targetStream);
-                                MetricFamily mf;
-                                while ((mf = p.parse(now)) != null) {
-                                    mf.getMetrics().forEach(metric -> {
-                                        if (Objects.isNull(sc.getLabels())) {
-                                            return;
+                    HistogramMetrics.Timer timer = histogram.createTimer();
+                    try {
+                        if (Objects.isNull(r.getStaticConfig())) {
+                            return;
+                        }
+                        StaticConfig sc = r.getStaticConfig();
+                        long now = System.currentTimeMillis();
+                        converter.toMeter(sc.getTargets().stream()
+                                .map(CheckedFunction1.liftTry(target -> {
+                                    URI url = new URI(target.getUrl());
+                                    URI targetURL = url.resolve(r.getMetricsPath());
+                                    String content = HttpClient.builder().url(targetURL.toString()).caFilePath(target.getSslCaFilePath()).build().request();
+                                    List<Metric> result = new ArrayList<>();
+                                    try (InputStream targetStream = new ByteArrayInputStream(content.getBytes(Charsets.UTF_8))) {
+                                        Parser p = Parsers.text(targetStream);
+                                        MetricFamily mf;
+                                        while ((mf = p.parse(now)) != null) {
+                                            mf.getMetrics().forEach(metric -> {
+                                                if (Objects.isNull(sc.getLabels())) {
+                                                    return;
+                                                }
+                                                Map<String, String> extraLabels = Maps.newHashMap(sc.getLabels());
+                                                extraLabels.put("instance", target.getUrl());
+                                                extraLabels.forEach((key, value) -> {
+                                                    if (metric.getLabels().containsKey(key)) {
+                                                        metric.getLabels().put("exported_" + key, metric.getLabels().get(key));
+                                                    }
+                                                    metric.getLabels().put(key, value);
+                                                });
+                                            });
+                                            result.addAll(mf.getMetrics());
                                         }
-                                        Map<String, String> extraLabels = Maps.newHashMap(sc.getLabels());
-                                        extraLabels.put("instance", target.getUrl());
-                                        extraLabels.forEach((key, value) -> {
-                                            if (metric.getLabels().containsKey(key)) {
-                                                metric.getLabels().put("exported_" + key, metric.getLabels().get(key));
-                                            }
-                                            metric.getLabels().put(key, value);
-                                        });
-                                    });
-                                    result.addAll(mf.getMetrics());
-                                }
-                            }
-                            if (log.isDebugEnabled()) {
-                                log.debug("Fetch metrics from prometheus: {}", result);
-                            }
-                            return result;
-                        }))
-                        .flatMap(tryIt -> MetricConvert.log(tryIt, "Load metric"))
-                        .flatMap(Collection::stream));
+                                    }
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Fetch metrics from prometheus: {}", result);
+                                    }
+                                    return result;
+                                }))
+                                .flatMap(tryIt -> MetricConvert.log(tryIt, "Load metric"))
+                                .flatMap(Collection::stream));
+                    } catch (Exception e) {
+                        errorCounter.inc();
+                        log.error(e.getMessage(), e);
+                    } finally {
+                        timer.finish();

Review comment:
       *RESOURCE_LEAK:*  resource of type `org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics$Timer` acquired by call to `createTimer()` at line 125 is not released after line 169.
   (at-me [in a reply](https://docs.muse.dev/docs/talk-to-muse/) with `help` or `ignore`)

##########
File path: oap-server/server-receiver-plugin/skywalking-meter-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/meter/provider/handler/MeterServiceHandler.java
##########
@@ -45,7 +64,15 @@ public MeterServiceHandler(IMeterProcessService processService) {
         return new StreamObserver<MeterData>() {
             @Override
             public void onNext(MeterData meterData) {
-                processor.read(meterData);
+                HistogramMetrics.Timer timer = histogram.createTimer();
+                try {
+                    processor.read(meterData);
+                } catch (Exception e) {
+                    errorCounter.inc();
+                    log.error(e.getMessage(), e);
+                } finally {
+                    timer.finish();

Review comment:
       *RESOURCE_LEAK:*  resource of type `org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics$Timer` acquired by call to `createTimer()` at line 67 is not released after line 74.
   (at-me [in a reply](https://docs.muse.dev/docs/talk-to-muse/) with `help` or `ignore`)

##########
File path: oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV1JettyHandler.java
##########
@@ -64,8 +82,10 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
             response.setStatus(202);
         } catch (Exception e) {
             response.setStatus(500);
-
+            errorCounter.inc();
             log.error(e.getMessage(), e);
+        } finally {
+            timer.finish();

Review comment:
       *RESOURCE_LEAK:*  resource of type `org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics$Timer` acquired by call to `createTimer()` at line 71 is not released after line 88.
   **Note**: potential exception at line 80
   (at-me [in a reply](https://docs.muse.dev/docs/talk-to-muse/) with `help` or `ignore`)

##########
File path: oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/SpanV2JettyHandler.java
##########
@@ -65,8 +83,10 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
             response.setStatus(202);
         } catch (Exception e) {
             response.setStatus(500);
-
+            errorCounter.inc();
             log.error(e.getMessage(), e);
+        } finally {
+            timer.finish();

Review comment:
       *RESOURCE_LEAK:*  resource of type `org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics$Timer` acquired by call to `createTimer()` at line 72 is not released after line 89.
   **Note**: potential exception at line 81
   (at-me [in a reply](https://docs.muse.dev/docs/talk-to-muse/) with `help` or `ignore`)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org