Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/03 20:24:13 UTC

[kafka] branch 2.2 updated: KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6670)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new bfad7f9  KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6670)
bfad7f9 is described below

commit bfad7f99f7b27348245189f8cd451250e4fab3e6
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri May 3 15:23:59 2019 -0500

    KAFKA-8289: Fix Session Expiration and Suppression (#6654) (#6670)
    
    Fix two problems in Streams (both rules are sketched below):
    * Session windows expired prematurely (an off-by-one error), since a session window's end is inclusive, unlike other window types
    * The suppression duration for session windows incorrectly waited only the grace period, but a session window isn't closed until gracePeriod + sessionGap after the window end
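    
    For illustration, a minimal sketch of the two corrected rules. The helper
    methods and their names here are illustrative only, not actual Kafka
    Streams API; the expressions mirror the diff below:
    
        // Fix 1: a session window's end is inclusive, so a window whose end
        // equals the close time is still open. Expiry needs a strict
        // comparison; the old code also expired windows with end == closeTime.
        static boolean isExpired(final long windowEnd, final long closeTime) {
            return windowEnd < closeTime;
        }
    
        // Fix 2: a session window can keep growing until the inactivity gap
        // elapses, so a session window only closes gap + grace after its end
        // (matching the GraphGraceSearchUtil change below).
        static long sessionCloseWaitMs(final SessionWindows windows) {
            return windows.gracePeriodMs() + windows.inactivityGap();
        }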
    
    Cherry-pick of #6654 from trunk into 2.2
    
    Reviewers: Bill Bejeck <bi...@confluent.io>
---
 .../internals/KStreamSessionWindowAggregate.java   | 28 +++----
 .../internals/graph/GraphGraceSearchUtil.java      |  2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java | 96 ++++++++++++++++++++--
 .../kstream/internals/SuppressScenarioTest.java    | 41 ++++-----
 .../internals/graph/GraphGraceSearchUtilTest.java  | 10 +--
 5 files changed, 128 insertions(+), 49 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 2c98c10..aedf7f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -133,19 +133,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                 }
             }
 
-            if (mergedWindow.end() > closeTime) {
-                if (!mergedWindow.equals(newSessionWindow)) {
-                    for (final KeyValue<Windowed<K>, Agg> session : merged) {
-                        store.remove(session.key);
-                        tupleForwarder.maybeForward(session.key, null, sendOldValues ? session.value : null);
-                    }
-                }
-
-                agg = aggregator.apply(key, value, agg);
-                final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
-                store.put(sessionKey, agg);
-                tupleForwarder.maybeForward(sessionKey, agg, null);
-            } else {
+            if (mergedWindow.end() < closeTime) {
                 LOG.debug(
                     "Skipping record for expired window. " +
                         "key=[{}] " +
@@ -153,7 +141,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                         "partition=[{}] " +
                         "offset=[{}] " +
                         "timestamp=[{}] " +
-                        "window=[{},{}) " +
+                        "window=[{},{}] " +
                         "expiration=[{}] " +
                         "streamTime=[{}]",
                     key,
@@ -167,6 +155,18 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                     observedStreamTime
                 );
                 lateRecordDropSensor.record();
+            } else {
+                if (!mergedWindow.equals(newSessionWindow)) {
+                    for (final KeyValue<Windowed<K>, Agg> session : merged) {
+                        store.remove(session.key);
+                        tupleForwarder.maybeForward(session.key, null, sendOldValues ? session.value : null);
+                    }
+                }
+
+                agg = aggregator.apply(key, value, agg);
+                final Windowed<K> sessionKey = new Windowed<>(key, mergedWindow);
+                store.put(sessionKey, agg);
+                tupleForwarder.maybeForward(sessionKey, agg, null);
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
index 306ddf5..2fb28dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -78,7 +78,7 @@ public final class GraphGraceSearchUtil {
             } else if (processorSupplier instanceof KStreamSessionWindowAggregate) {
                 final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier;
                 final SessionWindows windows = kStreamSessionWindowAggregate.windows();
-                return windows.gracePeriodMs();
+                return windows.gracePeriodMs() + windows.inactivityGap();
             } else {
                 return null;
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 8503980..f53d9f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -329,11 +329,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     @Test
-    public void shouldLogAndMeterWhenSkippingLateRecord() {
+    public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
         LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
-            SessionWindows.with(ofMillis(10L)).grace(ofMillis(10L)),
+            SessionWindows.with(ofMillis(10L)).grace(ofMillis(0L)),
             STORE_NAME,
             initializer,
             aggregator,
@@ -343,14 +343,21 @@ public class KStreamSessionWindowAggregateProcessorTest {
         initStore(false);
         processor.init(context);
 
-        // dummy record to advance stream time
-        context.setRecordContext(new ProcessorRecordContext(20, -2, -3, "topic", null));
+        // dummy record to establish stream time = 0
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
         processor.process("dummy", "dummy");
 
+        // record arrives on time, should not be skipped
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
-        processor.process("A", "1");
+        processor.process("OnTime1", "1");
+
+        // dummy record to advance stream time = 1
         context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
-        processor.process("A", "1");
+        processor.process("dummy", "dummy");
+
+        // record is late
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("Late1", "1");
         LogCaptureAppender.unregister(appender);
 
         final MetricName dropMetric = new MetricName(
@@ -364,7 +371,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
             )
         );
 
-        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0));
+        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
 
         final MetricName dropRate = new MetricName(
             "late-record-drop-rate",
@@ -382,9 +389,80 @@ public class KStreamSessionWindowAggregateProcessorTest {
             greaterThan(0.0));
         assertThat(
             appender.getMessages(),
-            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10] streamTime=[20]"));
+            hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]"));
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
+        LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        final Processor<String, String> processor = new KStreamSessionWindowAggregate<>(
+            SessionWindows.with(ofMillis(10L)).grace(ofMillis(1L)),
+            STORE_NAME,
+            initializer,
+            aggregator,
+            sessionMerger
+        ).get();
+
+        initStore(false);
+        processor.init(context);
+
+        // dummy record to establish stream time = 0
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
+        // record arrives on time, should not be skipped
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("OnTime1", "1");
+
+        // dummy record to advance stream time = 1
+        context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
+        // delayed record arrives on time, should not be skipped
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("OnTime2", "1");
+
+        // dummy record to advance stream time = 2
+        context.setRecordContext(new ProcessorRecordContext(2, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
+        // delayed record arrives late
+        context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
+        processor.process("Late1", "1");
+
+
+        LogCaptureAppender.unregister(appender);
+
+        final MetricName dropMetric = new MetricName(
+            "late-record-drop-total",
+            "stream-processor-node-metrics",
+            "The total number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        );
+
+        assertThat(metrics.metrics().get(dropMetric).metricValue(), is(1.0));
+
+        final MetricName dropRate = new MetricName(
+            "late-record-drop-rate",
+            "stream-processor-node-metrics",
+            "The average number of occurrence of late-record-drop operations.",
+            mkMap(
+                mkEntry("client-id", "test"),
+                mkEntry("task-id", "0_0"),
+                mkEntry("processor-node-id", "TESTING_NODE")
+            )
+        );
+
+        assertThat(
+            (Double) metrics.metrics().get(dropRate).metricValue(),
+            greaterThan(0.0));
         assertThat(
             appender.getMessages(),
-            hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10] streamTime=[20]"));
+            hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]"));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index b0b87be..24f222b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -47,7 +47,6 @@ import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -72,9 +71,9 @@ public class SuppressScenarioTest {
     private static final Serde<String> STRING_SERDE = Serdes.String();
     private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
     private final Properties config = Utils.mkProperties(Utils.mkMap(
-            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())),
-            Utils.mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
-            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+        Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+        Utils.mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
+        Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
     ));
 
     @Test
@@ -304,7 +303,7 @@ public class SuppressScenarioTest {
             .count();
         valueCounts
             // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
-            .suppress(untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
+            .suppress(untilTimeLimit(ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
             .toStream()
             .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
         valueCounts
@@ -352,7 +351,6 @@ public class SuppressScenarioTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldSupportFinalResultsForTimeWindows() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -403,7 +401,6 @@ public class SuppressScenarioTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -465,7 +462,7 @@ public class SuppressScenarioTest {
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
-            .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(5L)))
+            .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(0L)))
             .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled());
         valueCounts
             .suppress(untilWindowCloses(unbounded()))
@@ -482,34 +479,38 @@ public class SuppressScenarioTest {
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             // first window
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
+            // arbitrarily disordered records are admitted, because the *window* is not closed until stream-time > window-end + grace
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
-            // new window
-            driver.pipeInput(recordFactory.create("input", "k1", "v1", 7L));
+            // any record in the same partition advances stream time (note the key is different)
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 6L));
             // late event for first window - this should get dropped from all streams, since the first window is now closed.
-            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
             // just pushing stream time forward to flush the other events through.
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 30L));
             verify(
                 drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
                     new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L),
-                    new KeyValueTimestamp<>("[k1@0/0]", null, 1L),
-                    new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
-                    new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L),
+                    new KeyValueTimestamp<>("[k1@0/0]", null, 5L),
+                    new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
+                    new KeyValueTimestamp<>("[k1@0/5]", null, 1L),
+                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+                    new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L),
                     new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
                 )
             );
             verify(
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
-                    new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
-                    new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L)
+                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 1L),
+                    new KeyValueTimestamp<>("[k2@6/6]", 1L, 6L)
                 )
             );
         }
     }
 
-    private <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
+    private static <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
         if (results.size() != expectedResults.size()) {
             throw new AssertionError(printRecords(results) + " != " + expectedResults);
         }
@@ -524,7 +525,7 @@ public class SuppressScenarioTest {
         }
     }
 
-    private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+    private static <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
         final List<ProducerRecord<K, V>> result = new LinkedList<>();
         for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
              next != null;
@@ -534,11 +535,11 @@ public class SuppressScenarioTest {
         return new ArrayList<>(result);
     }
 
-    private <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
+    private static <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
         final StringBuilder resultStr = new StringBuilder();
         resultStr.append("[\n");
         for (final ProducerRecord<?, ?> record : result) {
-            resultStr.append("  ").append(record.toString()).append("\n");
+            resultStr.append("  ").append(record).append("\n");
         }
         resultStr.append("]");
         return resultStr.toString();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 5e426d9..45fe845 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -121,11 +121,11 @@ public class GraphGraceSearchUtilTest {
         );
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
     }
 
     @Test
-    public void shouldExtractGraceFromAncestorThroughStatefulParent() {
+    public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() {
         final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
         final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
             "asdf",
@@ -160,11 +160,11 @@ public class GraphGraceSearchUtilTest {
         statefulParent.addChild(node);
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
     }
 
     @Test
-    public void shouldExtractGraceFromAncestorThroughStatelessParent() {
+    public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() {
         final SessionWindows windows = SessionWindows.with(ofMillis(10L)).grace(ofMillis(1234L));
         final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
             "asdf",
@@ -189,7 +189,7 @@ public class GraphGraceSearchUtilTest {
         statelessParent.addChild(node);
 
         final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
-        assertThat(extracted, is(windows.gracePeriodMs()));
+        assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap()));
     }
 
     @Test