You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/04 11:26:35 UTC
[kafka] branch 2.1 updated: KAFKA-8289: Fix Session Expiration and
Suppression (#6654) (#6671)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new c6acaaa KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6671)
c6acaaa is described below
commit c6acaaa469b20c385a7b5dbdcbb7a810a24881ab
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat May 4 06:26:10 2019 -0500
KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6671)
Session windows expired prematurely (an off-by-one error), since the session window end is inclusive, unlike other window types.
The suppress duration for sessions incorrectly waited only the grace period, but session windows aren't closed until gracePeriod + sessionGap.
Cherry-pick of #6654 from trunk.
Reviewers: Bill Bejeck <bi...@confluent.io>
---
.../internals/KStreamSessionWindowAggregate.java | 28 +++----
.../internals/graph/GraphGraceSearchUtil.java | 2 +-
...KStreamSessionWindowAggregateProcessorTest.java | 96 ++++++++++++++++++++--
.../kstream/internals/SuppressScenarioTest.java | 33 ++++----
.../internals/graph/GraphGraceSearchUtilTest.java | 10 +--
5 files changed, 125 insertions(+), 44 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 707ad91..368e2c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -134,19 +134,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
}
}
- if (mergedWindow.end() > closeTime) {
- if (!mergedWindow.equals(newSessionWindow)) {
- for (final KeyValue<Windowed<K>, Agg> session : merged) {
- store.remove(session.key);
- tupleForwarder.maybeForward(session.key, null, session.value);
- }
- }
-
- agg = aggregator.apply(key, value, agg);
- final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
- store.put(sessionKey, agg);
- tupleForwarder.maybeForward(sessionKey, agg, null);
- } else {
+ if (mergedWindow.end() < closeTime) {
LOG.debug(
"Skipping record for expired window. " +
"key=[{}] " +
@@ -154,7 +142,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
"partition=[{}] " +
"offset=[{}] " +
"timestamp=[{}] " +
- "window=[{},{}) " +
+ "window=[{},{}] " +
"expiration=[{}] " +
"streamTime=[{}]",
key,
@@ -168,6 +156,18 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
observedStreamTime
);
lateRecordDropSensor.record();
+ } else {
+ if (!mergedWindow.equals(newSessionWindow)) {
+ for (final KeyValue<Windowed<K>, Agg> session : merged) {
+ store.remove(session.key);
+ tupleForwarder.maybeForward(session.key, null, session.value);
+ }
+ }
+
+ agg = aggregator.apply(key, value, agg);
+ final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
+ store.put(sessionKey, agg);
+ tupleForwarder.maybeForward(sessionKey, agg, null);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
index 306ddf5..2fb28dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -78,7 +78,7 @@ public final class GraphGraceSearchUtil {
} else if (processorSupplier instanceof KStreamSessionWindowAggregate) {
final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier;
final SessionWindows windows = kStreamSessionWindowAggregate.windows();
- return windows.gracePeriodMs();
+ return windows.gracePeriodMs() + windows.inactivityGap();
} else {
return null;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index c93dc60..bf61fd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -320,11 +320,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
@Test
- public void shouldLogAndMeterWhenSkippingLateRecord() {
+ public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
- SessionWindows.with(ofMillis(10L)).grace(ofMillis(10L)),
+ SessionWindows.with(ofMillis(10L)).grace(ofMillis(0L)),
STORE_NAME,
initializer,
aggregator,
@@ -334,14 +334,21 @@ public class KStreamSessionWindowAggregateProcessorTest {
initStore(false);
processor.init(context);
- // dummy record to advance stream time
- context.setRecordContext(new ProcessorRecordContext(20, -2, -3, "topic", null));
+ // dummy record to establish stream time = 0
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("dummy", "dummy");
+ // record arrives on time, should not be skipped
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
- processor.process("A", "1");
+ processor.process("OnTime1", "1");
+
+ // dummy record to advance stream time = 1
context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
- processor.process("A", "1");
+ processor.process("dummy", "dummy");
+
+ // record is late
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ processor.process("Late1", "1");
LogCaptureAppender.unregister(appender);
final MetricName dropMetric = new MetricName(
@@ -355,7 +362,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
)
);
- assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0));
+ assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
final MetricName dropRate = new MetricName(
"late-record-drop-rate",
@@ -373,9 +380,80 @@ public class KStreamSessionWindowAggregateProcessorTest {
greaterThan(0.0));
assertThat(
appender.getMessages(),
- hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
+ hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]"));
+ }
+
+ @Test
+ public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
+ LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
+ SessionWindows.with(ofMillis(10L)).grace(ofMillis(1L)),
+ STORE_NAME,
+ initializer,
+ aggregator,
+ sessionMerger
+ ).get();
+
+ initStore(false);
+ processor.init(context);
+
+ // dummy record to establish stream time = 0
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ processor.process("dummy", "dummy");
+
+ // record arrives on time, should not be skipped
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ processor.process("OnTime1", "1");
+
+ // dummy record to advance stream time = 1
+ context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+ processor.process("dummy", "dummy");
+
+ // delayed record arrives on time, should not be skipped
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ processor.process("OnTime2", "1");
+
+ // dummy record to advance stream time = 2
+ context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
+ processor.process("dummy", "dummy");
+
+ // delayed record arrives late
+ context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+ processor.process("Late1", "1");
+
+
+ LogCaptureAppender.unregister(appender);
+
+ final MetricName dropMetric = new MetricName(
+ "late-record-drop-total",
+ "stream-processor-node-metrics",
+ "The total number of occurrence of late-record-drop operations.",
+ mkMap(
+ mkEntry("client-id", "test"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("processor-node-id", "TESTING_NODE")
+ )
+ );
+
+ assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
+
+ final MetricName dropRate = new MetricName(
+ "late-record-drop-rate",
+ "stream-processor-node-metrics",
+ "The average number of occurrence of late-record-drop operations.",
+ mkMap(
+ mkEntry("client-id", "test"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("processor-node-id", "TESTING_NODE")
+ )
+ );
+
+ assertThat(
+ (Double) metrics.metrics().get(dropRate).metricValue(),
+ greaterThan(0.0));
assertThat(
appender.getMessages(),
- hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
+ hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]"));
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 65c51fc..361677d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -46,7 +46,6 @@ import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.junit.Test;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -310,7 +309,7 @@ public class SuppressScenarioTest {
.count();
valueCounts
// this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
- .suppress(untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
+ .suppress(untilTimeLimit(ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
.toStream()
.to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
@@ -481,7 +480,7 @@ public class SuppressScenarioTest {
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
- .windowedBy(SessionWindows.with(5L).grace(ofMillis(5L)))
+ .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(0L)))
.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
@@ -502,34 +501,38 @@ public class SuppressScenarioTest {
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
// first window
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+ driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
+ // arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace
driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
- // new window
- driver.pipeInput(recordFactory.create("input", "k1", "v1", 7L));
+ // any record in the same partition advances stream time (note the key is different)
+ driver.pipeInput(recordFactory.create("input", "k2", "v1", 6L));
// late event for first window - this should get dropped from all streams, since the first window is now closed.
- driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
+ driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
// just pushing stream time forward to flush the other events through.
driver.pipeInput(recordFactory.create("input", "k1", "v1", 30L));
verify(
drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L),
- new KeyValueTimestamp<>("[k1@0/0]", null, 1L),
- new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
- new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L),
+ new KeyValueTimestamp<>("[k1@0/0]", null, 5L),
+ new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
+ new KeyValueTimestamp<>("[k1@0/5]", null, 1L),
+ new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+ new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
)
);
verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
- new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
- new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L)
+ new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+ new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
)
);
}
}
- private <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
+ private static <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
if (results.size() != expectedResults.size()) {
throw new AssertionError(printRecords(results) + " != " + expectedResults);
}
@@ -544,7 +547,7 @@ public class SuppressScenarioTest {
}
}
- private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+ private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
final List<ProducerRecord<K, V>> result = new LinkedList<>();
for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
next != null;
@@ -554,11 +557,11 @@ public class SuppressScenarioTest {
return new ArrayList<>(result);
}
- private <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
+ private static <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
final StringBuilder resultStr = new StringBuilder();
resultStr.append("[\n");
for (final ProducerRecord<?, ?> record : result) {
- resultStr.append(" ").append(record.toString()).append("\n");
+ resultStr.append(" ").append(record).append("\n");
}
resultStr.append("]");
return resultStr.toString();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 5e426d9..45fe845 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -121,11 +121,11 @@ public class GraphGraceSearchUtilTest {
);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
- assertThat(extracted, is(windows.gracePeriodMs()));
+ assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
}
@Test
- public void shouldExtractGraceFromAncestorThroughStatefulParent() {
+ public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf",
@@ -160,11 +160,11 @@ public class GraphGraceSearchUtilTest {
statefulParent.addChild(node);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
- assertThat(extracted, is(windows.gracePeriodMs()));
+ assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
}
@Test
- public void shouldExtractGraceFromAncestorThroughStatelessParent() {
+ public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf",
@@ -189,7 +189,7 @@ public class GraphGraceSearchUtilTest {
statelessParent.addChild(node);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
- assertThat(extracted, is(windows.gracePeriodMs()));
+ assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
}
@Test