You are viewing a plain text version of this content. The canonical (HTML) version is available from the Apache mailing-list archive; the original hyperlink was lost in this plain-text rendering.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/10/07 00:07:50 UTC
[kafka] branch trunk updated: KAFKA-14209 : Integration tests 3/3 (#12676)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1cb7736de10 KAFKA-14209 : Integration tests 3/3 (#12676)
1cb7736de10 is described below
commit 1cb7736de1016d8d997a92a61488bc04ef75c6d2
Author: Vicky Papavasileiou <vp...@users.noreply.github.com>
AuthorDate: Fri Oct 7 01:07:34 2022 +0100
KAFKA-14209 : Integration tests 3/3 (#12676)
Tests for 21a15c6b1f1ee80f163633ba617ad381f5edc0c1
Implements KIP-862: https://cwiki.apache.org/confluence/x/WSf1D
Reviewer: John Roesler <vv...@apache.org>
---
.../integration/AbstractJoinIntegrationTest.java | 36 +++
.../SelfJoinUpgradeIntegrationTest.java | 256 +++++++++++++++++++++
.../StreamStreamJoinIntegrationTest.java | 39 ++++
3 files changed, 331 insertions(+)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index d41cec04406..fdc8525bd42 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -105,6 +105,17 @@ public abstract class AbstractJoinIntegrationTest {
new Input<>(INPUT_TOPIC_LEFT, "D")
);
+ private final List<Input<String>> leftInput = Arrays.asList(
+ new Input<>(INPUT_TOPIC_LEFT, null),
+ new Input<>(INPUT_TOPIC_LEFT, "A"),
+ new Input<>(INPUT_TOPIC_LEFT, "B"),
+ new Input<>(INPUT_TOPIC_LEFT, null),
+ new Input<>(INPUT_TOPIC_LEFT, "C"),
+ new Input<>(INPUT_TOPIC_LEFT, null),
+ new Input<>(INPUT_TOPIC_LEFT, "D")
+ );
+
+
final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;
final boolean cacheEnabled;
@@ -204,6 +215,31 @@ public abstract class AbstractJoinIntegrationTest {
}
}
+ void runSelfJoinTestWithDriver(final List<List<TestRecord<Long, String>>> expectedResult) {
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
+ final TestInputTopic<Long, String> left = driver.createInputTopic(INPUT_TOPIC_LEFT, new LongSerializer(), new StringSerializer());
+ final TestOutputTopic<Long, String> outputTopic = driver.createOutputTopic(OUTPUT_TOPIC, new LongDeserializer(), new StringDeserializer());
+
+ final long firstTimestamp = time.milliseconds();
+ long eventTimestamp = firstTimestamp;
+ final Iterator<List<TestRecord<Long, String>>> resultIterator = expectedResult.iterator();
+ for (final Input<String> singleInputRecord : leftInput) {
+ left.pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, ++eventTimestamp);
+
+ final List<TestRecord<Long, String>> expected = resultIterator.next();
+ if (expected != null) {
+ final List<TestRecord<Long, String>> updatedExpected = new LinkedList<>();
+ for (final TestRecord<Long, String> record : expected) {
+ updatedExpected.add(new TestRecord<>(record.key(), record.value(), null, firstTimestamp + record.timestamp()));
+ }
+
+ final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();
+ assertThat(output, equalTo(updatedExpected));
+ }
+ }
+ }
+ }
+
private void checkQueryableStore(final String queryableName, final TestRecord<Long, String> expectedFinalResult, final TopologyTestDriver driver) {
final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore(queryableName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
new file mode 100644
index 00000000000..aaa791aa1dd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import static java.time.Duration.ofMinutes;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({IntegrationTest.class})
+public class SelfJoinUpgradeIntegrationTest {
+ public static final String INPUT_TOPIC = "selfjoin-input";
+ public static final String OUTPUT_TOPIC = "selfjoin-output";
+ private String inputTopic;
+ private String outputTopic;
+
+ private KafkaStreams kafkaStreams;
+
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+ @BeforeClass
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterClass
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Before
+ public void createTopics() throws Exception {
+ inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+ outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+ CLUSTER.createTopic(inputTopic);
+ CLUSTER.createTopic(outputTopic);
+ }
+
+ private Properties props() {
+ final Properties streamsConfiguration = new Properties();
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
+ Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
+ Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return streamsConfiguration;
+ }
+
+ @After
+ public void shutdown() {
+ if (kafkaStreams != null) {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ kafkaStreams.cleanUp();
+ }
+ }
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
+
+ final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
+ final KStream<String, String> leftOld = streamsBuilderOld.stream(
+ inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+ final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+ final KStream<String, String> joinedOld = leftOld.join(
+ leftOld,
+ valueJoiner,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+ );
+ joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+
+ final Properties props = props();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+ kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
+ kafkaStreams.start();
+
+ final long currentTime = CLUSTER.time.milliseconds();
+ processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList(
+ new KeyValueTimestamp<String, String>("1", "AA", currentTime + 42L)));
+
+ processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList(
+ new KeyValueTimestamp("1", "BA", currentTime + 43L),
+ new KeyValueTimestamp("1", "AB", currentTime + 43L),
+ new KeyValueTimestamp("1", "BB", currentTime + 43L)));
+
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+ kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
+ kafkaStreams.start();
+
+ final long currentTimeNew = CLUSTER.time.milliseconds();
+
+ processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, asList(
+ new KeyValueTimestamp("1", "CA", currentTimeNew + 44L),
+ new KeyValueTimestamp("1", "CB", currentTimeNew + 44L),
+ new KeyValueTimestamp("1", "AC", currentTimeNew + 44L),
+ new KeyValueTimestamp("1", "BC", currentTimeNew + 44L),
+ new KeyValueTimestamp("1", "CC", currentTimeNew + 44L)));
+
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldRestartWithTopologyOptimizationOn() throws Exception {
+
+ final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
+ final KStream<String, String> leftOld = streamsBuilderOld.stream(
+ inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
+ final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
+ final KStream<String, String> joinedOld = leftOld.join(
+ leftOld,
+ valueJoiner,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMinutes(100))
+ );
+ joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+
+ final Properties props = props();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+ kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
+ kafkaStreams.start();
+
+ final long currentTime = CLUSTER.time.milliseconds();
+ processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList(
+ new KeyValueTimestamp("1", "AA", currentTime + 42L)));
+
+ processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList(
+ new KeyValueTimestamp("1", "BA", currentTime + 43L),
+ new KeyValueTimestamp("1", "AB", currentTime + 43L),
+ new KeyValueTimestamp("1", "BB", currentTime + 43L)));
+
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+ kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
+ kafkaStreams.start();
+
+ final long currentTimeNew = CLUSTER.time.milliseconds();
+
+ processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, asList(
+ new KeyValueTimestamp("1", "CA", currentTimeNew + 44L),
+ new KeyValueTimestamp("1", "CB", currentTimeNew + 44L),
+ new KeyValueTimestamp("1", "AC", currentTimeNew + 44L),
+ new KeyValueTimestamp("1", "BC", currentTimeNew + 44L),
+ new KeyValueTimestamp("1", "CC", currentTimeNew + 44L)));
+
+ kafkaStreams.close();
+ }
+
+
+ private <K, V> boolean processKeyValueAndVerifyCount(
+ final K key,
+ final V value,
+ final long timestamp,
+ final List<KeyValueTimestamp<K, V>> expected)
+ throws Exception {
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputTopic,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp);
+
+
+ final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
+ consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProperties.put(StreamsConfig.WINDOW_SIZE_MS_CONFIG, 500L);
+
+
+ final List<KeyValueTimestamp<K, V>> actual =
+ IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
+ consumerProperties,
+ outputTopic,
+ expected.size(),
+ 60 * 1000);
+
+ assertThat(actual, is(expected));
+
+ return actual.equals(expected);
+ }
+
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 49818e71a45..fc0d2213d2d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -65,6 +65,45 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
rightStream = builder.stream(INPUT_TOPIC_RIGHT);
}
+ @Test
+ public void testSelfJoin() {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-selfJoin");
+ STREAMS_CONFIG.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
+
+ final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-A", null, 2L)),
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-A", null, 3L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-B", null, 3L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-B", null, 3L)),
+ null,
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-A", null, 5L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-B", null, 5L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-C", null, 5L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-C", null, 5L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-C", null, 5L)),
+ null,
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-A", null, 7L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-B", null, 7L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-C", null, 7L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-D", null, 7L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-D", null, 7L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-D", null, 7L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-D", null, 7L))
+ );
+
+ leftStream.join(
+ leftStream,
+ valueJoiner,
+ JoinWindows.ofTimeDifferenceAndGrace(ofSeconds(10), ofHours(24))
+ ).to(OUTPUT_TOPIC);
+
+ runSelfJoinTestWithDriver(expectedResult);
+ }
+
@Test
public void testInner() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");