You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "wcarlson5 (via GitHub)" <gi...@apache.org> on 2023/06/14 20:30:13 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request, #13855: Grace period added

wcarlson5 opened a new pull request, #13855:
URL: https://github.com/apache/kafka/pull/13855

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1238801056


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +77,60 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context)

Review Comment:
   Yeah we can remove



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   This seems to be the main sticking point of the PR. 
   
   The thought was that this that we would need some way to stop using the grace period without loosing records, but you bring up a good point.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -122,12 +122,12 @@ public Maybe<ValueAndTimestamp<V>> priorValueForBuffered(final K key) {
     }
 
     @Override
-    public void put(final long time, final Record<K, V> record, final ProcessorRecordContext recordContext) {
+    public boolean put(final long time, final Record<K, V> record, final ProcessorRecordContext recordContext) {

Review Comment:
   added `shouldReturnIfRecordWasAdded`



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;

Review Comment:
   Okay so I actually tried to set this up and it was NOT simpler. The mocking got way out of hand. Normally I 100 precent agree with out, mocking is way better. 
   
   Due to you other comment I just ended up moving this whole class to the unit testing class for the normal Join anyways, so this is a bit of a moot point



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @BeforeEach
+    public void setUp() {
+        builder = new StreamsBuilder();
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,

Review Comment:
   sure, good call.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @BeforeEach
+    public void setUp() {
+        builder = new StreamsBuilder();
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+YY0", 0),
+            new KeyValueTimestamp<>(1, "X1+YY1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YYY");
+        processor.checkAndClearProcessResult(EMPTY);
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5))));
+        final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableC", Duration.ofMinutes(5))));
+        final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
+
+        rekeyedStream.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ZERO)).to("out-one");
+        rekeyedStream.join(tableC, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "second-join", Duration.ZERO)).to("out-two");
+        final Topology topology = builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserProvidedRepartitionTopicNames, topology.describe().toString());
+    }
+
+    @Test
+    public void shouldRequireCopartitionedStreams() {
+        makeJoin(Duration.ofMillis(9));
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next());
+    }
+
+    @Test
+    public void shouldNotJoinWithEmptyTableOnStreamUpdates() {
+        makeJoin(Duration.ofMillis(1));
+        // push four items to the primary stream. the table is empty
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(EMPTY);
+    }
+
+    @Test
+    public void shouldNotJoinOnTableUpdates() {

Review Comment:
   That is a good idea, massively simplified the code. I resolved the other comments that were pretty much the same as this one



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1238656502


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java:
##########
@@ -122,12 +122,12 @@ public Maybe<ValueAndTimestamp<V>> priorValueForBuffered(final K key) {
     }
 
     @Override
-    public void put(final long time, final Record<K, V> record, final ProcessorRecordContext recordContext) {
+    public boolean put(final long time, final Record<K, V> record, final ProcessorRecordContext recordContext) {

Review Comment:
   Could you please add unit tests for this change?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @BeforeEach
+    public void setUp() {
+        builder = new StreamsBuilder();
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,

Review Comment:
   Please do also verify the exception message. We had cases in the past where the test did not fail although a different exception was thrown.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {

Review Comment:
   I still do not understand why you check for `gracePeriod.isPresent()` but that is not blocking this PR.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;

Review Comment:
   > I think its best if they aline with each other
   
   Why is that?
   
   I find it a bit sad that we do not have real unit tests for the processor. But it's not your fault. I just wanted to document it somewhere.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;

Review Comment:
   Another related question: What is the difference between `KStreamKTableJoinWithGraceTest` and `StreamTableJoinWithGraceIntegrationTest`? They both use the `TopologyTestDriver` so it seems they both test the stream table join on the same level. I would rather expect to have unit tests that test directly the processor and an intergration test that tests the integration of the processor.  



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +77,60 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context)

Review Comment:
   Is this comment so important or can you remove it? 



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;

Review Comment:
   BTW, using unit tests would make the tests much simpler. You just need to mock the value getter (and maybe some other parts) instead of building up a whole topology. 



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @BeforeEach
+    public void setUp() {
+        builder = new StreamsBuilder();
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+YY0", 0),
+            new KeyValueTimestamp<>(1, "X1+YY1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YYY");
+        processor.checkAndClearProcessResult(EMPTY);
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5))));
+        final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableC", Duration.ofMinutes(5))));
+        final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
+
+        rekeyedStream.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ZERO)).to("out-one");
+        rekeyedStream.join(tableC, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "second-join", Duration.ZERO)).to("out-two");
+        final Topology topology = builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserProvidedRepartitionTopicNames, topology.describe().toString());
+    }
+
+    @Test
+    public void shouldRequireCopartitionedStreams() {
+        makeJoin(Duration.ofMillis(9));
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next());
+    }
+
+    @Test
+    public void shouldNotJoinWithEmptyTableOnStreamUpdates() {
+        makeJoin(Duration.ofMillis(1));
+        // push four items to the primary stream. the table is empty
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(EMPTY);
+    }
+
+    @Test
+    public void shouldNotJoinOnTableUpdates() {

Review Comment:
   Same here and for the rest of the test class. I think you should just move the first two tests (i.e., the new ones) to `KStreamKTableJoinTest`.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @BeforeEach
+    public void setUp() {
+        builder = new StreamsBuilder();
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+YY0", 0),
+            new KeyValueTimestamp<>(1, "X1+YY1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YYY");
+        processor.checkAndClearProcessResult(EMPTY);
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {

Review Comment:
   Is this test really needed? How is this related to the grace period and the buffer? Repartition topics should work as before, right?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @BeforeEach
+    public void setUp() {
+        builder = new StreamsBuilder();
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+YY0", 0),
+            new KeyValueTimestamp<>(1, "X1+YY1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YYY");
+        processor.checkAndClearProcessResult(EMPTY);
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5))));
+        final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableC", Duration.ofMinutes(5))));
+        final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
+
+        rekeyedStream.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ZERO)).to("out-one");
+        rekeyedStream.join(tableC, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "second-join", Duration.ZERO)).to("out-two");
+        final Topology topology = builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserProvidedRepartitionTopicNames, topology.describe().toString());
+    }
+
+    @Test
+    public void shouldRequireCopartitionedStreams() {

Review Comment:
   Same applies to this test. The grace period does not affect this part of the logic.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @BeforeEach
+    public void setUp() {
+        builder = new StreamsBuilder();
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @AfterEach
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+YY0", 0),
+            new KeyValueTimestamp<>(1, "X1+YY1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YYY");
+        processor.checkAndClearProcessResult(EMPTY);
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5))));
+        final KTable<String, String> tableC = builder.table("topic3", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableC", Duration.ofMinutes(5))));
+        final KStream<String, String> rekeyedStream = streamA.map((k, v) -> new KeyValue<>(v, k));
+
+        rekeyedStream.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ZERO)).to("out-one");
+        rekeyedStream.join(tableC, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "second-join", Duration.ZERO)).to("out-two");
+        final Topology topology = builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserProvidedRepartitionTopicNames, topology.describe().toString());
+    }
+
+    @Test
+    public void shouldRequireCopartitionedStreams() {
+        makeJoin(Duration.ofMillis(9));
+
+        final Collection<Set<String>> copartitionGroups =
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next());
+    }
+
+    @Test
+    public void shouldNotJoinWithEmptyTableOnStreamUpdates() {

Review Comment:
   This is already tested in `KStreamKTableJoinTest`. I do not think you need to test it here. 



##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

Review Comment:
   Fair enough! It is fine to leave it to the future when we will migrate also the parent class. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1232839068


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;

Review Comment:
   So I was following the normal KStreamKTableJoinTest. I think its best if they aline with each other.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on PR #13855:
URL: https://github.com/apache/kafka/pull/13855#issuecomment-1603313016

   The next PR is the restore logic https://github.com/wcarlson5/kafka/pull/2
   
   I have it targeted to this branch for easy reading and will retarget it when this is merged. 
   
   @vcrfxia @cadonna 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1244730962


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +77,59 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!useBuffer) {
+            doJoin(record);
+        } else {
+            if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
+                doJoin(record);
+            }
+            buffer.get().evictWhile(() -> true, this::emit);
+        }
+    }
+
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) {
+        final Record<K1, V1> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
+            .withHeaders(toEmit.recordContext().headers());
+        internalProcessorContext.setRecordContext(toEmit.recordContext());

Review Comment:
   certainly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1235096344


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   > So if someone set the grace period to zero then we can evict everything from the buffer even after the grace period is changed.
   
   Are you saying that if I set the grace period > 0 and I run my application, then I stop my application and set the grace period == 0, and restart my application, my application will process records from my first run?
   Why should removing the grace period (grace period == null) behave differently?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1232831833


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {

Review Comment:
   If you find it easier to understand sure. that is fine



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   I see what you mean but we actually were going to create the store and leave it empty for zero duration. The point was to make it easier to change the grace period if desired so the store isn't getting created and destroyed. Something about making it more backward compatible.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

Review Comment:
   yep sure.
   
   (Edit: The class it extents also uses junit 4 and I cant change this one without also chaining that and all other join integration tests)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context)
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+            doJoin(record);
+        } else {
+            final long deadline = observedStreamTime - gracePeriod.get().toMillis();
+            if (record.timestamp() <= deadline) {

Review Comment:
   That is a good idea. I'm also in favor of having put decide.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @Before
+    public void setUp() {
+        builder = new StreamsBuilder();
+        processor = supplier.theCapturedProcessor();
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+    }
+
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {

Review Comment:
   Ah this should be removed. It will be added back in the next PR (or the one after)



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   So we are going to make a store even when the grace period is zero. This should make changing the grace period to and from zero smoother. So if someone set the grace period to zero then we can evict everything from the buffer even after the grace period is changed.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegrationTest {
+
+    private static final String STORE_NAME = "table-store";
+
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    private KStream<Long, String> leftStream;
+    private KTable<Long, String> rightTable;
+    private Joined<Long, String, String> joined;
+
+    public StreamTableJoinWithGraceIntegrationTest(final boolean cacheEnabled) {
+        super(cacheEnabled);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+        appID = "stream-table-join-integration-test";
+        builder = new StreamsBuilder();
+        joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), "Grace", Duration.ZERO);

Review Comment:
   I'll add some, I think I need to rework what is in the integration tests and the unit tests. Should be fixed. 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context)
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+            doJoin(record);
+        } else {
+            final long deadline = observedStreamTime - gracePeriod.get().toMillis();
+            if (record.timestamp() <= deadline) {
+                doJoin(record);
+            } else {
+                buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext());
+            }
+            buffer.get().evictWhile(() -> buffer.get().minTimestamp() <= deadline, this::emit);

Review Comment:
   it should be the same. So sure I can go with the simple approach 



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;

Review Comment:
   I'm going to move the relevant tests to the integration tests and remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on PR #13855:
URL: https://github.com/apache/kafka/pull/13855#issuecomment-1592137217

   @cadonna could you give this a look? @vcrfxia has looked at [this](https://github.com/wcarlson5/kafka/pull/1) and there is a check of the table's history retention that I am missing but other than that it should be good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1244729657


##########
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##########
@@ -189,7 +221,22 @@ public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
      */
     @Override
     public Joined<K, V, VO> withName(final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
+    }
+
+    /**
+     * Set the grace period on the stream side of the join. Records will enter a buffer before being processed. Out of order records in the grace period will be processed in timestamp order. Late records, out of the grace period, will be executed right as they come in, if it is past the table history retention this could result in joins on the wrong version or a null join. Long gaps in stream side arriving records will cause records to be delayed in processing, even resulting in be processed out of the grace period window.

Review Comment:
   updated and clarified 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #13855:
URL: https://github.com/apache/kafka/pull/13855#issuecomment-1613049852

   Thanks @wcarlson5 !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1243468362


##########
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##########
@@ -189,7 +221,22 @@ public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
      */
     @Override
     public Joined<K, V, VO> withName(final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
+    }
+
+    /**
+     * Set the grace period on the stream side of the join. Records will enter a buffer before being processed. Out of order records in the grace period will be processed in timestamp order. Late records, out of the grace period, will be executed right as they come in, if it is past the table history retention this could result in joins on the wrong version or a null join. Long gaps in stream side arriving records will cause records to be delayed in processing, even resulting in be processed out of the grace period window.

Review Comment:
   Could you please also add a couple of line breaks to the java docs as you did for the javadocs of the `with()` method?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +77,59 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!useBuffer) {
+            doJoin(record);
+        } else {
+            if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
+                doJoin(record);
+            }
+            buffer.get().evictWhile(() -> true, this::emit);

Review Comment:
   I think that should be the following way to avoid a unnecessary range query:
   ```suggestion
               if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
                   doJoin(record);
               } else {
                   buffer.get().evictWhile(() -> true, this::emit);
               }
   ```
   If the record is a late record it will not update the observed stream time. If the observed stream time is not updated, the range query will not return records that need to be evicted, since they have been already evicted the last time `evictWhile()` was called.
   Does this make sense?
   
   If you agree could also please add a test for this?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##########
@@ -203,4 +250,4 @@ public Serde<V> valueSerde() {
     public Serde<VO> otherValueSerde() {
         return otherValueSerde;
     }
-}
+}

Review Comment:
   nit: Could remove this change? 



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
         }
     }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream table join.")
+        );
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(

Review Comment:
   Good call!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1235135157


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   What I am trying to say is that we should try to put simplicity over partial support for topology updates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna merged pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna merged PR #13855:
URL: https://github.com/apache/kafka/pull/13855


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1230539039


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   Shouldn't this be
   ```suggestion
           if (joined.gracePeriod() != null && joined.gracePeriod() > 0) {
   ```
   In the KIP discussion we said that a grace period of zero is equal to no grace period.
   
   Note, some conditions later on need to be changed if you agree with this change, like for example `gracePeriod.isPresent() ^ buffer.isPresent()`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {

Review Comment:
   Why do you check for `gracePeriod.isPresent()` again? You have already done that in the constructor. 
   
   I think a boolean `useBuffer` that is set in the constructor would make the code more readable.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context)
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+            doJoin(record);
+        } else {
+            final long deadline = observedStreamTime - gracePeriod.get().toMillis();
+            if (record.timestamp() <= deadline) {

Review Comment:
   Why here `record.timestamp() <= deadline` and in `RocksDBTimeOrderedKeyValueBuffer` at line 127 `wrapped().observedStreamTime - gracePeriod > record.timestamp()`?
   
   I am wondering, if it would be simpler to remove the condition in `RocksDBTimeOrderedKeyValueBuffer` at line 127 and only check late records here. In the end, you do not get any return value from the `put()` in `RocksDBTimeOrderedKeyValueBuffer`. So it would silently not add the record to the buffer. Such a behavior might become hard to debug in case of a bug and it would also be quite error-prone because you have the condition for late records in two places instead of just one.
   
   I am even in favor of let `RocksDBTimeOrderedKeyValueBuffer#put()` return whether the record was added to the buffer or not. In such a way, you can move the logic for eviction and late records to the buffer instead of having it half here and half there making maintaining the code harder.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamTableJoinWithGraceIntegrationTest extends AbstractJoinIntegrationTest {
+
+    private static final String STORE_NAME = "table-store";
+
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    private KStream<Long, String> leftStream;
+    private KTable<Long, String> rightTable;
+    private Joined<Long, String, String> joined;
+
+    public StreamTableJoinWithGraceIntegrationTest(final boolean cacheEnabled) {
+        super(cacheEnabled);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+        appID = "stream-table-join-integration-test";
+        builder = new StreamsBuilder();
+        joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), "Grace", Duration.ZERO);

Review Comment:
   Why do you have only tests with grace period set to zero. I thought a grace period of zero is equivalent to no grace period. Basically, you are not testing the new code if you look at it from the outside.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @Before
+    public void setUp() {
+        builder = new StreamsBuilder();
+        processor = supplier.theCapturedProcessor();
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+    }
+
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5))));
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMinutes(6))).to("out-one");
+        builder.build(props);
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTable(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce four items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+YY0", 0),
+            new KeyValueTimestamp<>(1, "X1+YY1", 1),
+            new KeyValueTimestamp<>(2, "X2+YY2", 2),
+            new KeyValueTimestamp<>(3, "X3+YY3", 3));

Review Comment:
   nit:
   ```suggestion
           processor.checkAndClearProcessResult(
               new KeyValueTimestamp<>(0, "X0+YY0", 0),
               new KeyValueTimestamp<>(1, "X1+YY1", 1),
               new KeyValueTimestamp<>(2, "X2+YY2", 2),
               new KeyValueTimestamp<>(3, "X3+YY3", 3));
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context)
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {

Review Comment:
   Again, why do you verify both? In the constructor you ensure the invariant `gracePeriod.isPresent() == buffer.isPresent()`. A boolean `useBuffer` would make the code better readable.  



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;

Review Comment:
   Also here, please use JUnit 5.
   I also forgot to point that out for `RocksDBTimeOrderedKeyValueBufferTest`. You can to open a separate PR for that.



##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

Review Comment:
   Could you please use JUnit 5 for new integration tests?
   You can find an example in `RestoreIntegrationTest`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +75,63 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (buffer.isPresent()) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            //cast doesn't matter, it is just because the processor is deprecated. The context gets converted back with StoreToProcessorContextAdapter.adapt(context)
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!gracePeriod.isPresent() || !buffer.isPresent()) {
+            doJoin(record);
+        } else {
+            final long deadline = observedStreamTime - gracePeriod.get().toMillis();
+            if (record.timestamp() <= deadline) {
+                doJoin(record);
+            } else {
+                buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext());
+            }
+            buffer.get().evictWhile(() -> buffer.get().minTimestamp() <= deadline, this::emit);

Review Comment:
   I am wondering if it might not be simpler to use as predicate just `() -> true` instead of `() -> buffer.get().minTimestamp() <= deadline` and leave the conditions for evicting to the buffer. Otherwise, we have half of the logic for evicting here and the other half in the buffer. That seems error-prone and hard to maintain and debug to me.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;

Review Comment:
   Why do you need the topology test driver for unit tests?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @Before
+    public void setUp() {
+        builder = new StreamsBuilder();
+        processor = supplier.theCapturedProcessor();
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+    }
+
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5))));
+
+        streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMinutes(6))).to("out-one");
+        builder.build(props);
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTable(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));

Review Comment:
   nit:
   ```suggestion
           processor.checkAndClearProcessResult(
               new KeyValueTimestamp<>(0, "X0+Y0", 0),
               new KeyValueTimestamp<>(1, "X1+Y1", 1));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinWithGraceTest.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class KStreamKTableJoinWithGraceTest {
+    private final static KeyValueTimestamp<?, ?>[] EMPTY = new KeyValueTimestamp[0];
+
+    private final String streamTopic = "streamTopic";
+    private final String tableTopic = "tableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<Integer, String> inputTableTopic;
+    private final int[] expectedKeys = {0, 1, 2, 3};
+
+    private MockApiProcessor<Integer, String, Void, Void> processor;
+    private TopologyTestDriver driver;
+    private StreamsBuilder builder;
+    private final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+
+    @Before
+    public void setUp() {
+        builder = new StreamsBuilder();
+        processor = supplier.theCapturedProcessor();
+    }
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table(tableTopic, consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic(tableTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+    }
+
+    @After
+    public void cleanup() {
+        driver.close();
+    }
+
+    private void pushToStream(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputStreamTopic.pipeInput(expectedKeys[i], valuePrefix + expectedKeys[i], i);
+        }
+    }
+
+    private void pushToTable(final int messageCount, final String valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
+    private void pushNullValueToTable() {
+        for (int i = 0; i < 2; i++) {
+            inputTableTopic.pipeInput(expectedKeys[i], null);
+        }
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {

Review Comment:
   I am missing assertions in this test. You should call the `init()` and `process()` methods directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on PR #13855:
URL: https://github.com/apache/kafka/pull/13855#issuecomment-1613044610

   Build failures are unrelated:
   ```
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOneWayReplicationWithFrequentOffsetSyncs()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplicateSourceDefault()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testRestartReplication()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testNoCheckpointsIfNoRecordsAreMirrored()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testOneWayReplicationWithFrequentOffsetSyncs()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testNoCheckpointsIfNoRecordsAreMirrored()
       Build / JDK 17 and Scala 2.13 / kafka.admin.ReassignPartitionsIntegrationTest.testAlterReassignmentThrottle(String).quorum=kraft
       Build / JDK 17 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOffsetTranslationBehindReplicationFlow()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testOneWayReplicationWithAutoOffsetSync()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testOneWayReplicationWithAutoOffsetSync()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
       Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
       Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterSnapshotTest.testSnapshotsGenerated()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.tests.RelationalSmokeTestTest.verifySmokeTestLogic
   ```
   
   The looked into the two Kafka Streams related tests. `org.apache.kafka.streams.tests.RelationalSmokeTestTest.verifySmokeTestLogic` fails because of a file issue thrown by the JVM. I ran `org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()` locally but could not reproduce the issue after multiple runs.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1239143225


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
         }
     }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream table join.")
+        );
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(

Review Comment:
   The reason this produces two output records is because the max timestamp seen so far is still 3, which means only records with timestamp 0 and 1 are emitted (timestamps 2 and 3 are still in the buffer). Can you add another step to this test which now produces a record with a larger timestamp and verifies that the records with timestamps 2 and 3 are emitted? There should be four of them, and they should be emitted in timestamp order which is different from the offset order that they arrived in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1234687152


##########
streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;

Review Comment:
   Also https://github.com/apache/kafka/pull/13878



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on PR #13855:
URL: https://github.com/apache/kafka/pull/13855#issuecomment-1598038798

   @cadonna I got around to updating the tests as well like you asked


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1235096344


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   > So if someone set the grace period to zero then we can evict everything from the buffer even after the grace period is changed.
   
   Are you saying that if I set the grace period > 0 and I run my application, then I stop my application and set the grace period == 0, and restart my application, my application will process records from my first run?
   Why should removing the grace period (grace period == null) behave differently?
   Actually we do not support topology updates. However, we can at least make equivalent changes behave equivalently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1235102765


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {

Review Comment:
   Another thought here: 
   What happens if I specify a grace period > 0 and run my application for a while. Then, I remove the grace period and again run my application for a while. Finally, I specify a grace period > 0 or even == 0 and again run my application for a while. Does my application in the last run process records that the first run wrote into the buffer? (Assuming the task is always assigned to the same node).  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1244739834


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +77,59 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!useBuffer) {
+            doJoin(record);
+        } else {
+            if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
+                doJoin(record);
+            }
+            buffer.get().evictWhile(() -> true, this::emit);

Review Comment:
   Sure that is just fine with me. I'll add a test adding records out of the grace period to



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
         }
     }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream table join.")
+        );
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(

Review Comment:
   I added something to flush the buffer at the end of this test. Works just fine :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on PR #13855:
URL: https://github.com/apache/kafka/pull/13855#issuecomment-1611889559

   Hey @wcarlson5 did you mean to push a new commit to this PR since @cadonna and I last reviewed? Some of your replies mention having made changes but I don't see any. 
   
   Nothing blocking from my side, though I do think that the latest round of suggestions (including test coverage improvements and @cadonna 's suggestion for avoiding an unnecessary range query) would be good to incorporate either in this PR or the next one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13855: KAFKA-14936: Add Grace period logic to Stream Table Join (2/N)

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13855:
URL: https://github.com/apache/kafka/pull/13855#discussion_r1239142854


##########
streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java:
##########
@@ -189,7 +221,22 @@ public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
      */
     @Override
     public Joined<K, V, VO> withName(final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
+    }
+
+    /**
+     * Set the grace period on the stream side of the join. Records will enter a buffer before being processed. Out of order records in the grace period will be processed in timestamp order. Late records, out of the grace period, will be executed right as they come in, if it is past the table history retention this could result in joins on the wrong version or a null join. Long gaps in stream side arriving records will cause records to be delayed in processing, even resulting in be processed out of the grace period window.

Review Comment:
   > if it is past the table history retention this could result in joins on the wrong version or a null join
   
   I think this should be "if it is past the table history retention this could result in a null join" instead? I don't think it's possible to join with the wrong version, the only possible issue is that it joins with a null instead.
   
   > even resulting in be processed out of the grace period window
   
   What does this mean?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -56,10 +77,59 @@ public void init(final ProcessorContext<K1, VOut> context) {
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
         droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!useBuffer) {
+            doJoin(record);
+        } else {
+            if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
+                doJoin(record);
+            }
+            buffer.get().evictWhile(() -> true, this::emit);
+        }
+    }
+
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) {
+        final Record<K1, V1> record = new Record<>(toEmit.key(), toEmit.value(), toEmit.recordContext().timestamp())
+            .withHeaders(toEmit.recordContext().headers());
+        internalProcessorContext.setRecordContext(toEmit.recordContext());

Review Comment:
   Can you unset the record context (i.e., set it back to the original context) after finishing this step, in order to avoid confusion about what context is being used downstream? That's what we do with the suppress buffer, for example: https://github.com/apache/kafka/blob/1dbcb7da9e3625ec2078a82f84542a3127730fef/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java#L207-L215



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -112,6 +125,74 @@ private void pushNullValueToTable() {
         }
     }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), "Grace", grace)
+        ).process(supplier);
+        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+        inputTableTopic = driver.createInputTopic("tableTopic2", new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+
+        final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"));
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream table join.")
+        );
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(

Review Comment:
   The reason this produces two output records is because the max timestamp seen so far is still 3, which means only records with timestamp 0 and 1 are emitted (timestamps 2 and 3 are still in the buffer). Can you add another step to this step which now produces a record with a larger timestamp and verifies that the records with timestamps 2 and 3 are emitted? There should be four of them, and they should be emitted in timestamp order which is different from the offset order that they arrived in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org