You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/04/29 22:06:22 UTC

[kafka] 01/02: MINOR: reuse pseudo-topic in FKJoin (#8296)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e3a8223e3c511a8a17bac1b60b1b9ef1c2b4c9de
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Mar 13 23:04:14 2020 -0500

    MINOR: reuse pseudo-topic in FKJoin (#8296)
    
    Reuse the same pseudo-topic for serializing the LHS value in the foreign-key join resolver as
    we originally used to serialize it before sending the subscription request.
    
    Reviewers: Boyang Chen <bo...@confluent.io>
---
 .../streams/kstream/internals/KTableImpl.java      |   1 +
 .../SubscriptionResolverJoinProcessorSupplier.java |  11 +-
 .../KTableKTableForeignKeyJoinPseudoTopicTest.java | 138 +++++++++++++++++++++
 ...scriptionResolverJoinProcessorSupplierTest.java |   6 +
 .../kafka/streams/utils/UniqueTopicSerdeScope.java |   6 +
 5 files changed, 155 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index ddff497..75cdffa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -1085,6 +1085,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
             primaryKeyValueGetter,
             valSerde == null ? null : valSerde.serializer(),
+            valueHashSerdePseudoTopic,
             joiner,
             leftJoin
         );
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
index 8fa77aa..31de068 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java
@@ -42,15 +42,18 @@ import org.apache.kafka.streams.state.internals.Murmur3;
 public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>> {
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
     private final Serializer<V> constructionTimeValueSerializer;
+    private final String valueHashSerdePseudoTopic;
     private final ValueJoiner<V, VO, VR> joiner;
     private final boolean leftJoin;
 
     public SubscriptionResolverJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
                                                      final Serializer<V> valueSerializer,
+                                                     final String valueHashSerdePseudoTopic,
                                                      final ValueJoiner<V, VO, VR> joiner,
                                                      final boolean leftJoin) {
         this.valueGetterSupplier = valueGetterSupplier;
         constructionTimeValueSerializer = valueSerializer;
+        this.valueHashSerdePseudoTopic = valueHashSerdePseudoTopic; // passed to the value serializer when hashing the current LHS value
         this.joiner = joiner;
         this.leftJoin = leftJoin;
     }
@@ -83,15 +86,9 @@ public class SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR> implements
                 }
                 final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(key);
 
-                //We are unable to access the actual source topic name for the valueSerializer at runtime, without
-                //tightly coupling to KTableRepartitionProcessorSupplier.
-                //While we can use the source topic from where the events came from, we shouldn't serialize against it
-                //as it causes problems with the confluent schema registry, which requires each topic have only a single
-                //registered schema.
-                final String dummySerializationTopic = context().topic() + "-join-resolver";
                 final long[] currentHash = currentValueWithTimestamp == null ?
                     null :
-                    Murmur3.hash128(runtimeValueSerializer.serialize(dummySerializationTopic, currentValueWithTimestamp.value()));
+                    Murmur3.hash128(runtimeValueSerializer.serialize(valueHashSerdePseudoTopic, currentValueWithTimestamp.value()));
 
                 final long[] messageHash = value.getOriginalValueHash();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
