You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2024/01/02 23:18:14 UTC
(kafka) branch 3.7 updated: KAFKA-16046: also fix stores for outer join (#15073)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new fe3bc9c709d KAFKA-16046: also fix stores for outer join (#15073)
fe3bc9c709d is described below
commit fe3bc9c709dded9920cad0e65812b747b964a649
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Tue Jan 2 15:07:46 2024 -0800
KAFKA-16046: also fix stores for outer join (#15073)
This is the corollary to #15061 for outer joins, which use a non-timestamped key-value store in addition to the window stores fixed there.
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>, Lucas Brutschy <lb...@confluent.io>
---
.../internals/KeyValueStoreMaterializer.java | 2 +-
.../internals/OuterStreamJoinStoreFactory.java | 7 ++--
.../streams/state/BuiltInDslStoreSuppliers.java | 4 +-
.../kafka/streams/state/DslKeyValueParams.java | 17 ++++++--
.../kafka/streams/state/DslWindowParams.java | 7 +++-
.../apache/kafka/streams/StreamsBuilderTest.java | 4 +-
.../internals/KStreamKStreamOuterJoinTest.java | 49 ++++++++++++++++++++++
7 files changed, 77 insertions(+), 13 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index b54ba531815..3d11c451429 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -45,7 +45,7 @@ public class KeyValueStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
@Override
public StateStore build() {
final KeyValueBytesStoreSupplier supplier = materialized.storeSupplier() == null
- ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName()))
+ ? dslStoreSuppliers().keyValueStore(new DslKeyValueParams(materialized.storeName(), true))
: (KeyValueBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<?> builder;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index 123d1eade63..978bf39577c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -95,11 +95,12 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable
final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
+ final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, false);
final KeyValueBytesStoreSupplier supplier;
if (passedInDslStoreSuppliers != null) {
// case 1: dslStoreSuppliers was explicitly passed in
- supplier = passedInDslStoreSuppliers.keyValueStore(new DslKeyValueParams(name));
+ supplier = passedInDslStoreSuppliers.keyValueStore(dslKeyValueParams);
} else if (streamJoined.thisStoreSupplier() != null) {
// case 2: thisStoreSupplier was explicitly passed in, we match
// the type for that one
@@ -110,12 +111,12 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends AbstractConfigurable
} else {
// couldn't determine the type of bytes store for thisStoreSupplier,
// fallback to the default
- supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
+ supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
}
} else {
// case 3: nothing was explicitly passed in, fallback to default which
// was configured via either the TopologyConfig or StreamsConfig globally
- supplier = dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name));
+ supplier = dslStoreSuppliers().keyValueStore(dslKeyValueParams);
}
final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index 71e7ac869fc..eedcafd7c79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -36,7 +36,9 @@ public class BuiltInDslStoreSuppliers {
@Override
public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
- return Stores.persistentTimestampedKeyValueStore(params.name());
+ return params.isTimestamped()
+ ? Stores.persistentTimestampedKeyValueStore(params.name())
+ : Stores.persistentKeyValueStore(params.name());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
index 1077da45fb3..7447d6c711f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
@@ -25,19 +25,26 @@ import java.util.Objects;
public class DslKeyValueParams {
private final String name;
+ private final boolean isTimestamped;
/**
- * @param name the name of the store (cannot be {@code null})
+ * @param name the name of the store (cannot be {@code null})
+ * @param isTimestamped whether the returned stores should be timestamped (see {@link TimestampedKeyValueStore})
*/
- public DslKeyValueParams(final String name) {
+ public DslKeyValueParams(final String name, final boolean isTimestamped) {
Objects.requireNonNull(name);
this.name = name;
+ this.isTimestamped = isTimestamped;
}
public String name() {
return name;
}
+ public boolean isTimestamped() {
+ return isTimestamped;
+ }
+
@Override
public boolean equals(final Object o) {
if (this == o) {
@@ -47,18 +54,20 @@ public class DslKeyValueParams {
return false;
}
final DslKeyValueParams that = (DslKeyValueParams) o;
- return Objects.equals(name, that.name);
+ return isTimestamped == that.isTimestamped
+ && Objects.equals(name, that.name);
}
@Override
public int hashCode() {
- return Objects.hash(name);
+ return Objects.hash(name, isTimestamped);
}
@Override
public String toString() {
return "DslKeyValueParams{" +
"name='" + name + '\'' +
+ ", isTimestamped=" + isTimestamped +
'}';
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
index 672afc66e0d..9a7f0fe9b38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java
@@ -108,7 +108,8 @@ public class DslWindowParams {
&& Objects.equals(retentionPeriod, that.retentionPeriod)
&& Objects.equals(windowSize, that.windowSize)
&& Objects.equals(emitStrategy, that.emitStrategy)
- && Objects.equals(isSlidingWindow, that.isSlidingWindow);
+ && Objects.equals(isSlidingWindow, that.isSlidingWindow)
+ && Objects.equals(isTimestamped, that.isTimestamped);
}
@Override
@@ -119,7 +120,8 @@ public class DslWindowParams {
windowSize,
retainDuplicates,
emitStrategy,
- isSlidingWindow
+ isSlidingWindow,
+ isTimestamped
);
}
@@ -132,6 +134,7 @@ public class DslWindowParams {
", retainDuplicates=" + retainDuplicates +
", emitStrategy=" + emitStrategy +
", isSlidingWindow=" + isSlidingWindow +
+ ", isTimestamped=" + isTimestamped +
'}';
}
}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index e059da4fa87..72422f067d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -52,7 +52,7 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemorySessionStore;
import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
-import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -1299,7 +1299,7 @@ public class StreamsBuilderTest {
assertTypesForStateStore(topology.stateStores(),
InMemoryWindowStore.class,
RocksDBWindowStore.class,
- RocksDBTimestampedStore.class);
+ RocksDBStore.class);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
index 8133e25ec4b..099dc5b0c83 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java
@@ -33,8 +33,12 @@ import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
@@ -50,9 +54,11 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
public class KStreamKStreamOuterJoinTest {
@@ -1308,4 +1314,47 @@ public class KStreamKStreamOuterJoinTest {
new KeyValueTimestamp<>(0, "dummy+null", 1103L)
);
}
+
+ public static class CapturingStoreSuppliers extends BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers {
+
+ final AtomicReference<KeyValueBytesStoreSupplier> capture = new AtomicReference<>();
+
+ @Override
+ public KeyValueBytesStoreSupplier keyValueStore(final DslKeyValueParams params) {
+ final KeyValueBytesStoreSupplier result = super.keyValueStore(params);
+ capture.set(result);
+ return result;
+ }
+ }
+
+ @Test
+ public void shouldJoinWithNonTimestampedStore() {
+ final CapturingStoreSuppliers suppliers = new CapturingStoreSuppliers();
+ final StreamJoined<Integer, String, String> streamJoined =
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+ .withDslStoreSuppliers(suppliers);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+ final KStream<Integer, String> joined;
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+ stream1 = builder.stream(topic1, consumed);
+ stream2 = builder.stream(topic2, consumed);
+
+ joined = stream1.outerJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100L)),
+ streamJoined
+ );
+ joined.process(supplier);
+
+ // create a TTD so that the topology gets built
+ try (final TopologyTestDriver ignored = new TopologyTestDriver(builder.build(PROPS), PROPS)) {
+ assertThat("Expected stream joined to supply builders that create non-timestamped stores",
+ !WrappedStateStore.isTimestamped(suppliers.capture.get().get()));
+ }
+ }
}