Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/28 23:56:10 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #11812: KAFKA-12738: address minor followup and consolidate integration tests of PR #11787

ableegoldman commented on a change in pull request #11812:
URL: https://github.com/apache/kafka/pull/11812#discussion_r816342899



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -775,6 +781,70 @@ public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception {
         }
     }
 
+    @Test
+    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
+        final AtomicInteger noOutputExpected = new AtomicInteger(0);
+        final AtomicInteger outputExpected = new AtomicInteger(0);
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
+
+        streams = new KafkaStreamsNamedTopologyWrapper(props);
+        streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
+
+        final NamedTopologyBuilder builder = streams.newNamedTopologyBuilder("topology_A");
+        builder.stream(DELAYED_INPUT_STREAM_1).peek((k, v) -> outputExpected.incrementAndGet()).to(OUTPUT_STREAM_1);

Review comment:
       Ah yeah, I meant to add a second integration test variant that's `FromDifferentTopologies` instead of `WithinSameTopology` -- but I'm still working on it, so I didn't want to block this PR on it. But the short answer is "yes", definitely.



