You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/29 21:40:53 UTC

[kafka] branch 2.2 updated: MINOR: improve Session expiration notice (#6618)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new b8b153a  MINOR: improve Session expiration notice (#6618)
b8b153a is described below

commit b8b153aad6d8b96adcaf4456c8d4dd14e18ec33c
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat Apr 27 16:59:54 2019 -0500

    MINOR: improve Session expiration notice (#6618)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, A. Sophie Blee-Goldman <so...@confluent.io>
---
 .../internals/KStreamSessionWindowAggregate.java   | 20 ++++++++++++++--
 .../kstream/internals/KStreamWindowAggregate.java  | 19 +++++++++++++--
 ...KStreamSessionWindowAggregateProcessorTest.java |  4 ++--
 .../internals/KStreamWindowAggregateTest.java      | 28 +++++++++++-----------
 4 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 60f1b6a..2c98c10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -147,8 +147,24 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                 tupleForwarder.maybeForward(sessionKey, agg, null);
             } else {
                 LOG.debug(
-                    "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
-                    key, context().topic(), context().partition(), context().offset(), context().timestamp(), mergedWindow.start(), mergedWindow.end(), closeTime
+                    "Skipping record for expired window. " +
+                        "key=[{}] " +
+                        "topic=[{}] " +
+                        "partition=[{}] " +
+                        "offset=[{}] " +
+                        "timestamp=[{}] " +
+                        "window=[{},{}) " +
+                        "expiration=[{}] " +
+                        "streamTime=[{}]",
+                    key,
+                    context().topic(),
+                    context().partition(),
+                    context().offset(),
+                    context().timestamp(),
+                    mergedWindow.start(),
+                    mergedWindow.end(),
+                    closeTime,
+                    observedStreamTime
                 );
                 lateRecordDropSensor.record();
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index e5f290e..79c088b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -128,8 +128,23 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
                     tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, sendOldValues ? oldAgg : null);
                 } else {
                     log.debug(
-                        "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
-                        key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, windowEnd, closeTime
+                        "Skipping record for expired window. " +
+                            "key=[{}] " +
+                            "topic=[{}] " +
+                            "partition=[{}] " +
+                            "offset=[{}] " +
+                            "timestamp=[{}] " +
+                            "window=[{},{}) " +
+                            "expiration=[{}] " +
+                            "streamTime=[{}]",
+                        key,
+                        context().topic(),
+                        context().partition(),
+                        context().offset(),
+                        context().timestamp(),
+                        windowStart, windowEnd,
+                        closeTime,
+                        observedStreamTime
                     );
                     lateRecordDropSensor.record();
                 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 4038bfd..8503980 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -382,9 +382,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
             greaterThan(0.0));
         assertThat(
             appender.getMessages(),
-            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
+            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
         assertThat(
             appender.getMessages(),
-            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
+            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 068062b..e9f08fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -292,13 +292,13 @@ public class KStreamWindowAggregateTest {
             );
 
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]"
             ));
 
             OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
@@ -342,13 +342,13 @@ public class KStreamWindowAggregateTest {
             assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));
 
             assertThat(appender.getMessages(), hasItems(
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]",
-                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]"
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]",
+                "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]"
             ));
 
             OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200);