You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/11/02 17:20:58 UTC
[kafka] branch 2.6 updated: KAFKA-10454 / (2.6) Update
copartitionSourceGroups when optimization algorithm is triggered (#9468)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 0e8fcb6 KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered (#9468)
0e8fcb6 is described below
commit 0e8fcb61c20c6c7d3c08269a30245822886f466f
Author: Levani Kokhreidze <le...@transferwise.com>
AuthorDate: Mon Nov 2 19:14:36 2020 +0200
KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered (#9468)
cherry-picking #9237 on 2.6 branch
Reviewers: Bill Bejeck <bb...@apache.org>
---
.../kstream/internals/InternalStreamsBuilder.java | 5 +
.../internals/InternalTopologyBuilder.java | 12 +-
...bleJoinTopologyOptimizationIntegrationTest.java | 256 +++++++++++++++++++++
3 files changed, 272 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index e6aa2af..1d10ab6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -387,6 +387,11 @@ public class InternalStreamsBuilder implements InternalNameProvider {
}
repartitionNodeToBeReplaced.clearChildren();
+ // if replaced repartition node is part of any copartition group,
+ // we need to update it with the new node name so that co-partitioning won't break.
+ internalTopologyBuilder.maybeUpdateCopartitionSourceGroups(repartitionNodeToBeReplaced.nodeName(),
+ optimizedSingleRepartition.nodeName());
+
LOG.debug("Updated node {} children {}", optimizedSingleRepartition, optimizedSingleRepartition.children());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 9844341..a3b6aa5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -626,7 +626,17 @@ public class InternalTopologyBuilder {
}
public final void copartitionSources(final Collection<String> sourceNodes) {
- copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
+ copartitionSourceGroups.add(new HashSet<>(sourceNodes));
+ }
+
+ public final void maybeUpdateCopartitionSourceGroups(final String replacedNodeName,
+ final String optimizedNodeName) {
+ for (final Set<String> copartitionSourceGroup : copartitionSourceGroups) {
+ if (copartitionSourceGroup.contains(replacedNodeName)) {
+ copartitionSourceGroup.remove(replacedNodeName);
+ copartitionSourceGroup.add(optimizedNodeName);
+ }
+ }
}
public void validateCopartition() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
new file mode 100644
index 0000000..2852622
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(value = Parameterized.class)
+@Category({IntegrationTest.class})
+public class StreamTableJoinTopologyOptimizationIntegrationTest {
+ private static final int NUM_BROKERS = 1;
+
+ @ClassRule
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+ private String tableTopic;
+ private String inputTopic;
+ private String outputTopic;
+ private String applicationId;
+
+ private Properties streamsConfiguration;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Parameterized.Parameter
+ public String topologyOptimization;
+
+ @Parameterized.Parameters(name = "Optimization = {0}")
+ public static Collection<?> topologyOptimization() {
+ return Arrays.asList(new String[][]{
+ {StreamsConfig.OPTIMIZE},
+ {StreamsConfig.NO_OPTIMIZATION}
+ });
+ }
+
+ @Before
+ public void before() throws InterruptedException {
+ streamsConfiguration = new Properties();
+
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+
+ tableTopic = "table-topic" + safeTestName;
+ inputTopic = "stream-topic-" + safeTestName;
+ outputTopic = "output-topic-" + safeTestName;
+ applicationId = "app-" + safeTestName;
+
+ CLUSTER.createTopic(inputTopic, 4, 1);
+ CLUSTER.createTopic(tableTopic, 2, 1);
+ CLUSTER.createTopic(outputTopic, 4, 1);
+
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, topologyOptimization);
+ }
+
+ @After
+ public void whenShuttingDown() throws IOException {
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ }
+
+ @Test
+ public void shouldDoStreamTableJoinWithDifferentNumberOfPartitions() throws Exception {
+ final String storeName = "store";
+ final String selectKeyName = "selectKey";
+
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ final KStream<Integer, String> stream = streamsBuilder.stream(inputTopic);
+ final KTable<Integer, String> table = streamsBuilder.table(tableTopic, Materialized.as(storeName));
+
+ stream
+ .selectKey((key, value) -> key, Named.as(selectKeyName))
+ .join(table, (value1, value2) -> value2)
+ .to(outputTopic);
+
+ startStreams(streamsBuilder);
+
+ final long timestamp = System.currentTimeMillis();
+
+ final List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
+ new KeyValue<>(1, "A"),
+ new KeyValue<>(2, "B")
+ );
+
+ sendEvents(inputTopic, timestamp, expectedRecords);
+ sendEvents(outputTopic, timestamp, expectedRecords);
+
+ startStreams(streamsBuilder);
+
+ validateReceivedMessages(
+ outputTopic,
+ new IntegerDeserializer(),
+ new StringDeserializer(),
+ expectedRecords
+ );
+
+ final Set<String> allTopicsInCluster = CLUSTER.getAllTopicsInCluster();
+
+ final String repartitionTopicName = applicationId + "-" + selectKeyName + "-repartition";
+ final String tableChangelogStoreName = applicationId + "-" + storeName + "-changelog";
+
+ assertTrue(topicExists(repartitionTopicName));
+ assertEquals(2, getNumberOfPartitionsForTopic(repartitionTopicName));
+
+ if (StreamsConfig.OPTIMIZE.equals(topologyOptimization)) {
+ assertFalse(allTopicsInCluster.contains(tableChangelogStoreName));
+ } else if (StreamsConfig.NO_OPTIMIZATION.equals(topologyOptimization)) {
+ assertTrue(allTopicsInCluster.contains(tableChangelogStoreName));
+ }
+ }
+
+ private KafkaStreams startStreams(final StreamsBuilder builder) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
+
+ kafkaStreams.setStateListener((newState, oldState) -> {
+ if (KafkaStreams.State.REBALANCING == oldState && KafkaStreams.State.RUNNING == newState) {
+ latch.countDown();
+ }
+ });
+
+ kafkaStreams.start();
+
+ latch.await(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ return kafkaStreams;
+ }
+
+ private int getNumberOfPartitionsForTopic(final String topic) throws Exception {
+ try (final AdminClient adminClient = createAdminClient()) {
+ final TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topic))
+ .values()
+ .get(topic)
+ .get(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ return topicDescription.partitions().size();
+ }
+ }
+
+ private boolean topicExists(final String topic) {
+ return CLUSTER.getAllTopicsInCluster().contains(topic);
+ }
+
+ private <K, V> void sendEvents(final String topic,
+ final long timestamp,
+ final List<KeyValue<K, V>> events) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ topic,
+ events,
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()
+ ),
+ timestamp
+ );
+ }
+
+ private <K, V> void validateReceivedMessages(final String topic,
+ final Deserializer<K> keySerializer,
+ final Deserializer<V> valueSerializer,
+ final List<KeyValue<K, V>> expectedRecords) throws Exception {
+
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
+ consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProperties.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ keySerializer.getClass().getName()
+ );
+ consumerProperties.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ valueSerializer.getClass().getName()
+ );
+
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ consumerProperties,
+ topic,
+ expectedRecords
+ );
+ }
+
+ private static AdminClient createAdminClient() {
+ final Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+ return AdminClient.create(properties);
+ }
+}