You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2022/09/30 15:50:44 UTC
[kafka] 01/02: tmp
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch repro-task-idling-problem
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 95847cf747c36dd538dddf5b0f9c47d8d92a952b
Author: John Roesler <jo...@vvcephei.org>
AuthorDate: Tue Sep 27 10:46:49 2022 -0500
tmp
---
.../KafkaStreamsCloseOptionsIntegrationTest.java | 32 ++++++++++++++++------
1 file changed, 23 insertions(+), 9 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
index 8d3cb8e8795..90fa323e281 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
@@ -152,6 +153,19 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
}
}
+ public void tmp() {
+ StreamsBuilder streamsBuilder = new StreamsBuilder();
+ KStream<Object, Object> stream = streamsBuilder.stream("inStream");
+ KTable<Object, Object> table = streamsBuilder.table("inTable");
+ KStream<Object, Object> result = stream.join(table, (l, r) -> new Object());
+ result.to("output");
+ Properties properties = new Properties();
+ properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+
+ KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
+ kafkaStreams.start();
+ }
+
@Test
public void testCloseOptions() throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
@@ -180,15 +194,15 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
private void add10InputElements() {
final List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"),
- KeyValue.pair(1L, "bbb"),
- KeyValue.pair(0L, "ccc"),
- KeyValue.pair(1L, "ddd"),
- KeyValue.pair(0L, "eee"),
- KeyValue.pair(1L, "fff"),
- KeyValue.pair(0L, "ggg"),
- KeyValue.pair(1L, "hhh"),
- KeyValue.pair(0L, "iii"),
- KeyValue.pair(1L, "jjj"));
+ KeyValue.pair(1L, "bbb"),
+ KeyValue.pair(0L, "ccc"),
+ KeyValue.pair(1L, "ddd"),
+ KeyValue.pair(0L, "eee"),
+ KeyValue.pair(1L, "fff"),
+ KeyValue.pair(0L, "ggg"),
+ KeyValue.pair(1L, "hhh"),
+ KeyValue.pair(0L, "iii"),
+ KeyValue.pair(1L, "jjj"));
for (final KeyValue<Long, String> record : records) {
mockTime.sleep(10);