You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/06 17:18:20 UTC
[kafka] branch trunk updated: Moved 'expired-window-' and 'late-' record-drop to StreamsMetricsImpl (#6355)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 14d97aa Moved 'expired-window-' and 'late-' record-drop to StreamsMetricsImpl (#6355)
14d97aa is described below
commit 14d97aabad1d5aab4b9407e24ab523157832934b
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Mar 6 09:17:58 2019 -0800
Moved 'expired-window-' and 'late-' record-drop to StreamsMetricsImpl (#6355)
Moved the hard-coded metric names 'expired-window-record-drop' and 'late-record-drop' into static String constants in StreamsMetricsImpl.
Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bb...@gmail.com>
---
.../org/apache/kafka/streams/kstream/internals/metrics/Sensors.java | 5 +++--
.../streams/processor/internals/metrics/StreamsMetricsImpl.java | 3 +++
.../apache/kafka/streams/state/internals/InMemoryWindowStore.java | 5 +++--
.../kafka/streams/state/internals/RocksDBSegmentedBytesStore.java | 5 +++--
4 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 12c4813..4ecaeb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATE_RECORD_DROP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
@@ -40,14 +41,14 @@ public class Sensors {
final Sensor sensor = metrics.nodeLevelSensor(
context.taskId().toString(),
context.currentNode().name(),
- "late-record-drop",
+ LATE_RECORD_DROP,
Sensor.RecordingLevel.INFO
);
StreamsMetricsImpl.addInvocationRateAndCount(
sensor,
PROCESSOR_NODE_METRICS_GROUP,
metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()),
- "late-record-drop"
+ LATE_RECORD_DROP
);
return sensor;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index ec35157..0a47fce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -55,6 +55,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
+ public static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop";
+ public static final String LATE_RECORD_DROP = "late-record-drop";
+
public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
Objects.requireNonNull(metrics, "Metrics cannot be null");
this.threadName = threadName;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 77820c5..67eec0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -42,6 +42,7 @@ import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
@@ -95,14 +96,14 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
expiredRecordSensor = metrics.storeLevelSensor(
taskName,
name(),
- "expired-window-record-drop",
+ EXPIRED_WINDOW_RECORD_DROP,
Sensor.RecordingLevel.INFO
);
addInvocationRateAndCount(
expiredRecordSensor,
"stream-" + metricScope + "-metrics",
metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
- "expired-window-record-drop"
+ EXPIRED_WINDOW_RECORD_DROP
);
if (root != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 0ed4e9d..f733c80 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
@@ -157,14 +158,14 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
expiredRecordSensor = metrics.storeLevelSensor(
taskName,
name(),
- "expired-window-record-drop",
+ EXPIRED_WINDOW_RECORD_DROP,
Sensor.RecordingLevel.INFO
);
addInvocationRateAndCount(
expiredRecordSensor,
"stream-" + metricScope + "-metrics",
metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
- "expired-window-record-drop"
+ EXPIRED_WINDOW_RECORD_DROP
);
segments.openExisting(this.context, observedStreamTime);