You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/09/30 15:50:44 UTC

[kafka] 01/02: tmp

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch repro-task-idling-problem
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 95847cf747c36dd538dddf5b0f9c47d8d92a952b
Author: John Roesler <jo...@vvcephei.org>
AuthorDate: Tue Sep 27 10:46:49 2022 -0500

    tmp
---
 .../KafkaStreamsCloseOptionsIntegrationTest.java   | 32 ++++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
index 8d3cb8e8795..90fa323e281 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
@@ -152,6 +153,19 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
         }
     }
 
+    public void tmp() {
+        StreamsBuilder streamsBuilder = new StreamsBuilder();
+        KStream<Object, Object> stream = streamsBuilder.stream("inStream");
+        KTable<Object, Object> table = streamsBuilder.table("inTable");
+        KStream<Object, Object> result = stream.join(table, (l, r) -> new Object());
+        result.to("output");
+        Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+
+        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
+        kafkaStreams.start();
+    }
+
     @Test
     public void testCloseOptions() throws Exception {
         final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
@@ -180,15 +194,15 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
 
     private void add10InputElements() {
         final List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"),
-            KeyValue.pair(1L, "bbb"),
-            KeyValue.pair(0L, "ccc"),
-            KeyValue.pair(1L, "ddd"),
-            KeyValue.pair(0L, "eee"),
-            KeyValue.pair(1L, "fff"),
-            KeyValue.pair(0L, "ggg"),
-            KeyValue.pair(1L, "hhh"),
-            KeyValue.pair(0L, "iii"),
-            KeyValue.pair(1L, "jjj"));
+                KeyValue.pair(1L, "bbb"),
+                KeyValue.pair(0L, "ccc"),
+                KeyValue.pair(1L, "ddd"),
+                KeyValue.pair(0L, "eee"),
+                KeyValue.pair(1L, "fff"),
+                KeyValue.pair(0L, "ggg"),
+                KeyValue.pair(1L, "hhh"),
+                KeyValue.pair(0L, "iii"),
+                KeyValue.pair(1L, "jjj"));
 
         for (final KeyValue<Long, String> record : records) {
             mockTime.sleep(10);