You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2022/05/14 00:21:40 UTC
[kafka] branch trunk updated: MINOR: Add upgrade tests for FK joins (#12122)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 78dd40123cc MINOR: Add upgrade tests for FK joins (#12122)
78dd40123cc is described below
commit 78dd40123cc47e425264f516a4b8c6c2003c99b8
Author: Alex Sorokoumov <91...@users.noreply.github.com>
AuthorDate: Sat May 14 02:21:27 2022 +0200
MINOR: Add upgrade tests for FK joins (#12122)
Follow up PR for KAFKA-13769.
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
.../kafka/streams/tests/SmokeTestDriver.java | 104 ++++++++++++------
.../kafka/streams/tests/SmokeTestDriver.java | 113 ++++++++++++++------
.../kafka/streams/tests/StreamsUpgradeTest.java | 45 ++++++--
.../kafka/streams/tests/SmokeTestDriver.java | 112 ++++++++++++++------
.../kafka/streams/tests/StreamsUpgradeTest.java | 45 ++++++--
.../kafka/streams/tests/SmokeTestDriver.java | 114 ++++++++++++++------
.../kafka/streams/tests/StreamsUpgradeTest.java | 45 ++++++--
.../kafka/streams/tests/SmokeTestDriver.java | 101 ++++++++++++++----
.../kafka/streams/tests/StreamsUpgradeTest.java | 45 ++++++--
.../kafka/streams/tests/SmokeTestDriver.java | 112 ++++++++++++++------
.../kafka/streams/tests/StreamsUpgradeTest.java | 45 ++++++--
.../kafka/streams/tests/SmokeTestDriver.java | 112 ++++++++++++++------
.../kafka/streams/tests/StreamsUpgradeTest.java | 45 ++++++--
.../kafka/streams/tests/SmokeTestDriver.java | 116 +++++++++++++++------
.../kafka/streams/tests/StreamsUpgradeTest.java | 45 ++++++--
tests/kafkatest/services/streams.py | 5 +-
.../tests/streams/streams_upgrade_test.py | 77 +++++++++-----
tests/kafkatest/version.py | 3 +
18 files changed, 962 insertions(+), 322 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..2bbb25db395 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
- private static final String[] TOPICS = {
+ private static final String[] NUMERIC_VALUE_TOPICS = {
"data",
"echo",
"max",
@@ -72,6 +73,15 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg",
"tagg"
};
+ private static final String[] STRING_VALUE_TOPICS = {
+ "fk"
+ };
+
+ private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+ static {
+ System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+ System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+ }
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -163,7 +173,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
- List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
@@ -183,7 +194,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
intSerde.serializer().serialize("", value)
);
- producer.send(record, new TestCallback(record, needRetry));
+ producer.send(record, new TestCallback(record, dataNeedRetry));
+
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+
+ producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++;
allData.get(key).add(value);
@@ -195,36 +215,60 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
producer.flush();
- int remainingRetries = 5;
- while (!needRetry.isEmpty()) {
- final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
- for (final ProducerRecord<byte[], byte[]> record : needRetry) {
- System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
- producer.send(record, new TestCallback(record, needRetry2));
- }
- producer.flush();
- needRetry = needRetry2;
+ retry(producer, dataNeedRetry, stringSerde);
+ retry(producer, fkNeedRetry, intSerde);
- if (--remainingRetries == 0 && !needRetry.isEmpty()) {
- System.err.println("Failed to produce all records after multiple retries");
- Exit.exit(1);
- }
+ flush(producer,
+ "data",
+ stringSerde.serializer().serialize("", "flush"),
+ intSerde.serializer().serialize("", 0)
+ );
+ flush(producer,
+ "fk",
+ intSerde.serializer().serialize("", 0),
+ stringSerde.serializer().serialize("", "flush")
+ );
+
+ }
+ return Collections.unmodifiableMap(allData);
+ }
+
+ private static void retry(final KafkaProducer<byte[], byte[]> producer,
+ List<ProducerRecord<byte[], byte[]>> needRetry,
+ final Serde<?> keySerde) {
+ int remainingRetries = 5;
+ while (!needRetry.isEmpty()) {
+ final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+ for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+ System.out.println("retry producing " + keySerde.deserializer().deserialize("", record.key()));
+ producer.send(record, new TestCallback(record, needRetry2));
}
+ producer.flush();
+ needRetry = needRetry2;
- // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
- // all suppressed records.
- final List<PartitionInfo> partitions = producer.partitionsFor("data");
- for (final PartitionInfo partition : partitions) {
- producer.send(new ProducerRecord<>(
- partition.topic(),
- partition.partition(),
- System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
- stringSerde.serializer().serialize("", "flush"),
- intSerde.serializer().serialize("", 0)
- ));
+ if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+ System.err.println("Failed to produce all records after multiple retries");
+ Exit.exit(1);
}
}
- return Collections.unmodifiableMap(allData);
+ }
+
+ private static void flush(final KafkaProducer<byte[], byte[]> producer,
+ final String topic,
+ final byte[] keyBytes,
+ final byte[] valBytes) {
+ // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+ // all suppressed records.
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ for (final PartitionInfo partition : partitions) {
+ producer.send(new ProducerRecord<>(
+ partition.topic(),
+ partition.partition(),
+ System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+ keyBytes,
+ valBytes
+ ));
+ }
}
private static Properties generatorProperties(final String kafka) {
@@ -315,14 +359,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
- Stream.of(TOPICS)
+ Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..5c4a8cd615b 100644
--- a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
- private static final String[] TOPICS = {
+ private static final String[] NUMERIC_VALUE_TOPICS = {
"data",
"echo",
"max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg",
"tagg"
};
+ private static final String[] STRING_VALUE_TOPICS = {
+ "fk"
+ };
+ private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+ static {
+ System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+ System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+ }
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
-
producer.send(record);
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord);
+
numRecordsProduced++;
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -148,7 +164,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
final Duration timeToSpend) {
final Properties producerProps = generatorProperties(kafka);
-
int numRecordsProduced = 0;
final Map<String, Set<Integer>> allData = new HashMap<>();
@@ -163,7 +178,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
- List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
@@ -175,15 +191,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--;
data[index] = data[remaining];
} else {
-
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
+ producer.send(record, new TestCallback(record, dataNeedRetry));
- producer.send(record, new TestCallback(record, needRetry));
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++;
allData.get(key).add(value);
@@ -195,36 +217,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
producer.flush();
- int remainingRetries = 5;
- while (!needRetry.isEmpty()) {
- final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
- for (final ProducerRecord<byte[], byte[]> record : needRetry) {
- System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
- producer.send(record, new TestCallback(record, needRetry2));
- }
- producer.flush();
- needRetry = needRetry2;
+ retry(producer, dataNeedRetry, stringSerde);
+ retry(producer, fkNeedRetry, intSerde);
+
+ flush(producer,
+ "data",
+ stringSerde.serializer().serialize("", "flush"),
+ intSerde.serializer().serialize("", 0)
+ );
+ flush(producer,
+ "fk",
+ intSerde.serializer().serialize("", 0),
+ stringSerde.serializer().serialize("", "flush")
+ );
+ }
+ return Collections.unmodifiableMap(allData);
+ }
- if (--remainingRetries == 0 && !needRetry.isEmpty()) {
- System.err.println("Failed to produce all records after multiple retries");
- Exit.exit(1);
- }
+ private static void retry(final KafkaProducer<byte[], byte[]> producer,
+ List<ProducerRecord<byte[], byte[]>> needRetry,
+ final Serde<?> keySerde) {
+ int remainingRetries = 5;
+ while (!needRetry.isEmpty()) {
+ final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+ for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+ System.out.println(
+ "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+ producer.send(record, new TestCallback(record, needRetry2));
}
-
- // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
- // all suppressed records.
- final List<PartitionInfo> partitions = producer.partitionsFor("data");
- for (final PartitionInfo partition : partitions) {
- producer.send(new ProducerRecord<>(
- partition.topic(),
- partition.partition(),
- System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
- stringSerde.serializer().serialize("", "flush"),
- intSerde.serializer().serialize("", 0)
- ));
+ producer.flush();
+ needRetry = needRetry2;
+ if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+ System.err.println("Failed to produce all records after multiple retries");
+ Exit.exit(1);
}
}
- return Collections.unmodifiableMap(allData);
+ }
+
+ private static void flush(final KafkaProducer<byte[], byte[]> producer,
+ final String topic,
+ final byte[] keyBytes,
+ final byte[] valBytes) {
+ // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+ // all suppressed records.
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ for (final PartitionInfo partition : partitions) {
+ producer.send(new ProducerRecord<>(
+ partition.topic(),
+ partition.partition(),
+ System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+ keyBytes,
+ valBytes
+ ));
+ }
}
private static Properties generatorProperties(final String kafka) {
@@ -315,14 +360,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
- Stream.of(TOPICS)
+ Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index c0c8c72c599..9d08663d9b3 100644
--- a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
*/
package org.apache.kafka.streams.tests;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
+ final KTable<String, Integer> dataTable = builder.table(
+ "data", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> dataStream = dataTable.toStream();
+ dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo");
+ final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+ "test.run_fk_join",
+ "false"));
+ if (runFkJoin) {
+ try {
+ final KTable<Integer, String> fkTable = builder.table(
+ "fk", Consumed.with(intSerde, stringSerde));
+ buildFKTable(dataTable, fkTable);
+ } catch (final Exception e) {
+ System.err.println("Caught " + e.getMessage());
+ }
+ }
+
final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(
+ StreamsConfig.APPLICATION_ID_CONFIG,
+ "StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties);
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
}));
}
- private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ private static void buildFKTable(final KTable<String, Integer> primaryTable,
+ final KTable<Integer, String> otherTable) {
+ final KStream<String, String> kStream = primaryTable
+ .join(otherTable, v -> v, (k0, v0) -> v0)
+ .toStream();
+ kStream.process(printProcessorSupplier("fk"));
+ kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
- System.out.println("[2.4] initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[2.4] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..4dae6eae575 100644
--- a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
- private static final String[] TOPICS = {
+ private static final String[] NUMERIC_VALUE_TOPICS = {
"data",
"echo",
"max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg",
"tagg"
};
+ private static final String[] STRING_VALUE_TOPICS = {
+ "fk"
+ };
+ private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+ static {
+ System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+ System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+ }
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
-
producer.send(record);
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord);
+
numRecordsProduced++;
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
- List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--;
data[index] = data[remaining];
} else {
-
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
+ producer.send(record, new TestCallback(record, dataNeedRetry));
- producer.send(record, new TestCallback(record, needRetry));
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++;
allData.get(key).add(value);
@@ -195,36 +218,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
producer.flush();
- int remainingRetries = 5;
- while (!needRetry.isEmpty()) {
- final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
- for (final ProducerRecord<byte[], byte[]> record : needRetry) {
- System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
- producer.send(record, new TestCallback(record, needRetry2));
- }
- producer.flush();
- needRetry = needRetry2;
+ retry(producer, dataNeedRetry, stringSerde);
+ retry(producer, fkNeedRetry, intSerde);
+
+ flush(producer,
+ "data",
+ stringSerde.serializer().serialize("", "flush"),
+ intSerde.serializer().serialize("", 0)
+ );
+ flush(producer,
+ "fk",
+ intSerde.serializer().serialize("", 0),
+ stringSerde.serializer().serialize("", "flush")
+ );
+ }
+ return Collections.unmodifiableMap(allData);
+ }
- if (--remainingRetries == 0 && !needRetry.isEmpty()) {
- System.err.println("Failed to produce all records after multiple retries");
- Exit.exit(1);
- }
+ private static void retry(final KafkaProducer<byte[], byte[]> producer,
+ List<ProducerRecord<byte[], byte[]>> needRetry,
+ final Serde<?> keySerde) {
+ int remainingRetries = 5;
+ while (!needRetry.isEmpty()) {
+ final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+ for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+ System.out.println(
+ "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+ producer.send(record, new TestCallback(record, needRetry2));
}
-
- // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
- // all suppressed records.
- final List<PartitionInfo> partitions = producer.partitionsFor("data");
- for (final PartitionInfo partition : partitions) {
- producer.send(new ProducerRecord<>(
- partition.topic(),
- partition.partition(),
- System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
- stringSerde.serializer().serialize("", "flush"),
- intSerde.serializer().serialize("", 0)
- ));
+ producer.flush();
+ needRetry = needRetry2;
+ if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+ System.err.println("Failed to produce all records after multiple retries");
+ Exit.exit(1);
}
}
- return Collections.unmodifiableMap(allData);
+ }
+
+ private static void flush(final KafkaProducer<byte[], byte[]> producer,
+ final String topic,
+ final byte[] keyBytes,
+ final byte[] valBytes) {
+ // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+ // all suppressed records.
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ for (final PartitionInfo partition : partitions) {
+ producer.send(new ProducerRecord<>(
+ partition.topic(),
+ partition.partition(),
+ System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+ keyBytes,
+ valBytes
+ ));
+ }
}
private static Properties generatorProperties(final String kafka) {
@@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
- Stream.of(TOPICS)
+ Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 0fea040bcb4..69c46de37af 100644
--- a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
*/
package org.apache.kafka.streams.tests;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
+ final KTable<String, Integer> dataTable = builder.table(
+ "data", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> dataStream = dataTable.toStream();
+ dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo");
+ final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+ "test.run_fk_join",
+ "false"));
+ if (runFkJoin) {
+ try {
+ final KTable<Integer, String> fkTable = builder.table(
+ "fk", Consumed.with(intSerde, stringSerde));
+ buildFKTable(dataTable, fkTable);
+ } catch (final Exception e) {
+ System.err.println("Caught " + e.getMessage());
+ }
+ }
+
final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(
+ StreamsConfig.APPLICATION_ID_CONFIG,
+ "StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties);
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
}));
}
- private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ private static void buildFKTable(final KTable<String, Integer> primaryTable,
+ final KTable<Integer, String> otherTable) {
+ final KStream<String, String> kStream = primaryTable
+ .join(otherTable, v -> v, (k0, v0) -> v0)
+ .toStream();
+ kStream.process(printProcessorSupplier("fk"));
+ kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
- System.out.println("[2.5] initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[2.5] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
diff --git a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..0e08771495f 100644
--- a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
- private static final String[] TOPICS = {
+ private static final String[] NUMERIC_VALUE_TOPICS = {
"data",
"echo",
"max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg",
"tagg"
};
+ private static final String[] STRING_VALUE_TOPICS = {
+ "fk"
+ };
+ private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+ static {
+ System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+ System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+ }
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
-
producer.send(record);
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord);
+
numRecordsProduced++;
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
- List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
@@ -175,7 +192,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--;
data[index] = data[remaining];
} else {
-
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
@@ -183,7 +199,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
intSerde.serializer().serialize("", value)
);
- producer.send(record, new TestCallback(record, needRetry));
+ producer.send(record, new TestCallback(record, dataNeedRetry));
+
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+
+ producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++;
allData.get(key).add(value);
@@ -195,36 +220,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
producer.flush();
- int remainingRetries = 5;
- while (!needRetry.isEmpty()) {
- final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
- for (final ProducerRecord<byte[], byte[]> record : needRetry) {
- System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
- producer.send(record, new TestCallback(record, needRetry2));
- }
- producer.flush();
- needRetry = needRetry2;
+ retry(producer, dataNeedRetry, stringSerde);
+ retry(producer, fkNeedRetry, intSerde);
+
+ flush(producer,
+ "data",
+ stringSerde.serializer().serialize("", "flush"),
+ intSerde.serializer().serialize("", 0)
+ );
+ flush(producer,
+ "fk",
+ intSerde.serializer().serialize("", 0),
+ stringSerde.serializer().serialize("", "flush")
+ );
+ }
+ return Collections.unmodifiableMap(allData);
+ }
- if (--remainingRetries == 0 && !needRetry.isEmpty()) {
- System.err.println("Failed to produce all records after multiple retries");
- Exit.exit(1);
- }
+ private static void retry(final KafkaProducer<byte[], byte[]> producer,
+ List<ProducerRecord<byte[], byte[]>> needRetry,
+ final Serde<?> keySerde) {
+ int remainingRetries = 5;
+ while (!needRetry.isEmpty()) {
+ final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+ for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+ System.out.println(
+ "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+ producer.send(record, new TestCallback(record, needRetry2));
}
-
- // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
- // all suppressed records.
- final List<PartitionInfo> partitions = producer.partitionsFor("data");
- for (final PartitionInfo partition : partitions) {
- producer.send(new ProducerRecord<>(
- partition.topic(),
- partition.partition(),
- System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
- stringSerde.serializer().serialize("", "flush"),
- intSerde.serializer().serialize("", 0)
- ));
+ producer.flush();
+ needRetry = needRetry2;
+ if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+ System.err.println("Failed to produce all records after multiple retries");
+ Exit.exit(1);
}
}
- return Collections.unmodifiableMap(allData);
+ }
+
+ private static void flush(final KafkaProducer<byte[], byte[]> producer,
+ final String topic,
+ final byte[] keyBytes,
+ final byte[] valBytes) {
+ // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+ // all suppressed records.
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ for (final PartitionInfo partition : partitions) {
+ producer.send(new ProducerRecord<>(
+ partition.topic(),
+ partition.partition(),
+ System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+ keyBytes,
+ valBytes
+ ));
+ }
}
private static Properties generatorProperties(final String kafka) {
@@ -315,14 +363,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
- Stream.of(TOPICS)
+ Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index e1b294ff15b..0844552134a 100644
--- a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
*/
package org.apache.kafka.streams.tests;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
+ final KTable<String, Integer> dataTable = builder.table(
+ "data", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> dataStream = dataTable.toStream();
+ dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo");
+ final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+ "test.run_fk_join",
+ "false"));
+ if (runFkJoin) {
+ try {
+ final KTable<Integer, String> fkTable = builder.table(
+ "fk", Consumed.with(intSerde, stringSerde));
+ buildFKTable(dataTable, fkTable);
+ } catch (final Exception e) {
+ System.err.println("Caught " + e.getMessage());
+ }
+ }
+
final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(
+ StreamsConfig.APPLICATION_ID_CONFIG,
+ "StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties);
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
}));
}
- private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ private static void buildFKTable(final KTable<String, Integer> primaryTable,
+ final KTable<Integer, String> otherTable) {
+ final KStream<String, String> kStream = primaryTable
+ .join(otherTable, v -> v, (k0, v0) -> v0)
+ .toStream();
+ kStream.process(printProcessorSupplier("fk"));
+ kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
- System.out.println("[2.6] initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[2.6] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
diff --git a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..ac7482cfb2d 100644
--- a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
- private static final String[] TOPICS = {
+ private static final String[] NUMERIC_VALUE_TOPICS = {
"data",
"echo",
"max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg",
"tagg"
};
+ private static final String[] STRING_VALUE_TOPICS = {
+ "fk"
+ };
+ private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+ static {
+ System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+ System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+ }
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
-
producer.send(record);
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord);
+
numRecordsProduced++;
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
- List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--;
data[index] = data[remaining];
} else {
-
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
+ producer.send(record, new TestCallback(record, dataNeedRetry));
- producer.send(record, new TestCallback(record, needRetry));
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++;
allData.get(key).add(value);
@@ -195,21 +218,19 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
producer.flush();
- int remainingRetries = 5;
- while (!needRetry.isEmpty()) {
- final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
- for (final ProducerRecord<byte[], byte[]> record : needRetry) {
- System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
- producer.send(record, new TestCallback(record, needRetry2));
- }
- producer.flush();
- needRetry = needRetry2;
+ retry(producer, dataNeedRetry, stringSerde);
+ retry(producer, fkNeedRetry, intSerde);
- if (--remainingRetries == 0 && !needRetry.isEmpty()) {
- System.err.println("Failed to produce all records after multiple retries");
- Exit.exit(1);
- }
- }
+ flush(producer,
+ "data",
+ stringSerde.serializer().serialize("", "flush"),
+ intSerde.serializer().serialize("", 0)
+ );
+ flush(producer,
+ "fk",
+ intSerde.serializer().serialize("", 0),
+ stringSerde.serializer().serialize("", "flush")
+ );
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
// all suppressed records.
@@ -227,6 +248,44 @@ public class SmokeTestDriver extends SmokeTestUtil {
return Collections.unmodifiableMap(allData);
}
+ private static void retry(final KafkaProducer<byte[], byte[]> producer,
+ List<ProducerRecord<byte[], byte[]>> needRetry,
+ final Serde<?> keySerde) {
+ int remainingRetries = 5;
+ while (!needRetry.isEmpty()) {
+ final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+ for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+ System.out.println(
+ "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+ producer.send(record, new TestCallback(record, needRetry2));
+ }
+ producer.flush();
+ needRetry = needRetry2;
+ if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+ System.err.println("Failed to produce all records after multiple retries");
+ Exit.exit(1);
+ }
+ }
+ }
+
+ private static void flush(final KafkaProducer<byte[], byte[]> producer,
+ final String topic,
+ final byte[] keyBytes,
+ final byte[] valBytes) {
+ // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+ // all suppressed records.
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ for (final PartitionInfo partition : partitions) {
+ producer.send(new ProducerRecord<>(
+ partition.topic(),
+ partition.partition(),
+ System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+ keyBytes,
+ valBytes
+ ));
+ }
+ }
+
private static Properties generatorProperties(final String kafka) {
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@@ -315,14 +374,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
- Stream.of(TOPICS)
+ Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 6f485e694cf..32d8d9408f5 100644
--- a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
*/
package org.apache.kafka.streams.tests;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
+ final KTable<String, Integer> dataTable = builder.table(
+ "data", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> dataStream = dataTable.toStream();
+ dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo");
+ final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+ "test.run_fk_join",
+ "false"));
+ if (runFkJoin) {
+ try {
+ final KTable<Integer, String> fkTable = builder.table(
+ "fk", Consumed.with(intSerde, stringSerde));
+ buildFKTable(dataTable, fkTable);
+ } catch (final Exception e) {
+ System.err.println("Caught " + e.getMessage());
+ }
+ }
+
final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(
+ StreamsConfig.APPLICATION_ID_CONFIG,
+ "StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties);
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
}));
}
- private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ private static void buildFKTable(final KTable<String, Integer> primaryTable,
+ final KTable<Integer, String> otherTable) {
+ final KStream<String, String> kStream = primaryTable
+ .join(otherTable, v -> v, (k0, v0) -> v0)
+ .toStream();
+ kStream.process(printProcessorSupplier("fk"));
+ kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
- System.out.println("[2.7] initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[2.7] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
diff --git a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..4dae6eae575 100644
--- a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
- private static final String[] TOPICS = {
+ private static final String[] NUMERIC_VALUE_TOPICS = {
"data",
"echo",
"max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg",
"tagg"
};
+ private static final String[] STRING_VALUE_TOPICS = {
+ "fk"
+ };
+ private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+ static {
+ System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+ System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+ }
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
-
producer.send(record);
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord);
+
numRecordsProduced++;
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
- List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--;
data[index] = data[remaining];
} else {
-
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
+ producer.send(record, new TestCallback(record, dataNeedRetry));
- producer.send(record, new TestCallback(record, needRetry));
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++;
allData.get(key).add(value);
@@ -195,36 +218,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
producer.flush();
- int remainingRetries = 5;
- while (!needRetry.isEmpty()) {
- final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
- for (final ProducerRecord<byte[], byte[]> record : needRetry) {
- System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
- producer.send(record, new TestCallback(record, needRetry2));
- }
- producer.flush();
- needRetry = needRetry2;
+ retry(producer, dataNeedRetry, stringSerde);
+ retry(producer, fkNeedRetry, intSerde);
+
+ flush(producer,
+ "data",
+ stringSerde.serializer().serialize("", "flush"),
+ intSerde.serializer().serialize("", 0)
+ );
+ flush(producer,
+ "fk",
+ intSerde.serializer().serialize("", 0),
+ stringSerde.serializer().serialize("", "flush")
+ );
+ }
+ return Collections.unmodifiableMap(allData);
+ }
- if (--remainingRetries == 0 && !needRetry.isEmpty()) {
- System.err.println("Failed to produce all records after multiple retries");
- Exit.exit(1);
- }
+ private static void retry(final KafkaProducer<byte[], byte[]> producer,
+ List<ProducerRecord<byte[], byte[]>> needRetry,
+ final Serde<?> keySerde) {
+ int remainingRetries = 5;
+ while (!needRetry.isEmpty()) {
+ final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+ for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+ System.out.println(
+ "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+ producer.send(record, new TestCallback(record, needRetry2));
}
-
- // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
- // all suppressed records.
- final List<PartitionInfo> partitions = producer.partitionsFor("data");
- for (final PartitionInfo partition : partitions) {
- producer.send(new ProducerRecord<>(
- partition.topic(),
- partition.partition(),
- System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
- stringSerde.serializer().serialize("", "flush"),
- intSerde.serializer().serialize("", 0)
- ));
+ producer.flush();
+ needRetry = needRetry2;
+ if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+ System.err.println("Failed to produce all records after multiple retries");
+ Exit.exit(1);
}
}
- return Collections.unmodifiableMap(allData);
+ }
+
+ private static void flush(final KafkaProducer<byte[], byte[]> producer,
+ final String topic,
+ final byte[] keyBytes,
+ final byte[] valBytes) {
+ // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+ // all suppressed records.
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ for (final PartitionInfo partition : partitions) {
+ producer.send(new ProducerRecord<>(
+ partition.topic(),
+ partition.partition(),
+ System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+ keyBytes,
+ valBytes
+ ));
+ }
}
private static Properties generatorProperties(final String kafka) {
@@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
- Stream.of(TOPICS)
+ Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 4f2825d23d6..db17d73bcba 100644
--- a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
*/
package org.apache.kafka.streams.tests;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
+ final KTable<String, Integer> dataTable = builder.table(
+ "data", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> dataStream = dataTable.toStream();
+ dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo");
+ final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+ "test.run_fk_join",
+ "false"));
+ if (runFkJoin) {
+ try {
+ final KTable<Integer, String> fkTable = builder.table(
+ "fk", Consumed.with(intSerde, stringSerde));
+ buildFKTable(dataTable, fkTable);
+ } catch (final Exception e) {
+ System.err.println("Caught " + e.getMessage());
+ }
+ }
+
final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(
+ StreamsConfig.APPLICATION_ID_CONFIG,
+ "StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
}));
}
- private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+ private static void buildFKTable(final KTable<String, Integer> primaryTable,
+ final KTable<Integer, String> otherTable) {
+ final KStream<String, String> kStream = primaryTable
+ .join(otherTable, v -> v, (k0, v0) -> v0)
+ .toStream();
+ kStream.process(printProcessorSupplier("fk"));
+ kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+ }
+
+ private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
- System.out.println("[2.8] initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[2.8] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
diff --git a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..4dae6eae575 100644
--- a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
- private static final String[] TOPICS = {
+ private static final String[] NUMERIC_VALUE_TOPICS = {
"data",
"echo",
"max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg",
"tagg"
};
+ private static final String[] STRING_VALUE_TOPICS = {
+ "fk"
+ };
+ private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+ static {
+ System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+ System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+ }
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
-
producer.send(record);
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord);
+
numRecordsProduced++;
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
- List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--;
data[index] = data[remaining];
} else {
-
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
+ producer.send(record, new TestCallback(record, dataNeedRetry));
- producer.send(record, new TestCallback(record, needRetry));
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++;
allData.get(key).add(value);
@@ -195,36 +218,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
producer.flush();
- int remainingRetries = 5;
- while (!needRetry.isEmpty()) {
- final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
- for (final ProducerRecord<byte[], byte[]> record : needRetry) {
- System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
- producer.send(record, new TestCallback(record, needRetry2));
- }
- producer.flush();
- needRetry = needRetry2;
+ retry(producer, dataNeedRetry, stringSerde);
+ retry(producer, fkNeedRetry, intSerde);
+
+ flush(producer,
+ "data",
+ stringSerde.serializer().serialize("", "flush"),
+ intSerde.serializer().serialize("", 0)
+ );
+ flush(producer,
+ "fk",
+ intSerde.serializer().serialize("", 0),
+ stringSerde.serializer().serialize("", "flush")
+ );
+ }
+ return Collections.unmodifiableMap(allData);
+ }
- if (--remainingRetries == 0 && !needRetry.isEmpty()) {
- System.err.println("Failed to produce all records after multiple retries");
- Exit.exit(1);
- }
+ private static void retry(final KafkaProducer<byte[], byte[]> producer,
+ List<ProducerRecord<byte[], byte[]>> needRetry,
+ final Serde<?> keySerde) {
+ int remainingRetries = 5;
+ while (!needRetry.isEmpty()) {
+ final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+ for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+ System.out.println(
+ "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+ producer.send(record, new TestCallback(record, needRetry2));
}
-
- // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
- // all suppressed records.
- final List<PartitionInfo> partitions = producer.partitionsFor("data");
- for (final PartitionInfo partition : partitions) {
- producer.send(new ProducerRecord<>(
- partition.topic(),
- partition.partition(),
- System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
- stringSerde.serializer().serialize("", "flush"),
- intSerde.serializer().serialize("", 0)
- ));
+ producer.flush();
+ needRetry = needRetry2;
+ if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+ System.err.println("Failed to produce all records after multiple retries");
+ Exit.exit(1);
}
}
- return Collections.unmodifiableMap(allData);
+ }
+
+ private static void flush(final KafkaProducer<byte[], byte[]> producer,
+ final String topic,
+ final byte[] keyBytes,
+ final byte[] valBytes) {
+ // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+ // all suppressed records.
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ for (final PartitionInfo partition : partitions) {
+ producer.send(new ProducerRecord<>(
+ partition.topic(),
+ partition.partition(),
+ System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+ keyBytes,
+ valBytes
+ ));
+ }
}
private static Properties generatorProperties(final String kafka) {
@@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
- Stream.of(TOPICS)
+ Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index b097de71d41..0751516d76c 100644
--- a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
*/
package org.apache.kafka.streams.tests;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -44,12 +51,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
+ final KTable<String, Integer> dataTable = builder.table(
+ "data", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> dataStream = dataTable.toStream();
+ dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo");
+ final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+ "test.run_fk_join",
+ "false"));
+ if (runFkJoin) {
+ try {
+ final KTable<Integer, String> fkTable = builder.table(
+ "fk", Consumed.with(intSerde, stringSerde));
+ buildFKTable(dataTable, fkTable);
+ } catch (final Exception e) {
+ System.err.println("Caught " + e.getMessage());
+ }
+ }
+
final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(
+ StreamsConfig.APPLICATION_ID_CONFIG,
+ "StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
@@ -63,13 +87,22 @@ public class StreamsUpgradeTest {
}));
}
- private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier() {
+ private static void buildFKTable(final KTable<String, Integer> primaryTable,
+ final KTable<Integer, String> otherTable) {
+ final KStream<String, String> kStream = primaryTable
+ .join(otherTable, v -> v, (k0, v0) -> v0)
+ .toStream();
+ kStream.process(printProcessorSupplier("fk"));
+ kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+ }
+
+ private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(final String topic) {
return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext<KOut, VOut> context) {
- System.out.println("[3.0] initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[3.0] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@@ -77,7 +110,7 @@ public class StreamsUpgradeTest {
public void process(final Record<KIn, VIn> record) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
diff --git a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index ac83cd95eba..dbacbb9625b 100644
--- a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
- private static final String[] TOPICS = {
+ private static final String[] NUMERIC_VALUE_TOPICS = {
"data",
"echo",
"max",
@@ -72,6 +73,15 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg",
"tagg"
};
+ private static final String[] STRING_VALUE_TOPICS = {
+ "fk"
+ };
+
+ private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+ static {
+ System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+ System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+ }
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +140,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
-
producer.send(record);
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord);
+
numRecordsProduced++;
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -148,7 +165,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
final Duration timeToSpend) {
final Properties producerProps = generatorProperties(kafka);
-
int numRecordsProduced = 0;
final Map<String, Set<Integer>> allData = new HashMap<>();
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
- List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
@@ -175,7 +192,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--;
data[index] = data[remaining];
} else {
-
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
@@ -183,7 +199,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
intSerde.serializer().serialize("", value)
);
- producer.send(record, new TestCallback(record, needRetry));
+ producer.send(record, new TestCallback(record, dataNeedRetry));
+
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+
+ producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++;
allData.get(key).add(value);
@@ -195,36 +220,59 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
producer.flush();
- int remainingRetries = 5;
- while (!needRetry.isEmpty()) {
- final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
- for (final ProducerRecord<byte[], byte[]> record : needRetry) {
- System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
- producer.send(record, new TestCallback(record, needRetry2));
- }
- producer.flush();
- needRetry = needRetry2;
+ retry(producer, dataNeedRetry, stringSerde);
+ retry(producer, fkNeedRetry, intSerde);
+
+ flush(producer,
+ "data",
+ stringSerde.serializer().serialize("", "flush"),
+ intSerde.serializer().serialize("", 0)
+ );
+ flush(producer,
+ "fk",
+ intSerde.serializer().serialize("", 0),
+ stringSerde.serializer().serialize("", "flush")
+ );
+ }
+ return Collections.unmodifiableMap(allData);
+ }
- if (--remainingRetries == 0 && !needRetry.isEmpty()) {
- System.err.println("Failed to produce all records after multiple retries");
- Exit.exit(1);
- }
+ private static void retry(final KafkaProducer<byte[], byte[]> producer,
+ List<ProducerRecord<byte[], byte[]>> needRetry,
+ final Serde<?> keySerde) {
+ int remainingRetries = 5;
+ while (!needRetry.isEmpty()) {
+ final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+ for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+ System.out.println(
+ "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+ producer.send(record, new TestCallback(record, needRetry2));
}
-
- // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
- // all suppressed records.
- final List<PartitionInfo> partitions = producer.partitionsFor("data");
- for (final PartitionInfo partition : partitions) {
- producer.send(new ProducerRecord<>(
- partition.topic(),
- partition.partition(),
- System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
- stringSerde.serializer().serialize("", "flush"),
- intSerde.serializer().serialize("", 0)
- ));
+ producer.flush();
+ needRetry = needRetry2;
+ if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+ System.err.println("Failed to produce all records after multiple retries");
+ Exit.exit(1);
}
}
- return Collections.unmodifiableMap(allData);
+ }
+
+ private static void flush(final KafkaProducer<byte[], byte[]> producer,
+ final String topic,
+ final byte[] keyBytes,
+ final byte[] valBytes) {
+ // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+ // all suppressed records.
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ for (final PartitionInfo partition : partitions) {
+ producer.send(new ProducerRecord<>(
+ partition.topic(),
+ partition.partition(),
+ System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+ keyBytes,
+ valBytes
+ ));
+ }
}
private static Properties generatorProperties(final String kafka) {
@@ -315,14 +363,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
- final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
- Stream.of(TOPICS)
+ Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
diff --git a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
index 6657c5f2f23..311d30ba400 100644
--- a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
+++ b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -16,11 +16,18 @@
*/
package org.apache.kafka.streams.tests;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -44,12 +51,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
- final KStream dataStream = builder.stream("data");
- dataStream.process(printProcessorSupplier());
+ final KTable<String, Integer> dataTable = builder.table(
+ "data", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> dataStream = dataTable.toStream();
+ dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo");
+ final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+ "test.run_fk_join",
+ "false"));
+ if (runFkJoin) {
+ try {
+ final KTable<Integer, String> fkTable = builder.table(
+ "fk", Consumed.with(intSerde, stringSerde));
+ buildFKTable(dataStream, fkTable);
+ } catch (final Exception e) {
+ System.err.println("Caught " + e.getMessage());
+ }
+ }
+
final Properties config = new Properties();
- config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+ config.setProperty(
+ StreamsConfig.APPLICATION_ID_CONFIG,
+ "StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
@@ -63,13 +87,22 @@ public class StreamsUpgradeTest {
}));
}
- private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier() {
+ private static void buildFKTable(final KStream<String, Integer> primaryTable,
+ final KTable<Integer, String> otherTable) {
+ final KStream<String, String> kStream = primaryTable.toTable()
+ .join(otherTable, v -> v, (k0, v0) -> v0)
+ .toStream();
+ kStream.process(printProcessorSupplier("fk"));
+ kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+ }
+
+ private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(final String topic) {
return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext<KOut, VOut> context) {
- System.out.println("[3.1] initializing processor: topic=data taskId=" + context.taskId());
+ System.out.println("[3.1] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@@ -77,7 +110,7 @@ public class StreamsUpgradeTest {
public void process(final Record<KIn, VIn> record) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
- System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+ System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 5dedc579163..38b303281d2 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -22,7 +22,7 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import KafkaConfig
from kafkatest.services.monitor.jmx import JmxMixin
-from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1
+from kafkatest.version import KafkaVersion, LATEST_0_10_0, LATEST_0_10_1
STATE_DIR = "state.dir"
@@ -616,6 +616,9 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
if self.UPGRADE_FROM is not None:
properties['upgrade.from'] = self.UPGRADE_FROM
+ if (self.UPGRADE_FROM is not None and KafkaVersion(self.UPGRADE_FROM).supports_fk_joins()) or \
+ (self.KAFKA_STREAMS_VERSION is not None and KafkaVersion(self.KAFKA_STREAMS_VERSION).supports_fk_joins()):
+ properties['test.run_fk_join'] = "true"
if self.UPGRADE_TO == "future_version":
properties['test.future.metadata'] = "any_value"
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 57c89aa4c83..7639b33f1ed 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -37,6 +37,8 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
+ str(LATEST_3_0), str(LATEST_3_1)]
"""
After each release one should first check that the released version has been uploaded to
@@ -86,9 +88,11 @@ class StreamsUpgradeTest(Test):
self.topics = {
'echo' : { 'partitions': 5 },
'data' : { 'partitions': 5 },
+ 'fk' : { 'partitions': 5 },
}
- processed_msg = "processed [0-9]* records"
+ processed_data_msg = "processed [0-9]* records from topic=data"
+ processed_fk_msg = "processed [0-9]* records from topic=fk"
base_version_number = str(DEV_VERSION).split("-")[0]
def perform_broker_upgrade(self, to_version):
@@ -159,9 +163,9 @@ class StreamsUpgradeTest(Test):
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
processor.start()
- monitor.wait_until(self.processed_msg,
+ monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(processor.node))
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node))
connected_message = "Discovered group coordinator"
with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
@@ -172,9 +176,9 @@ class StreamsUpgradeTest(Test):
timeout_sec=120,
err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
- stdout_monitor.wait_until(self.processed_msg,
+ stdout_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node.account))
+ err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account))
# SmokeTestDriver allows up to 6 minutes to consume all
# records for the verification step so this timeout is set to
@@ -192,8 +196,12 @@ class StreamsUpgradeTest(Test):
@cluster(num_nodes=6)
@matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)])
@matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)])
- def test_metadata_upgrade(self, from_version, to_version):
+ @matrix(from_version=fk_join_versions, to_version=[str(DEV_VERSION)])
+ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
"""
+ This test verifies that the cluster successfully upgrades despite changes in the metadata and FK
+ join protocols.
+
Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
"""
@@ -311,9 +319,14 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
- monitor.wait_until(self.processed_msg,
+ monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
+ if KafkaVersion(version).supports_fk_joins():
+ monitor.wait_until(self.processed_fk_msg,
+ timeout_sec=60,
+ err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node1.account))
+
# start second with <version>
self.prepare_for(self.processor2, version)
@@ -325,12 +338,16 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
- first_monitor.wait_until(self.processed_msg,
+ first_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
- second_monitor.wait_until(self.processed_msg,
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
+ second_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node2.account))
+ if KafkaVersion(version).supports_fk_joins():
+ second_monitor.wait_until(self.processed_fk_msg,
+ timeout_sec=60,
+ err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node2.account))
# start third with <version>
self.prepare_for(self.processor3, version)
@@ -343,15 +360,19 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
- first_monitor.wait_until(self.processed_msg,
+ first_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account))
- second_monitor.wait_until(self.processed_msg,
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
+ second_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account))
- third_monitor.wait_until(self.processed_msg,
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node2.account))
+ third_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account))
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node3.account))
+ if KafkaVersion(version).supports_fk_joins():
+ third_monitor.wait_until(self.processed_fk_msg,
+ timeout_sec=60,
+ err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node3.account))
@staticmethod
def prepare_for(processor, version):
@@ -381,12 +402,12 @@ class StreamsUpgradeTest(Test):
with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
processor.stop()
- first_other_monitor.wait_until(self.processed_msg,
+ first_other_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
- second_other_monitor.wait_until(self.processed_msg,
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(first_other_node.account))
+ second_other_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(second_other_node.account))
node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
if upgrade_from is None: # upgrade disabled -- second round of rolling bounces
@@ -414,23 +435,23 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str,
timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
- first_other_monitor.wait_until(self.processed_msg,
+ first_other_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account))
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(first_other_node.account))
found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
- second_other_monitor.wait_until(self.processed_msg,
+ second_other_monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account))
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(second_other_node.account))
found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
- monitor.wait_until(self.processed_msg,
+ monitor.wait_until(self.processed_data_msg,
timeout_sec=60,
- err_msg="Never saw output '%s' on " % self.processed_msg + str(node.account))
+ err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node.account))
def do_rolling_bounce(self, processor, counter, current_generation):
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 3a01a5adf58..64f0bf2c53d 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -106,6 +106,9 @@ class KafkaVersion(LooseVersion):
# Self-managed clusters always support topic ID, so this method only applies to ZK clusters.
return self >= V_2_8_0
+ def supports_fk_joins(self):
+ return hasattr(self, "version") and self >= V_2_4_0
+
def get_version(node=None):
"""Return the version attached to the given node.
Default to DEV_BRANCH if node or node.version is undefined (aka None)