You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/07/14 15:04:24 UTC

[storm] branch master updated: STORM-3780 switch ErrorReportingMetrics to V2 API (#3401)

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 0df2ba2  STORM-3780 switch ErrorReportingMetrics to V2 API (#3401)
0df2ba2 is described below

commit 0df2ba288a57b2afff11a47c978fa99c905c71da
Author: agresch <ag...@gmail.com>
AuthorDate: Wed Jul 14 10:04:15 2021 -0500

    STORM-3780 switch ErrorReportingMetrics to V2 API (#3401)
    
    * STORM-3780 switch ErrorReportingMetrics to V2 API
---
 .../storm/daemon/metrics/BuiltinMetrics.java       | 28 -------------------
 .../daemon/metrics/ErrorReportingMetrics.java      | 32 ----------------------
 .../jvm/org/apache/storm/executor/Executor.java    | 15 +++++-----
 .../apache/storm/executor/bolt/BoltExecutor.java   |  3 --
 .../executor/bolt/BoltOutputCollectorImpl.java     |  2 +-
 .../apache/storm/executor/spout/SpoutExecutor.java |  1 -
 .../executor/spout/SpoutOutputCollectorImpl.java   |  2 +-
 7 files changed, 10 insertions(+), 73 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
deleted file mode 100644
index 6fbbdcc..0000000
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.daemon.metrics;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.task.TopologyContext;
-
-public abstract class BuiltinMetrics {
-    protected final Map<String, IMetric> metricMap = new HashMap<>();
-
-    public void registerAll(Map<String, Object> topoConf, TopologyContext context) {
-        for (Map.Entry<String, IMetric> entry : metricMap.entrySet()) {
-            BuiltinMetricsUtil.registerMetric("__" + entry.getKey(), entry.getValue(), topoConf, context);
-        }
-    }
-}
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java
deleted file mode 100644
index aea3388..0000000
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.metric.api.CountMetric;
-
-public class ErrorReportingMetrics extends BuiltinMetrics {
-    private final CountMetric reportedErrorCount = new CountMetric();
-
-    public ErrorReportingMetrics() {
-        metricMap.put("reported-error-count", reportedErrorCount);
-    }
-
-    public void incrReportedErrorCountBy(long n) {
-        this.reportedErrorCount.incrBy(n);
-    }
-
-    public void incrReportedErrorCount() {
-        this.reportedErrorCount.incr();
-    }
-
-}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index ef4d057..8b4d8ac 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -49,7 +49,6 @@ import org.apache.storm.daemon.Acker;
 import org.apache.storm.daemon.GrouperFactory;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.daemon.Task;
-import org.apache.storm.daemon.metrics.ErrorReportingMetrics;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.executor.bolt.BoltExecutor;
 import org.apache.storm.executor.error.IReportError;
@@ -66,6 +65,7 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
 import org.apache.storm.grouping.LoadMapping;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.metrics2.RateCounter;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Lists;
 import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue;
@@ -119,7 +119,6 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     protected final Boolean isDebug;
     protected final Boolean hasEventLoggers;
     protected final boolean ackingEnabled;
-    protected final ErrorReportingMetrics errorReportingMetrics;
     protected final MpscChunkedArrayQueue<AddressedTuple> pendingEmits = new MpscChunkedArrayQueue<>(1024, (int) Math.pow(2, 30));
     private final AddressedTuple flushTuple;
     protected ExecutorTransfer executorTransfer;
@@ -128,6 +127,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     protected String hostname;
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
     private AtomicBoolean needToRefreshCreds = new AtomicBoolean(false);
+    private final RateCounter reportedErrorCount;
 
     protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials, String type) {
         this.workerData = workerData;
@@ -180,8 +180,9 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
         } catch (UnknownHostException ignored) {
             this.hostname = "";
         }
-        this.errorReportingMetrics = new ErrorReportingMetrics();
         flushTuple = AddressedTuple.createFlushTuple(workerTopologyContext);
+        this.reportedErrorCount = workerData.getMetricRegistry().rateCounter("__reported-error-count", componentId,
+                taskIds.get(0));
     }
 
     public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
@@ -636,10 +637,6 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
         return reportError;
     }
 
-    public ErrorReportingMetrics getErrorReportingMetrics() {
-        return errorReportingMetrics;
-    }
-
     public WorkerTopologyContext getWorkerTopologyContext() {
         return workerTopologyContext;
     }
@@ -680,4 +677,8 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
         this.executorTransfer = executorTransfer;
     }
+
+    public void incrementReportedErrorCount() {
+        reportedErrorCount.inc(1L);
+    }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 293d756..273bab5 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -99,9 +99,6 @@ public class BoltExecutor extends Executor {
             Utils.sleepNoSimulation(100);
         }
 
-        if (!componentId.equals(StormCommon.SYSTEM_STREAM_ID)) { // System bolt doesn't call reportError()
-            this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
-        }
         LOG.info("Preparing bolt {}:{}", componentId, getTaskIds());
         for (Task taskData : idToTask) {
             if (taskData == null) {
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index cf3e7c6..886a00c 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -187,7 +187,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
 
     @Override
     public void reportError(Throwable error) {
-        executor.getErrorReportingMetrics().incrReportedErrorCount();
+        executor.incrementReportedErrorCount();
         executor.getReportError().report(error);
     }
 
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 94e8899..25c8447 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -123,7 +123,6 @@ public class SpoutExecutor extends Executor {
             }
         });
 
-        this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
         this.outputCollectors = new ArrayList<>();
         for (int i = 0; i < idToTask.size(); ++i) {
             Task taskData = idToTask.get(i);
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
index c370d6a..d47b534 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@@ -102,7 +102,7 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
 
     @Override
     public void reportError(Throwable error) {
-        executor.getErrorReportingMetrics().incrReportedErrorCount();
+        executor.incrementReportedErrorCount();
         executor.getReportError().report(error);
     }