You are viewing a plain-text version of this content. The canonical version is at the original archive link, which is not preserved in this copy.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/04/29 22:30:16 UTC
[kafka] 01/03: MINOR: reuse pseudo-topic in FKJoin (#8296)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 6314ca6e971ee850932b663d08f7fd976c81d441
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Mar 13 23:04:14 2020 -0500
MINOR: reuse pseudo-topic in FKJoin (#8296)
In the foreign-key join resolver, serialize the LHS value with the same pseudo-topic that was originally used to
serialize it before sending the subscription request, instead of deriving a separate "<source-topic>-join-resolver" name.
Reviewers: Boyang Chen <bo...@confluent.io>
---
.../streams/kstream/internals/KTableImpl.java | 1 +
.../SubscriptionResolverJoinProcessorSupplier.java | 11 +-
.../KTableKTableForeignKeyJoinPseudoTopicTest.java | 138 +++++++++++++++++++++
...scriptionResolverJoinProcessorSupplierTest.java | 6 +
.../kafka/streams/utils/UniqueTopicSerdeScope.java | 6 +
5 files changed, 155 insertions(+), 7 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 941a866..c91c475 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -1084,6 +1084,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
primaryKeyValueGetter,
valSerde == null ? null : valSerde.serializer(),
+ valueHashSerdePseudoTopic,
joiner,
leftJoin
);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
index 8fa77aa..31de068 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
@@ -42,15 +42,18 @@ import org.apache.kafka.streams.state.internals.Murmur3;
public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
private final Serializer<V> constructionTimeValueSerializer;
+ private final String valueHashSerdePseudoTopic;
private final ValueJoiner<V, VO, VR> joiner;
private final boolean leftJoin;
public SubscriptionResolverJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
final Serializer<V> valueSerializer,
+ final String valueHashSerdePseudoTopic,
final ValueJoiner<V, VO, VR> joiner,
final boolean leftJoin) {
this.valueGetterSupplier = valueGetterSupplier;
constructionTimeValueSerializer = valueSerializer;
+ this.valueHashSerdePseudoTopic = valueHashSerdePseudoTopic;
this.joiner = joiner;
this.leftJoin = leftJoin;
}
@@ -83,15 +86,9 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
}
final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(key);
- //We are unable to access the actual source topic name for the valueSerializer at runtime, without
- //tightly coupling to KTableRepartitionProcessorSupplier.
- //While we can use the source topic from where the events came from, we shouldn't serialize against it
- //as it causes problems with the confluent schema registry, which requires each topic have only a single
- //registered schema.
- final String dummySerializationTopic = context().topic() + "-join-resolver";
final long[] currentHash = currentValueWithTimestamp == null ?
null :
- Murmur3.hash128(runtimeValueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
+ Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, currentValueWithTimestamp.value()));
final long[] messageHash = value.getOriginalValueHash();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
new file mode 100644
index 0000000..b08c293
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+public class KTableKTableForeignKeyJoinPseudoTopicTest {
+
+ private static final String LEFT_TABLE = "left_table";
+ private static final String RIGHT_TABLE = "right_table";
+ private static final String OUTPUT = "output-topic";
+ private static final String REJOIN_OUTPUT = "rejoin-output-topic";
+ private final Properties streamsConfig = mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
+ ));
+
+
+ private static Collection<Object[]> buildParameters(final List<?>... argOptions) {
+ List<Object[]> result = new LinkedList<>();
+ result.add(new Object[0]);
+
+ for (final List<?> argOption : argOptions) {
+ result = times(result, argOption);
+ }
+
+ return result;
+ }
+
+ private static List<Object[]> times(final List<Object[]> left, final List<?> right) {
+ final List<Object[]> result = new LinkedList<>();
+ for (final Object[] args : left) {
+ for (final Object rightElem : right) {
+ final Object[] resArgs = new Object[args.length + 1];
+ System.arraycopy(args, 0, resArgs, 0, args.length);
+ resArgs[args.length] = rightElem;
+ result.add(resArgs);
+ }
+ }
+ return result;
+ }
+
+
+ @Test
+ public void shouldUseExpectedTopicsWithSerde() {
+ final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KTable<String, String> left = builder.table(
+ LEFT_TABLE,
+ Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+ serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+ );
+ final KTable<String, String> right = builder.table(
+ RIGHT_TABLE,
+ Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+ serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+ );
+
+ left.join(
+ right,
+ value -> value.split("\\|")[1],
+ (value1, value2) -> "(" + value1 + "," + value2 + ")",
+ Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false)
+ ))
+ .toStream()
+ .to(OUTPUT);
+
+
+ final Topology topology = builder.build(streamsConfig);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+ final TestInputTopic<String, String> leftInput = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+ final TestInputTopic<String, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+ leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
+ rightInput.pipeInput("rhs1", "rhsValue1");
+ }
+ // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
+ // topics our serdes serialize data for
+ assertThat(serdeScope.registeredTopics(), is(mkSet(
+ // expected pseudo-topics
+ "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
+ "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
+ "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
+ // internal topics
+ "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
+ "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
+ "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
+ "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--key",
+ "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--value",
+ "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--key",
+ "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--value",
+ // output topics
+ "output-topic--key",
+ "output-topic--value"
+ )));
+ }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
index 3ec19de..aae99ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -82,6 +82,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
+ "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -106,6 +107,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
+ "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -130,6 +132,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
+ "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -155,6 +158,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
+ "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -180,6 +184,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
+ "value-hash-dummy-topic",
JOINER,
leftJoin
);
@@ -205,6 +210,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
new SubscriptionResolverJoinProcessorSupplier<>(
valueGetterSupplier,
STRING_SERIALIZER,
+ "value-hash-dummy-topic",
JOINER,
leftJoin
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java b/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java
index 04b1a7b..c385187 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java
@@ -21,8 +21,10 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
+import java.util.Collections;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -41,6 +43,10 @@ public class UniqueTopicSerdeScope {
return decorator;
}
+ public Set<String> registeredTopics() {
+ return Collections.unmodifiableSet(topicTypeRegistry.keySet());
+ }
+
public class UniqueTopicSerdeDecorator<T> implements Serde<T> {
private final AtomicBoolean isKey = new AtomicBoolean(false);
private final Serde<T> delegate;