You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/30 09:15:43 UTC
[kafka] branch 2.1 updated: MINOR: improve Session expiration
notice (#6618)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new b7ed3a0 MINOR: improve Session expiration notice (#6618)
b7ed3a0 is described below
commit b7ed3a0df44f62bc72e59cc01bf44550a5310b57
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat Apr 27 16:59:54 2019 -0500
MINOR: improve Session expiration notice (#6618)
Reviewers: Matthias J. Sax <ma...@confluent.io>, A. Sophie Blee-Goldman <so...@confluent.io>
---
.../internals/KStreamSessionWindowAggregate.java | 20 ++++++++++++++--
.../kstream/internals/KStreamWindowAggregate.java | 19 +++++++++++++--
...KStreamSessionWindowAggregateProcessorTest.java | 13 ++++++----
.../internals/KStreamWindowAggregateTest.java | 28 +++++++++++-----------
.../kstream/internals/KStreamWindowReduceTest.java | 10 ++++----
5 files changed, 63 insertions(+), 27 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index f7802d6..707ad91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -148,8 +148,24 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
tupleForwarder.maybeForward(sessionKey, agg, null);
} else {
LOG.debug(
- "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
- key, context().topic(), context().partition(), context().offset(), context().timestamp(), mergedWindow.start(), mergedWindow.end(), closeTime
+ "Skipping record for expired window. " +
+ "key=[{}] " +
+ "topic=[{}] " +
+ "partition=[{}] " +
+ "offset=[{}] " +
+ "timestamp=[{}] " +
+ "window=[{},{}) " +
+ "expiration=[{}] " +
+ "streamTime=[{}]",
+ key,
+ context().topic(),
+ context().partition(),
+ context().offset(),
+ context().timestamp(),
+ mergedWindow.start(),
+ mergedWindow.end(),
+ closeTime,
+ observedStreamTime
);
lateRecordDropSensor.record();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index c5b2483..d53dd1c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -128,8 +128,23 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
} else {
log.debug(
- "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{},{}) expiration=[{}]",
- key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, windowEnd, closeTime
+ "Skipping record for expired window. " +
+ "key=[{}] " +
+ "topic=[{}] " +
+ "partition=[{}] " +
+ "offset=[{}] " +
+ "timestamp=[{}] " +
+ "window=[{},{}) " +
+ "expiration=[{}] " +
+ "streamTime=[{}]",
+ key,
+ context().topic(),
+ context().partition(),
+ context().offset(),
+ context().timestamp(),
+ windowStart, windowEnd,
+ closeTime,
+ observedStreamTime
);
lateRecordDropSensor.record();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index e5e74d8..c93dc60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -368,9 +368,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
)
);
- assertThat((Double) metrics.metrics().get(dropRate).metricValue(), greaterThan(0.0));
-
- assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
- assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
+ assertThat(
+ (Double) metrics.metrics().get(dropRate).metricValue(),
+ greaterThan(0.0));
+ assertThat(
+ appender.getMessages(),
+ hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
+ assertThat(
+ appender.getMessages(),
+ hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 1e39bd3..7d2118c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -293,13 +293,13 @@ public class KStreamWindowAggregateTest {
);
assertThat(appender.getMessages(), hasItems(
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]"
));
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
@@ -343,13 +343,13 @@ public class KStreamWindowAggregateTest {
assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));
assertThat(appender.getMessages(), hasItems(
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]"
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]"
));
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index 913710f..d545e21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -113,11 +113,11 @@ public class KStreamWindowReduceTest {
assertThat(dropMetric.metricValue(), equalTo(5.0));
assertThat(appender.getMessages(), hasItems(
- "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0,5) expiration=[5]",
- "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0,5) expiration=[5]",
- "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0,5) expiration=[5]",
- "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0,5) expiration=[5]",
- "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0,5) expiration=[5]"
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0,5) expiration=[5] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0,5) expiration=[5] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0,5) expiration=[5] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0,5) expiration=[5] streamTime=[100]",
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0,5) expiration=[5] streamTime=[100]"
));
OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/105]", "100", 100);