You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/04/29 22:06:23 UTC
[kafka] 02/02: KAFKA-9925: decorate pseudo-topics with app id (#8574)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d1ae75d5f9222da42b1a396f4dc1f01e3df0ef83
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Apr 29 16:17:34 2020 -0500
KAFKA-9925: decorate pseudo-topics with app id (#8574)
Reviewers: Boyang Chen <bo...@confluent.io>, Kin Siu
---
.../streams/kstream/internals/KTableImpl.java | 24 +++-
.../foreignkeyjoin/CombinedKeySchema.java | 17 ++-
...reignJoinSubscriptionSendProcessorSupplier.java | 17 ++-
.../SubscriptionResolverJoinProcessorSupplier.java | 10 +-
.../foreignkeyjoin/SubscriptionWrapperSerde.java | 30 +++--
.../internals/InternalTopologyBuilder.java | 4 +
...KTableKTableForeignKeyJoinDefaultSerdeTest.java | 74 ++++++++++-
.../KTableKTableForeignKeyJoinPseudoTopicTest.java | 138 ---------------------
.../foreignkeyjoin/CombinedKeySchemaTest.java | 30 +++--
...scriptionResolverJoinProcessorSupplierTest.java | 12 +-
.../SubscriptionWrapperSerdeTest.java | 8 +-
11 files changed, 176 insertions(+), 188 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 75cdffa..cc7d8f2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -75,6 +75,7 @@ import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Supplier;
import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace;
@@ -953,13 +954,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
//This occurs whenever the extracted foreignKey changes values.
enableSendingOldValues();
+ final NamedInternal renamed = new NamedInternal(joinName);
+
+ final String subscriptionTopicName = renamed.suffixWithOrElseGet(
+ "-subscription-registration",
+ builder,
+ SUBSCRIPTION_REGISTRATION
+ ) + TOPIC_SUFFIX;
+ // the decoration can't be performed until we have the configuration available when the app runs,
+ // so we pass Suppliers into the components, which they can call at run time
+
+ final Supplier<String> subscriptionPrimaryKeySerdePseudoTopic =
+ () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-pk");
+
+ final Supplier<String> subscriptionForeignKeySerdePseudoTopic =
+ () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-fk");
+
+ final Supplier<String> valueHashSerdePseudoTopic =
+ () -> internalTopologyBuilder().decoratePseudoTopic(subscriptionTopicName + "-vh");
- final NamedInternal renamed = new NamedInternal(joinName);
- final String subscriptionTopicName = renamed.suffixWithOrElseGet("-subscription-registration", builder, SUBSCRIPTION_REGISTRATION) + TOPIC_SUFFIX;
- final String subscriptionPrimaryKeySerdePseudoTopic = subscriptionTopicName + "-pk";
- final String subscriptionForeignKeySerdePseudoTopic = subscriptionTopicName + "-fk";
- final String valueHashSerdePseudoTopic = subscriptionTopicName + "-vh";
builder.internalTopologyBuilder.addInternalTopic(subscriptionTopicName);
final Serde<KO> foreignKeySerde = ((KTableImpl<KO, VO, ?>) foreignKeyTable).keySerde;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
index 92fb72c..57bc646 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java
@@ -23,24 +23,27 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.nio.ByteBuffer;
+import java.util.function.Supplier;
/**
* Factory for creating CombinedKey serializers / deserializers.
*/
public class CombinedKeySchema<KO, K> {
- private final String primaryKeySerdeTopic;
- private final String foreignKeySerdeTopic;
+ private final Supplier<String> undecoratedPrimaryKeySerdeTopicSupplier;
+ private final Supplier<String> undecoratedForeignKeySerdeTopicSupplier;
+ private String primaryKeySerdeTopic;
+ private String foreignKeySerdeTopic;
private Serializer<K> primaryKeySerializer;
private Deserializer<K> primaryKeyDeserializer;
private Serializer<KO> foreignKeySerializer;
private Deserializer<KO> foreignKeyDeserializer;
- public CombinedKeySchema(final String foreignKeySerdeTopic,
+ public CombinedKeySchema(final Supplier<String> foreignKeySerdeTopicSupplier,
final Serde<KO> foreignKeySerde,
- final String primaryKeySerdeTopic,
+ final Supplier<String> primaryKeySerdeTopicSupplier,
final Serde<K> primaryKeySerde) {
- this.primaryKeySerdeTopic = primaryKeySerdeTopic;
- this.foreignKeySerdeTopic = foreignKeySerdeTopic;
+ undecoratedPrimaryKeySerdeTopicSupplier = primaryKeySerdeTopicSupplier;
+ undecoratedForeignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer();
primaryKeyDeserializer = primaryKeySerde == null ? null : primaryKeySerde.deserializer();
foreignKeyDeserializer = foreignKeySerde == null ? null : foreignKeySerde.deserializer();
@@ -49,6 +52,8 @@ public class CombinedKeySchema<KO, K> {
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
+ primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get();
+ foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get();
primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>) context.keySerde().serializer() : foreignKeySerializer;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
index ba794f7..9787875 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.function.Function;
+import java.util.function.Supplier;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE;
import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE;
@@ -43,21 +44,21 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
private static final Logger LOG = LoggerFactory.getLogger(ForeignJoinSubscriptionSendProcessorSupplier.class);
private final Function<V, KO> foreignKeyExtractor;
- private final String foreignKeySerdeTopic;
- private final String valueSerdeTopic;
+ private final Supplier<String> foreignKeySerdeTopicSupplier;
+ private final Supplier<String> valueSerdeTopicSupplier;
private final boolean leftJoin;
private Serializer<KO> foreignKeySerializer;
private Serializer<V> valueSerializer;
public ForeignJoinSubscriptionSendProcessorSupplier(final Function<V, KO> foreignKeyExtractor,
- final String foreignKeySerdeTopic,
- final String valueSerdeTopic,
+ final Supplier<String> foreignKeySerdeTopicSupplier,
+ final Supplier<String> valueSerdeTopicSupplier,
final Serde<KO> foreignKeySerde,
final Serializer<V> valueSerializer,
final boolean leftJoin) {
this.foreignKeyExtractor = foreignKeyExtractor;
- this.foreignKeySerdeTopic = foreignKeySerdeTopic;
- this.valueSerdeTopic = valueSerdeTopic;
+ this.foreignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
+ this.valueSerdeTopicSupplier = valueSerdeTopicSupplier;
this.valueSerializer = valueSerializer;
this.leftJoin = leftJoin;
foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
@@ -71,11 +72,15 @@ public class ForeignJoinSubscriptionSendProcessorSupplier<K, KO, V> implements P
private class UnbindChangeProcessor extends AbstractProcessor<K, Change<V>> {
private Sensor droppedRecordsSensor;
+ private String foreignKeySerdeTopic;
+ private String valueSerdeTopic;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ foreignKeySerdeTopic = foreignKeySerdeTopicSupplier.get();
+ valueSerdeTopic = valueSerdeTopicSupplier.get();
// get default key serde if it wasn't supplied directly at construction
if (foreignKeySerializer == null) {
foreignKeySerializer = (Serializer<KO>) context.keySerde().serializer();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
index 31de068..3cd0636 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
@@ -29,6 +29,8 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
+import java.util.function.Supplier;
+
/**
* Receives {@code SubscriptionResponseWrapper<VO>} events and filters out events which do not match the current hash
* of the primary key. This eliminates race-condition results for rapidly-changing foreign-keys for a given primary key.
@@ -42,18 +44,18 @@ import org.apache.kafka.streams.state.internals.Murmur3;
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> constructionTimeValueSerializer;
- private final String valueHashSerdePseudoTopic;
+ private final Supplier<String> valueHashSerdePseudoTopicSupplier;
private final ValueJoiner<V, VO, VR> joiner;
private final boolean leftJoin;
public SubscriptionResolverJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
final Serializer<V> valueSerializer,
- final String valueHashSerdePseudoTopic,
+ final Supplier<String> valueHashSerdePseudoTopicSupplier,
final ValueJoiner<V, VO, VR> joiner,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
constructionTimeValueSerializer = valueSerializer;
- this.valueHashSerdePseudoTopic = valueHashSerdePseudoTopic;
+ this.valueHashSerdePseudoTopicSupplier = valueHashSerdePseudoTopicSupplier;
this.joiner = joiner;
this.leftJoin = leftJoin;
}
@@ -61,6 +63,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
@Override
public Processor<K, SubscriptionResponseWrapper<VO>> get() {
return new AbstractProcessor<K, SubscriptionResponseWrapper<VO>>() {
+ private String valueHashSerdePseudoTopic;
private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
private KTableValueGetter<K, V> valueGetter;
@@ -69,6 +72,7 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ valueHashSerdePseudoTopic = valueHashSerdePseudoTopicSupplier.get();
valueGetter = valueGetterSupplier.get();
valueGetter.init(context);
if (runtimeValueSerializer == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index 42aed94..136128c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -25,16 +25,17 @@ import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import java.nio.ByteBuffer;
import java.util.Objects;
+import java.util.function.Supplier;
public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>> {
private final SubscriptionWrapperSerializer<K> serializer;
private final SubscriptionWrapperDeserializer<K> deserializer;
- public SubscriptionWrapperSerde(final String primaryKeySerializationPseudoTopic,
+ public SubscriptionWrapperSerde(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
final Serde<K> primaryKeySerde) {
- serializer = new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopic,
+ serializer = new SubscriptionWrapperSerializer<>(primaryKeySerializationPseudoTopicSupplier,
primaryKeySerde == null ? null : primaryKeySerde.serializer());
- deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopic,
+ deserializer = new SubscriptionWrapperDeserializer<>(primaryKeySerializationPseudoTopicSupplier,
primaryKeySerde == null ? null : primaryKeySerde.deserializer());
}
@@ -51,12 +52,13 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
private static class SubscriptionWrapperSerializer<K>
implements Serializer<SubscriptionWrapper<K>>, WrappingNullableSerializer<SubscriptionWrapper<K>, K> {
- private final String primaryKeySerializationPseudoTopic;
+ private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
+ private String primaryKeySerializationPseudoTopic = null;
private Serializer<K> primaryKeySerializer;
- SubscriptionWrapperSerializer(final String primaryKeySerializationPseudoTopic,
+ SubscriptionWrapperSerializer(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
final Serializer<K> primaryKeySerializer) {
- this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic;
+ this.primaryKeySerializationPseudoTopicSupplier = primaryKeySerializationPseudoTopicSupplier;
this.primaryKeySerializer = primaryKeySerializer;
}
@@ -76,6 +78,10 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
throw new UnsupportedVersionException("SubscriptionWrapper version is larger than maximum supported 0x7F");
}
+ if (primaryKeySerializationPseudoTopic == null) {
+ primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
+ }
+
final byte[] primaryKeySerializedData = primaryKeySerializer.serialize(
primaryKeySerializationPseudoTopic,
data.getPrimaryKey()
@@ -106,12 +112,13 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
private static class SubscriptionWrapperDeserializer<K>
implements Deserializer<SubscriptionWrapper<K>>, WrappingNullableDeserializer<SubscriptionWrapper<K>, K> {
- private final String primaryKeySerializationPseudoTopic;
+ private final Supplier<String> primaryKeySerializationPseudoTopicSupplier;
+ private String primaryKeySerializationPseudoTopic = null;
private Deserializer<K> primaryKeyDeserializer;
- SubscriptionWrapperDeserializer(final String primaryKeySerializationPseudoTopic,
+ SubscriptionWrapperDeserializer(final Supplier<String> primaryKeySerializationPseudoTopicSupplier,
final Deserializer<K> primaryKeyDeserializer) {
- this.primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopic;
+ this.primaryKeySerializationPseudoTopicSupplier = primaryKeySerializationPseudoTopicSupplier;
this.primaryKeyDeserializer = primaryKeyDeserializer;
}
@@ -144,6 +151,11 @@ public class SubscriptionWrapperSerde<K> implements Serde<SubscriptionWrapper<K>
final byte[] primaryKeyRaw = new byte[data.length - lengthSum]; //The remaining data is the serialized pk
buf.get(primaryKeyRaw, 0, primaryKeyRaw.length);
+
+ if (primaryKeySerializationPseudoTopic == null) {
+ primaryKeySerializationPseudoTopic = primaryKeySerializationPseudoTopicSupplier.get();
+ }
+
final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerializationPseudoTopic,
primaryKeyRaw);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index f36324d..41123b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1203,6 +1203,10 @@ public class InternalTopologyBuilder {
return decoratedTopics;
}
+ public String decoratePseudoTopic(final String topic) {
+ return decorateTopic(topic);
+ }
+
private String decorateTopic(final String topic) {
if (applicationId == null) {
throw new TopologyException("there are internal topics and "
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
index 5af0530..64c6b06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDefaultSerdeTest.java
@@ -24,12 +24,14 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
@@ -38,10 +40,15 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
public class KTableKTableForeignKeyJoinDefaultSerdeTest {
+
@Test
public void shouldWorkWithDefaultSerdes() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -161,7 +168,72 @@ public class KTableKTableForeignKeyJoinDefaultSerdeTest {
validateTopologyCanProcessData(builder);
}
- private static void validateTopologyCanProcessData(final StreamsBuilder builder) {
+ @Test
+ public void shouldUseExpectedTopicsWithSerde() {
+ final String applicationId = "ktable-ktable-joinOnForeignKey";
+ final Properties streamsConfig = mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
+ ));
+
+ final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final String leftTable = "left_table";
+ final String rightTable = "right_table";
+ final String output = "output-topic";
+
+ final KTable<String, String> left = builder.table(
+ leftTable,
+ Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+ serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+ );
+ final KTable<String, String> right = builder.table(
+ rightTable,
+ Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+ serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+ );
+
+ left.join(
+ right,
+ value -> value.split("\\|")[1],
+ (value1, value2) -> "(" + value1 + "," + value2 + ")",
+ Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
+ ))
+ .toStream()
+ .to(output);
+
+
+ final Topology topology = builder.build(streamsConfig);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+ final TestInputTopic<String, String> leftInput = driver.createInputTopic(leftTable, new StringSerializer(), new StringSerializer());
+ final TestInputTopic<String, String> rightInput = driver.createInputTopic(rightTable, new StringSerializer(), new StringSerializer());
+ leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
+ rightInput.pipeInput("rhs1", "rhsValue1");
+ }
+ // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
+ // topics our serdes serialize data for
+ assertThat(serdeScope.registeredTopics(), is(mkSet(
+ // expected pseudo-topics
+ applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
+ applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
+ applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
+ // internal topics
+ applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
+ applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
+ applicationId + "-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
+ applicationId + "-left_table-STATE-STORE-0000000000-changelog--key",
+ applicationId + "-left_table-STATE-STORE-0000000000-changelog--value",
+ applicationId + "-right_table-STATE-STORE-0000000003-changelog--key",
+ applicationId + "-right_table-STATE-STORE-0000000003-changelog--value",
+ // output topics
+ "output-topic--key",
+ "output-topic--value"
+ )));
+ }
+
+ private void validateTopologyCanProcessData(final StreamsBuilder builder) {
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "dummy-" + UUID.randomUUID());
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
deleted file mode 100644
index b08c293..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-
-public class KTableKTableForeignKeyJoinPseudoTopicTest {
-
- private static final String LEFT_TABLE = "left_table";
- private static final String RIGHT_TABLE = "right_table";
- private static final String OUTPUT = "output-topic";
- private static final String REJOIN_OUTPUT = "rejoin-output-topic";
- private final Properties streamsConfig = mkProperties(mkMap(
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
- mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
- ));
-
-
- private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
- List<Object[]> result = new LinkedList<>();
- result.add(new Object[0]);
-
- for (final List<?> argOption : argOptions) {
- result = times(result, argOption);
- }
-
- return result;
- }
-
- private static List<Object[]> times(final List<Object[]> left, final List<?> right) {
- final List<Object[]> result = new LinkedList<>();
- for (final Object[] args : left) {
- for (final Object rightElem : right) {
- final Object[] resArgs = new Object[args.length + 1];
- System.arraycopy(args, 0, resArgs, 0, args.length);
- resArgs[args.length] = rightElem;
- result.add(resArgs);
- }
- }
- return result;
- }
-
-
- @Test
- public void shouldUseExpectedTopicsWithSerde() {
- final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
- final StreamsBuilder builder = new StreamsBuilder();
-
- final KTable<String, String> left = builder.table(
- LEFT_TABLE,
- Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
- serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
- );
- final KTable<String, String> right = builder.table(
- RIGHT_TABLE,
- Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
- serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
- );
-
- left.join(
- right,
- value -> value.split("\\|")[1],
- (value1, value2) -> "(" + value1 + "," + value2 + ")",
- Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
- ))
- .toStream()
- .to(OUTPUT);
-
-
- final Topology topology = builder.build(streamsConfig);
- try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
- final TestInputTopic<String, String> leftInput = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
- final TestInputTopic<String, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
- leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
- rightInput.pipeInput("rhs1", "rhsValue1");
- }
- // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
- // topics our serdes serialize data for
- assertThat(serdeScope.registeredTopics(), is(mkSet(
- // expected pseudo-topics
- "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
- "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
- "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
- // internal topics
- "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
- "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
- "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
- "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--key",
- "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--value",
- "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--key",
- "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--value",
- // output topics
- "output-topic--key",
- "output-topic--value"
- )));
- }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
index cb1ef59..17f0c79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchemaTest.java
@@ -28,8 +28,10 @@ public class CombinedKeySchemaTest {
@Test
public void nonNullPrimaryKeySerdeTest() {
- final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
- "pkTopic", Serdes.Integer());
+ final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+ () -> "fkTopic", Serdes.String(),
+ () -> "pkTopic", Serdes.Integer()
+ );
final Integer primary = -999;
final Bytes result = cks.toBytes("foreignKey", primary);
@@ -40,22 +42,28 @@ public class CombinedKeySchemaTest {
@Test(expected = NullPointerException.class)
public void nullPrimaryKeySerdeTest() {
- final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
- "pkTopic", Serdes.Integer());
+ final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+ () -> "fkTopic", Serdes.String(),
+ () -> "pkTopic", Serdes.Integer()
+ );
cks.toBytes("foreignKey", null);
}
@Test(expected = NullPointerException.class)
public void nullForeignKeySerdeTest() {
- final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
- "pkTopic", Serdes.Integer());
+ final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+ () -> "fkTopic", Serdes.String(),
+ () -> "pkTopic", Serdes.Integer()
+ );
cks.toBytes(null, 10);
}
@Test
public void prefixKeySerdeTest() {
- final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
- "pkTopic", Serdes.Integer());
+ final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+ () -> "fkTopic", Serdes.String(),
+ () -> "pkTopic", Serdes.Integer()
+ );
final String foreignKey = "someForeignKey";
final byte[] foreignKeySerializedData =
Serdes.String().serializer().serialize("fkTopic", foreignKey);
@@ -71,8 +79,10 @@ public class CombinedKeySchemaTest {
@Test(expected = NullPointerException.class)
public void nullPrefixKeySerdeTest() {
- final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>("fkTopic", Serdes.String(),
- "pkTopic", Serdes.Integer());
+ final CombinedKeySchema<String, Integer> cks = new CombinedKeySchema<>(
+ () -> "fkTopic", Serdes.String(),
+ () -> "pkTopic", Serdes.Integer()
+ );
final String foreignKey = null;
cks.prefixBytes(foreignKey);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
index aae99ec..b95569f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -82,7 +82,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
- "value-hash-dummy-topic",
+ () -> "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -107,7 +107,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
- "value-hash-dummy-topic",
+ () -> "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -132,7 +132,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
- "value-hash-dummy-topic",
+ () -> "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -158,7 +158,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
- "value-hash-dummy-topic",
+ () -> "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -184,7 +184,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
- "value-hash-dummy-topic",
+ () -> "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -210,7 +210,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
- "value-hash-dummy-topic",
+ () -> "value-hash-dummy-topic",
JOINER,
leftJoin
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
index 5c6551c..dd67b4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerdeTest.java
@@ -31,7 +31,7 @@ public class SubscriptionWrapperSerdeTest {
@SuppressWarnings("unchecked")
public void shouldSerdeTest() {
final String originalKey = "originalKey";
- final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE, originalKey);
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
@@ -46,7 +46,7 @@ public class SubscriptionWrapperSerdeTest {
@SuppressWarnings("unchecked")
public void shouldSerdeNullHashTest() {
final String originalKey = "originalKey";
- final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
final long[] hashedValue = null;
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
final byte[] serialized = swSerde.serializer().serialize(null, wrapper);
@@ -61,7 +61,7 @@ public class SubscriptionWrapperSerdeTest {
@SuppressWarnings("unchecked")
public void shouldThrowExceptionOnNullKeyTest() {
final String originalKey = null;
- final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, SubscriptionWrapper.Instruction.PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, originalKey);
swSerde.serializer().serialize(null, wrapper);
@@ -71,7 +71,7 @@ public class SubscriptionWrapperSerdeTest {
@SuppressWarnings("unchecked")
public void shouldThrowExceptionOnNullInstructionTest() {
final String originalKey = "originalKey";
- final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>("pkTopic", Serdes.String());
+ final SubscriptionWrapperSerde swSerde = new SubscriptionWrapperSerde<>(() -> "pkTopic", Serdes.String());
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0xFF, (byte) 0xAA, (byte) 0x00, (byte) 0x19});
final SubscriptionWrapper wrapper = new SubscriptionWrapper<>(hashedValue, null, originalKey);
swSerde.serializer().serialize(null, wrapper);