You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2024/01/02 23:18:14 UTC

(kafka) branch 3.7 updated: KAFKA-16046: also fix stores for outer join (#15073)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new fe3bc9c709d KAFKA-16046: also fix stores for outer join (#15073)
fe3bc9c709d is described below

commit fe3bc9c709dded9920cad0e65812b747b964a649
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Tue Jan 2 15:07:46 2024 -0800

    KAFKA-16046: also fix stores for outer join (#15073)
    
    This is the corollary to #15061 for outer joins: their key-value stores are not timestamped either (#15061 covered only the window stores).
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>, Lucas Brutschy <lb...@confluent.io>
---
 .../internals/KeyValueStoreMaterializer.java       |  2 +-
 .../internals/OuterStreamJoinStoreFactory.java     |  7 ++--
 .../streams/state/BuiltInDslStoreSuppliers.java    |  4 +-
 .../kafka/streams/state/DslKeyValueParams.java     | 17 ++++++--
 .../kafka/streams/state/DslWindowParams.java       |  7 +++-
 .../apache/kafka/streams/StreamsBuilderTest.java   |  4 +-
 .../internals/KStreamKStreamOuterJoinTest.java     | 49 ++++++++++++++++++++++
 7 files changed, 77 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index b54ba531815..3d11c451429 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -45,7 +45,7 @@ public class KeyValueStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
     @Override
     public StateStore build() {
         final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null
-                ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName()))
+                ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), true))
                 : (KeyValueBytesStoreSupplier) materialized.storeSupplier();
 
         final StoreBuilder<?> builder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index 123d1eade63..978bf39577c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -95,11 +95,12 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable
         final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
         final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
 
+        final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, false);
         final KeyValueBytesStoreSupplier supplier;
 
         if (passedInDslStoreSuppliers != null) {
             // case 1: dslStoreSuppliers was explicitly passed in
-            supplier = passedInDslStoreSuppliers.keyValueStore(new DslKeyValueParams(name));
+            supplier = passedInDslStoreSuppliers.keyValueStore(dslKeyValueParams);
         } else if (streamJoined.thisStoreSupplier() != null) {
             // case 2: thisStoreSupplier was explicitly passed in, we match
             // the type for that one
@@ -110,12 +111,12 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable
             } else {
                 // couldn't determine the type of bytes store for thisStoreSupplier,
                 // fallback to the default
-                supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
+                supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
             }
         } else {
             // case 3: nothing was explicitly passed in, fallback to default which
             // was configured via either the TopologyConfig or StreamsConfig globally
-            supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
+            supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
         }
 
         final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index 71e7ac869fc..eedcafd7c79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -36,7 +36,9 @@ public class BuiltInDslStoreSuppliers {
 
         @Override
         public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
-            return Stores.persistentTimestampedKeyValueStore(params.name());
+            return params.isTimestamped()
+                    ? Stores.persistentTimestampedKeyValueStore(params.name())
+                    : Stores.persistentKeyValueStore(params.name());
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
index 1077da45fb3..7447d6c711f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
@@ -25,19 +25,26 @@ import java.util.Objects;
 public class DslKeyValueParams {
 
     private final String name;
+    private final boolean isTimestamped;
 
     /**
-     * @param name the name of the store (cannot be {@code null})
+     * @param name          the name of the store (cannot be {@code null})
+     * @param isTimestamped whether the returned stores should be timestamped, see {@link TimestampedKeyValueStore}
      */
-    public DslKeyValueParams(final String name) {
+    public DslKeyValueParams(final String name, final boolean isTimestamped) {
         Objects.requireNonNull(name);
         this.name = name;
+        this.isTimestamped = isTimestamped;
     }
 
     public String name() {
         return name;
     }
 
+    public boolean isTimestamped() {
+        return isTimestamped;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) {
@@ -47,18 +54,20 @@ public class DslKeyValueParams {
             return false;
         }
         final DslKeyValueParams that = (DslKeyValueParams) o;
-        return Objects.equals(name, that.name);
+        return isTimestamped == that.isTimestamped
+                && Objects.equals(name, that.name);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name);
+        return Objects.hash(name, isTimestamped);
     }
 
     @Override
     public String toString() {
         return "DslKeyValueParams{" +
                 "name='" + name + '\'' +
+                "isTimestamped=" + isTimestamped +
                 '}';
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
index 672afc66e0d..9a7f0fe9b38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
@@ -108,7 +108,8 @@ public class DslWindowParams {
                 && Objects.equals(retentionPeriod, that.retentionPeriod)
                 && Objects.equals(windowSize, that.windowSize)
                 && Objects.equals(emitStrategy, that.emitStrategy)
-                && Objects.equals(isSlidingWindow, that.isSlidingWindow);
+                && Objects.equals(isSlidingWindow, that.isSlidingWindow)
+                && Objects.equals(isTimestamped, that.isTimestamped);
     }
 
     @Override
@@ -119,7 +120,8 @@ public class DslWindowParams {
                 windowSize,
                 retainDuplicates,
                 emitStrategy,
-                isSlidingWindow
+                isSlidingWindow,
+                isTimestamped
         );
     }
 
@@ -132,6 +134,7 @@ public class DslWindowParams {
                 ", retainDuplicates=" + retainDuplicates +
                 ", emitStrategy=" + emitStrategy +
                 ", isSlidingWindow=" + isSlidingWindow +
+                ", isTimestamped=" + isTimestamped +
                 '}';
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index e059da4fa87..72422f067d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -52,7 +52,7 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.InMemorySessionStore;
 import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
-import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -1299,7 +1299,7 @@ public class StreamsBuilderTest {
         assertTypesForStateStore(topology.stateStores(),
                 InMemoryWindowStore.class,
                 RocksDBWindowStore.class,
-                RocksDBTimestampedStore.class);
+                RocksDBStore.class);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 8133e25ec4b..099dc5b0c83 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -33,8 +33,12 @@ import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -50,9 +54,11 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.time.Duration.ZERO;
 import static java.time.Duration.ofMillis;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamKStreamOuterJoinTest {
@@ -1308,4 +1314,47 @@ public class KStreamKStreamOuterJoinTest {
             new KeyValueTimestamp<>(0, "dummy+null", 1103L)
         );
     }
+
+    public static class CapturingStoreSuppliers extends BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers {
+
+        final AtomicReference<KeyValueBytesStoreSupplier> capture = new AtomicReference<>();
+
+        @Override
+        public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
+            final KeyValueBytesStoreSupplier result = super.keyValueStore(params);
+            capture.set(result);
+            return result;
+        }
+    }
+
+    @Test
+    public void shouldJoinWithNonTimestampedStore() {
+        final CapturingStoreSuppliers suppliers = new CapturingStoreSuppliers();
+        final StreamJoined<Integer, String, String> streamJoined =
+                StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                        .withDslStoreSuppliers(suppliers);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+
+        joined = stream1.outerJoin(
+                stream2,
+                MockValueJoiner.TOSTRING_JOINER,
+                JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+                streamJoined
+        );
+        joined.process(supplier);
+
+        // create a TTD so that the topology gets built
+        try (final TopologyTestDriver ignored = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
+            assertThat("Expected stream joined to supply builders that create non-timestamped stores",
+                    !WrappedStateStore.isTimestamped(suppliers.capture.get().get()));
+        }
+    }
 }