new file mode 100644
index 0000000..b08c293
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinPseudoTopicTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Verifies the exact set of topic names that user-supplied serdes are invoked with while a
+ * KTable-KTable foreign-key join processes records.
+ * <p>
+ * Some serdes (for example, ones backed by a schema registry) keep per-topic state such as a
+ * single registered schema per topic name, so every distinct name the join serializes against
+ * matters to the user. The foreign-key join resolver must therefore hash the left-hand value
+ * using the same value-hash pseudo-topic that was used when the subscription request was sent.
+ * <p>
+ * Regression test: the resolver used to serialize against {@code context().topic() + "-join-resolver"}
+ * instead, which added an extra, unexpected topic name to the set asserted below.
+ */
+public class KTableKTableForeignKeyJoinPseudoTopicTest {
+
+    // source topics of the two joined tables, and the topic the join result is written to
+    private static final String LEFT_TABLE = "left_table";
+    private static final String RIGHT_TABLE = "right_table";
+    private static final String OUTPUT = "output-topic";
+
+    // TopologyTestDriver runs the topology in-process, so the bootstrap server is never contacted.
+    // The application id is the prefix of every internal topic name asserted below.
+    private final Properties streamsConfig = mkProperties(mkMap(
+        mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-ktable-joinOnForeignKey"),
+        mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
+        mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath())
+    ));
+
+    /**
+     * Joins {@code LEFT_TABLE} to {@code RIGHT_TABLE} on a foreign key, with every serde the test
+     * supplies decorated by one {@link UniqueTopicSerdeScope}, pipes one matching record into each
+     * table, and asserts the complete set of topic names those serdes were used with.
+     * <p>
+     * Left values have the form {@code "<payload>|<foreignKey>"}; the foreign-key extractor
+     * returns the part after the {@code '|'}.
+     */
+    @Test
+    public void shouldUseExpectedTopicsWithSerde() {
+        final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, String> left = builder.table(
+            LEFT_TABLE,
+            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        );
+        final KTable<String, String> right = builder.table(
+            RIGHT_TABLE,
+            Consumed.with(serdeScope.decorateSerde(Serdes.String(), streamsConfig, true),
+                          serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        );
+
+        left.join(
+            right,
+            value -> value.split("\\|")[1],
+            (value1, value2) -> "(" + value1 + "," + value2 + ")",
+            // only the result value serde is supplied (and decorated); the key serde is left null
+            Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), streamsConfig, false))
+        )
+            .toStream()
+            .to(OUTPUT);
+
+        final Topology topology = builder.build(streamsConfig);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            // The inputs are written with plain, undecorated serializers, so piping them does not
+            // itself record anything in serdeScope; only the topology's own serdes do.
+            final TestInputTopic<String, String> leftInput = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestInputTopic<String, String> rightInput = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
+            // one left record whose foreign key ("rhs1") matches the single right record
+            leftInput.pipeInput("lhs1", "lhsValue1|rhs1");
+            rightInput.pipeInput("rhs1", "rhsValue1");
+        }
+
+        // verifying primarily that no extra pseudo-topics were used, but it's nice to also verify the rest of the
+        // topics our serdes serialize data for.
+        // The numeric suffixes (e.g. 0000000006) are auto-generated name indices, so changing the
+        // topology built above changes these expected names.
+        assertThat(serdeScope.registeredTopics(), is(mkSet(
+            // expected pseudo-topics; "vh" (value hash) is the one the join resolver must reuse
+            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
+            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
+            "KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
+            // internal topics (prefixed with the application id): the subscription registration and
+            // response topics, plus the changelogs of the two source-table stores
+            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic--key",
+            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--key",
+            "ktable-ktable-joinOnForeignKey-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic--value",
+            "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--key",
+            "ktable-ktable-joinOnForeignKey-left_table-STATE-STORE-0000000000-changelog--value",
+            "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--key",
+            "ktable-ktable-joinOnForeignKey-right_table-STATE-STORE-0000000003-changelog--value",
+            // output topics
+            "output-topic--key",
+            "output-topic--value"
+        )));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
index 3ec19de..aae99ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -82,6 +82,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -106,6 +107,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -130,6 +132,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -155,6 +158,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -180,6 +184,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
@@ -205,6 +210,7 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
             new SubscriptionResolverJoinProcessorSupplier<>(
                 valueGetterSupplier,
                 STRING_SERIALIZER,
+                "value-hash-dummy-topic",
                 JOINER,
                 leftJoin
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java b/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java
index 04b1a7b..c385187 100644
--- a/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java
+++ b/streams/src/test/java/org/apache/kafka/streams/utils/UniqueTopicSerdeScope.java
@@ -21,8 +21,10 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -41,6 +43,10 @@ public class UniqueTopicSerdeScope {
         return decorator;
     }
 
+    public Set<String> registeredTopics() {
+        return Collections.unmodifiableSet(topicTypeRegistry.keySet()); // read-only live view of the topic names recorded so far
+    }
+
     public class UniqueTopicSerdeDecorator<T> implements Serde<T> {
         private final AtomicBoolean isKey = new AtomicBoolean(false);
         private final Serde<T> delegate;