You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/06 17:18:20 UTC

[kafka] branch trunk updated: Moved 'expired-window-' and 'late-' record-drop to StreamsMetricsImpl (#6355)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 14d97aa  Moved 'expired-window-' and 'late-' record-drop to StreamsMetricsImpl (#6355)
14d97aa is described below

commit 14d97aabad1d5aab4b9407e24ab523157832934b
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Mar 6 09:17:58 2019 -0800

    Moved 'expired-window-' and 'late-' record-drop to StreamsMetricsImpl (#6355)
    
    Moved hard-coded 'expired-window-record-drop' and 'late-record-drop' to static Strings in StreamsMetricsImpl
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>,  Bill Bejeck <bb...@gmail.com>
---
 .../org/apache/kafka/streams/kstream/internals/metrics/Sensors.java  | 5 +++--
 .../streams/processor/internals/metrics/StreamsMetricsImpl.java      | 3 +++
 .../apache/kafka/streams/state/internals/InMemoryWindowStore.java    | 5 +++--
 .../kafka/streams/state/internals/RocksDBSegmentedBytesStore.java    | 5 +++--
 4 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 12c4813..4ecaeb4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATE_RECORD_DROP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
 
@@ -40,14 +41,14 @@ public class Sensors {
         final Sensor sensor = metrics.nodeLevelSensor(
             context.taskId().toString(),
             context.currentNode().name(),
-            "late-record-drop",
+            LATE_RECORD_DROP,
             Sensor.RecordingLevel.INFO
         );
         StreamsMetricsImpl.addInvocationRateAndCount(
             sensor,
             PROCESSOR_NODE_METRICS_GROUP,
             metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()),
-            "late-record-drop"
+            LATE_RECORD_DROP
         );
         return sensor;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index ec35157..0a47fce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -55,6 +55,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     public static final String PROCESSOR_NODE_METRICS_GROUP = "stream-processor-node-metrics";
     public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
 
+    public static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop";
+    public static final String LATE_RECORD_DROP = "late-record-drop";
+
     public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
         this.threadName = threadName;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 77820c5..67eec0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -42,6 +42,7 @@ import java.util.NavigableMap;
 import java.util.NoSuchElementException;
 import java.util.TreeMap;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
@@ -95,14 +96,14 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         expiredRecordSensor = metrics.storeLevelSensor(
             taskName,
             name(),
-            "expired-window-record-drop",
+            EXPIRED_WINDOW_RECORD_DROP,
             Sensor.RecordingLevel.INFO
         );
         addInvocationRateAndCount(
             expiredRecordSensor,
             "stream-" + metricScope + "-metrics",
             metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
-            "expired-window-record-drop"
+            EXPIRED_WINDOW_RECORD_DROP
         );
 
         if (root != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 0ed4e9d..f733c80 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
 
 public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
@@ -157,14 +158,14 @@ public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
         expiredRecordSensor = metrics.storeLevelSensor(
             taskName,
             name(),
-            "expired-window-record-drop",
+            EXPIRED_WINDOW_RECORD_DROP,
             Sensor.RecordingLevel.INFO
         );
         addInvocationRateAndCount(
             expiredRecordSensor,
             "stream-" + metricScope + "-metrics",
             metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
-            "expired-window-record-drop"
+            EXPIRED_WINDOW_RECORD_DROP
         );
 
         segments.openExisting(this.context, observedStreamTime);