You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/07/14 15:04:24 UTC
[storm] branch master updated: STORM-3780 switch
ErrorReportingMetrics to V2 API (#3401)
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 0df2ba2 STORM-3780 switch ErrorReportingMetrics to V2 API (#3401)
0df2ba2 is described below
commit 0df2ba288a57b2afff11a47c978fa99c905c71da
Author: agresch <ag...@gmail.com>
AuthorDate: Wed Jul 14 10:04:15 2021 -0500
STORM-3780 switch ErrorReportingMetrics to V2 API (#3401)
* STORM-3780 switch ErrorReportingMetrics to V2 API
---
.../storm/daemon/metrics/BuiltinMetrics.java | 28 -------------------
.../daemon/metrics/ErrorReportingMetrics.java | 32 ----------------------
.../jvm/org/apache/storm/executor/Executor.java | 15 +++++-----
.../apache/storm/executor/bolt/BoltExecutor.java | 3 --
.../executor/bolt/BoltOutputCollectorImpl.java | 2 +-
.../apache/storm/executor/spout/SpoutExecutor.java | 1 -
.../executor/spout/SpoutOutputCollectorImpl.java | 2 +-
7 files changed, 10 insertions(+), 73 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
deleted file mode 100644
index 6fbbdcc..0000000
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetrics.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.daemon.metrics;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.task.TopologyContext;
-
-public abstract class BuiltinMetrics {
- protected final Map<String, IMetric> metricMap = new HashMap<>();
-
- public void registerAll(Map<String, Object> topoConf, TopologyContext context) {
- for (Map.Entry<String, IMetric> entry : metricMap.entrySet()) {
- BuiltinMetricsUtil.registerMetric("__" + entry.getKey(), entry.getValue(), topoConf, context);
- }
- }
-}
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java
deleted file mode 100644
index aea3388..0000000
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/ErrorReportingMetrics.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.metric.api.CountMetric;
-
-public class ErrorReportingMetrics extends BuiltinMetrics {
- private final CountMetric reportedErrorCount = new CountMetric();
-
- public ErrorReportingMetrics() {
- metricMap.put("reported-error-count", reportedErrorCount);
- }
-
- public void incrReportedErrorCountBy(long n) {
- this.reportedErrorCount.incrBy(n);
- }
-
- public void incrReportedErrorCount() {
- this.reportedErrorCount.incr();
- }
-
-}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index ef4d057..8b4d8ac 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -49,7 +49,6 @@ import org.apache.storm.daemon.Acker;
import org.apache.storm.daemon.GrouperFactory;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.Task;
-import org.apache.storm.daemon.metrics.ErrorReportingMetrics;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.bolt.BoltExecutor;
import org.apache.storm.executor.error.IReportError;
@@ -66,6 +65,7 @@ import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.metrics2.RateCounter;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue;
@@ -119,7 +119,6 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
protected final Boolean isDebug;
protected final Boolean hasEventLoggers;
protected final boolean ackingEnabled;
- protected final ErrorReportingMetrics errorReportingMetrics;
protected final MpscChunkedArrayQueue<AddressedTuple> pendingEmits = new MpscChunkedArrayQueue<>(1024, (int) Math.pow(2, 30));
private final AddressedTuple flushTuple;
protected ExecutorTransfer executorTransfer;
@@ -128,6 +127,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
protected String hostname;
private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
private AtomicBoolean needToRefreshCreds = new AtomicBoolean(false);
+ private final RateCounter reportedErrorCount;
protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials, String type) {
this.workerData = workerData;
@@ -180,8 +180,9 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
} catch (UnknownHostException ignored) {
this.hostname = "";
}
- this.errorReportingMetrics = new ErrorReportingMetrics();
flushTuple = AddressedTuple.createFlushTuple(workerTopologyContext);
+ this.reportedErrorCount = workerData.getMetricRegistry().rateCounter("__reported-error-count", componentId,
+ taskIds.get(0));
}
public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
@@ -636,10 +637,6 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
return reportError;
}
- public ErrorReportingMetrics getErrorReportingMetrics() {
- return errorReportingMetrics;
- }
-
public WorkerTopologyContext getWorkerTopologyContext() {
return workerTopologyContext;
}
@@ -680,4 +677,8 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
public void setLocalExecutorTransfer(ExecutorTransfer executorTransfer) {
this.executorTransfer = executorTransfer;
}
+
+ public void incrementReportedErrorCount() {
+ reportedErrorCount.inc(1L);
+ }
}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 293d756..273bab5 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -99,9 +99,6 @@ public class BoltExecutor extends Executor {
Utils.sleepNoSimulation(100);
}
- if (!componentId.equals(StormCommon.SYSTEM_STREAM_ID)) { // System bolt doesn't call reportError()
- this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
- }
LOG.info("Preparing bolt {}:{}", componentId, getTaskIds());
for (Task taskData : idToTask) {
if (taskData == null) {
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index cf3e7c6..886a00c 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -187,7 +187,7 @@ public class BoltOutputCollectorImpl implements IOutputCollector {
@Override
public void reportError(Throwable error) {
- executor.getErrorReportingMetrics().incrReportedErrorCount();
+ executor.incrementReportedErrorCount();
executor.getReportError().report(error);
}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 94e8899..25c8447 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -123,7 +123,6 @@ public class SpoutExecutor extends Executor {
}
});
- this.errorReportingMetrics.registerAll(topoConf, idToTask.get(taskIds.get(0) - idToTaskBase).getUserContext());
this.outputCollectors = new ArrayList<>();
for (int i = 0; i < idToTask.size(); ++i) {
Task taskData = idToTask.get(i);
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
index c370d6a..d47b534 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
@@ -102,7 +102,7 @@ public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
@Override
public void reportError(Throwable error) {
- executor.getErrorReportingMetrics().incrReportedErrorCount();
+ executor.incrementReportedErrorCount();
executor.getReportError().report(error);
}