You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/09/30 15:50:45 UTC

[kafka] 02/02: MINOR: Add TaskIdlingIntegrationTest

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch repro-task-idling-problem
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 48f6c042315bffcbb575d8c416b5156c36908382
Author: John Roesler <jo...@vvcephei.org>
AuthorDate: Fri Sep 30 10:50:05 2022 -0500

    MINOR: Add TaskIdlingIntegrationTest
---
 .../integration/TaskIdlingIntegrationTest.java     | 241 +++++++++++++++++++++
 1 file changed, 241 insertions(+)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskIdlingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskIdlingIntegrationTest.java
new file mode 100644
index 00000000000..f531273ab9e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskIdlingIntegrationTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.api.*;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.hamcrest.Matchers;
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
+
+import static java.time.Duration.ofSeconds;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+@Category({IntegrationTest.class})
+public class TaskIdlingIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    private final static int NUM_BROKERS = 1;
+
+    public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final static MockTime MOCK_TIME = CLUSTER.time;
+    private final static String STREAM_1 = "STREAM_1";
+    private final static String STREAM_2 = "STREAM_2";
+    private final static String STREAM_3 = "STREAM_3";
+    private final static String STREAM_4 = "STREAM_4";
+    private final Properties streamsConfig = Utils.mkProperties(
+        Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "TaskIdlingIT"),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+            Utils.mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+        )
+    );
+    private static Properties CONSUMER_CONFIG;
+    private static Properties PRODUCER_CONFIG_1;
+
+    @BeforeClass
+    public static void startCluster() throws IOException, InterruptedException {
+        CLUSTER.start();
+        //Use multiple partitions to ensure distribution of keys.
+        CONSUMER_CONFIG = Utils.mkProperties(
+            Utils.mkMap(
+                Utils.mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                Utils.mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()),
+                Utils.mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName())
+            )
+        );
+
+        PRODUCER_CONFIG_1 = Utils.mkProperties(
+            Utils.mkMap(
+                Utils.mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                Utils.mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()),
+                Utils.mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName())
+            )
+        );
+
+        CLUSTER.createTopic(STREAM_1, 1, 1);
+        CLUSTER.createTopic(STREAM_2, 1, 1);
+        CLUSTER.createTopic(STREAM_3, 1, 1);
+        CLUSTER.createTopic(STREAM_4, 1, 1);
+
+        try (final Producer<Object, Object> producer = new KafkaProducer<>(PRODUCER_CONFIG_1)) {
+            String[] inputs = {STREAM_1, STREAM_2, STREAM_3};
+            for (int i = 0; i < 10_000; i++) {
+                for (String input : inputs) {
+                    producer.send(
+                        new ProducerRecord<>(
+                            input,
+                            null,
+                            ((Time) MOCK_TIME).milliseconds(),
+                            i,
+                            i,
+                            null
+                        )
+                    );
+                    ((Time) MOCK_TIME).sleep(1L);
+                }
+            }
+            producer.flush();
+        }
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws IOException {
+        final String stateDirBasePath = TestUtils.tempDirectory().getPath();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + "-1");
+    }
+
+    @After
+    public void after() throws IOException {
+        IntegrationTestUtils.purgeLocalStreamsState(Collections.singletonList(streamsConfig));
+    }
+
+    @Test
+    public void shouldInnerJoinMultiPartitionQueryable() throws Exception {
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<Integer, Integer> stream1 =
+            streamsBuilder.stream(STREAM_1, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+        final KStream<Integer, Integer> stream2 =
+            streamsBuilder.stream(STREAM_2, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+        final KStream<Integer, Integer> stream3 =
+            streamsBuilder.stream(STREAM_3, Consumed.with(Serdes.Integer(), Serdes.Integer()));
+
+        final KStream<Integer, Integer> merge = stream1.merge(stream2).merge(stream3);
+        final ConcurrentLinkedDeque<KeyValue<Integer, Integer>> mapSeen = new ConcurrentLinkedDeque<>();
+        final KStream<Integer, Integer> map = merge.map((key, value) -> {
+            KeyValue<Integer, Integer> keyValue = new KeyValue<>(key, value);
+            mapSeen.offer(keyValue);
+            return keyValue;
+        });
+        streamsBuilder.addStateStore(
+            Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("STORE"),
+                Serdes.Integer(),
+                Serdes.Integer()
+            )
+        );
+        final ConcurrentLinkedDeque<Record<Integer, Integer>> processSeen = new ConcurrentLinkedDeque<>();
+        KStream<Integer, String> process = map.process(
+            () -> new ContextualProcessor<Integer, Integer, Integer, String>() {
+
+                private KeyValueStore<Integer, Integer> store;
+
+                @Override
+                public void init(ProcessorContext<Integer, String> context) {
+                    super.init(context);
+                    store = context.getStateStore("STORE");
+                }
+
+                @Override
+                public void process(Record<Integer, Integer> record) {
+                    processSeen.offer(record);
+                    store.put(record.key(), record.value());
+                    String topic = String.format(
+                        "%s %d %d",
+                        context().recordMetadata().get().topic(),
+                        context().recordMetadata().get().partition(),
+                        record.timestamp()
+                    );
+                    context().forward(record.withValue(topic));
+                }
+            },
+            "STORE"
+        );
+
+        process.to(STREAM_4, Produced.with(Serdes.Integer(), Serdes.String()));
+
+        final ArrayList<ConsumerRecord<Integer, String>> consumerSeen = new ArrayList<>(10_000 * 3);
+
+        try (
+            final KafkaStreams runningStreams = IntegrationTestUtils.getRunningStreams(streamsConfig, streamsBuilder, true);
+            final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(CONSUMER_CONFIG);
+        ) {
+            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
+            List<TopicPartition> partitions =
+                topics
+                    .get(STREAM_4)
+                    .stream()
+                    .map(info -> new TopicPartition(info.topic(), info.partition()))
+                    .collect(Collectors.toList());
+
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            while (consumerSeen.size() < (10_000 * 3)) {
+                ConsumerRecords<Integer, String> poll = consumer.poll(Duration.ofMillis(100L));
+                for (ConsumerRecord<Integer, String> record : poll) {
+                    System.out.println(record.key() + " " + record.value());
+                    consumerSeen.add(record);
+                }
+            }
+        }
+
+        long lastTimestamp = -1;
+        int consumeIdx = 0;
+        for (int i = 0; i < 10_000; i++) {
+            for (int j = 0; j < 3; j++) {
+                assertThat(mapSeen.poll().key, Matchers.is(i));
+                Record<Integer, Integer> processRecord = processSeen.poll();
+                assertThat(processRecord.key(), Matchers.is(i));
+                assertThat(processRecord.timestamp(), Matchers.greaterThan(lastTimestamp));
+                lastTimestamp = processRecord.timestamp();
+                ConsumerRecord<Integer, String> consumerRecord = consumerSeen.get(consumeIdx++);
+                assertThat(consumerRecord.key(), Matchers.is(i));
+            }
+        }
+    }
+
+}