You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/30 09:15:43 UTC

[kafka] branch 2.1 updated: MINOR: improve Session expiration notice (#6618)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new b7ed3a0  MINOR: improve Session expiration notice (#6618)
b7ed3a0 is described below

commit b7ed3a0df44f62bc72e59cc01bf44550a5310b57
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat Apr 27 16:59:54 2019 -0500

    MINOR: improve Session expiration notice (#6618)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, A. Sophie Blee-Goldman <so...@confluent.io>
---
 .../internals/KStreamSessionWindowAggregate.java   | 20 ++++++++++++++--
 .../kstream/internals/KStreamWindowAggregate.java  | 19 +++++++++++++--
 ...KStreamSessionWindowAggregateProcessorTest.java | 13 ++++++----
 .../internals/KStreamWindowAggregateTest.java      | 28 +++++++++++-----------
 .../kstream/internals/KStreamWindowReduceTest.java | 10 ++++----
 5 files changed, 63 insertions(+), 27 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index f7802d6..707ad91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -148,8 +148,24 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                 tupleForwarder.maybeForward(sessionKey, agg, null);
             } else {
                 LOG.debug(
-                    "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
-                    key, context().topic(), context().partition(), context().offset(), context().timestamp(), mergedWindow.start(), mergedWindow.end(), closeTime
+                    "Skipping record for expired window. " +
+                        "key=[{}] " +
+                        "topic=[{}] " +
+                        "partition=[{}] " +
+                        "offset=[{}] " +
+                        "timestamp=[{}] " +
+                        "window=[{},{}) " +
+                        "expiration=[{}] " +
+                        "streamTime=[{}]",
+                    key,
+                    context().topic(),
+                    context().partition(),
+                    context().offset(),
+                    context().timestamp(),
+                    mergedWindow.start(),
+                    mergedWindow.end(),
+                    closeTime,
+                    observedStreamTime
                 );
                 lateRecordDropSensor.record();
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index c5b2483..d53dd1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -128,8 +128,23 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
                     tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
                 } else {
                     log.debug(
-                        "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
-                        key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, windowEnd, closeTime
+                        "Skipping record for expired window. " +
+                            "key=[{}] " +
+                            "topic=[{}] " +
+                            "partition=[{}] " +
+                            "offset=[{}] " +
+                            "timestamp=[{}] " +
+                            "window=[{},{}) " +
+                            "expiration=[{}] " +
+                            "streamTime=[{}]",
+                        key,
+                        context().topic(),
+                        context().partition(),
+                        context().offset(),
+                        context().timestamp(),
+                        windowStart, windowEnd,
+                        closeTime,
+                        observedStreamTime
                     );
                     lateRecordDropSensor.record();
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index e5e74d8..c93dc60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -368,9 +368,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
             )
         );
 
-        assertThat((Double) metrics.metrics().get(dropRate).metricValue(), greaterThan(0.0));
-
-        assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
-        assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
+        assertThat(
+            (Double) metrics.metrics().get(dropRate).metricValue(),
+            greaterThan(0.0));
+        assertThat(
+            appender.getMessages(),
+            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
+        assertThat(
+            appender.getMessages(),
+            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 1e39bd3..7d2118c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -293,13 +293,13 @@ public class KStreamWindowAggregateTest {
             );
 
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]"
             ));
 
             OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
@@ -343,13 +343,13 @@ public class KStreamWindowAggregateTest {
             assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));
 
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]"
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]"
             ));
 
             OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index 913710f..d545e21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -113,11 +113,11 @@ public class KStreamWindowReduceTest {
 
             assertThat(dropMetric.metricValue(), equalTo(5.0));
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0,5) expiration=[5]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0,5) expiration=[5]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0,5) expiration=[5]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0,5) expiration=[5]",
-                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0,5) expiration=[5]"
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0,5) expiration=[5] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0,5) expiration=[5] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0,5) expiration=[5] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0,5) expiration=[5] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0,5) expiration=[5] streamTime=[100]"
             ));
 
             OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/105]", "100", 100);