Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/17 14:58:48 UTC

[GitHub] [kafka] cadonna opened a new pull request #9191: [WIP] KAFKA-10355: PoC

cadonna opened a new pull request #9191:
URL: https://github.com/apache/kafka/pull/9191


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r471696179



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class MissingSourceTopicException extends StreamsException {
+
+    private final static long serialVersionUID = 1L;

Review comment:
       What's this for (or going to be for)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class MissingSourceTopicException extends StreamsException {

Review comment:
       I'm not sure this should extend StreamsException. My understanding is that it is generally reserved for Streams-internal errors, not for user code or setup issues.







[GitHub] [kafka] cadonna commented on pull request #9191: [WIP] KAFKA-10355: PoC

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#issuecomment-678074158


   @ableegoldman Yes, you are right, there were checkstyle issues. I will fix them.





[GitHub] [kafka] cadonna commented on pull request #9191: KAFKA-10355: Throw error when source topic was deleted

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#issuecomment-685407069


   Test this please





[GitHub] [kafka] cadonna commented on pull request #9191: KAFKA-10355: Throw error when source topic was deleted

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#issuecomment-683673488


   This is the implementation for KIP-662.





[GitHub] [kafka] cadonna commented on a change in pull request #9191: KAFKA-10355: Throw error when source topic was deleted

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r482791614



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category({IntegrationTest.class})
+public class HandlingSourceTopicDeletionTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final int NUM_THREADS = 2;
+    private static final long TIMEOUT = 60000;
+    private static final String INPUT_TOPIC = "inputTopic";
+    private static final String OUTPUT_TOPIC = "outputTopic";
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void before() throws InterruptedException {
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC);
+    }
+
+    @After
+    public void after() throws InterruptedException {
+        CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
+    }
+
+    @Test
+    public void shouldThrowErrorAfterSourceTopicDeleted() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));
+
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String appId = "app-" + safeTestName;
+
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+        streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);
+
+        final Topology topology = builder.build();
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+
+        final AtomicBoolean calledUncaughtExceptionHandler = new AtomicBoolean(false);
+        kafkaStreams.setUncaughtExceptionHandler((thread, exception) -> calledUncaughtExceptionHandler.set(true));
+        kafkaStreams.start();
+        TestUtils.waitForCondition(
+            () -> kafkaStreams.state() == State.RUNNING,
+            TIMEOUT,
+            () -> "Kafka Streams application did not reach state RUNNING"
+        );
+
+        CLUSTER.deleteTopicAndWait(INPUT_TOPIC);
+
+        TestUtils.waitForCondition(
+            () -> kafkaStreams.state() == State.ERROR,
+            TIMEOUT,
+            () -> "Kafka Streams application did not reach state ERROR"
+        );

Review comment:
       I had that thought, too, but then decided that it is not required to test the code added in this PR. On the other hand, it does no harm to use two Streams clients and broaden the test scope, since this is an integration test.







[GitHub] [kafka] cadonna commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r472228811



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class MissingSourceTopicException extends StreamsException {

Review comment:
       That is a good point. I thought all exceptions thrown from inside Streams (except for the `IllegalStateException`) should be `StreamsException`s. The `RecordDeserializer` throws a `StreamsException` when the deserialization exception handler -- which is user code -- throws any exception. Maybe @guozhangwang can help here. 







[GitHub] [kafka] cadonna commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r472217989



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class MissingSourceTopicException extends StreamsException {
+
+    private final static long serialVersionUID = 1L;

Review comment:
       That is a good question. I have to admit that I blindly copied it from another exception class. The field relates to exceptions implementing the `Serializable` interface: it tells the JVM whether a serialized object can be deserialized into an object of the class that is available in the JVM. See https://stackoverflow.com/questions/7187302/what-is-serialversionuid-in-java-normally-in-exception-class
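
To illustrate the role of `serialVersionUID`, here is a minimal sketch of a serialization round trip. `DemoTopicException` and `SerialVersionDemo` are hypothetical stand-ins invented for this example; they are not classes from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical stand-in for an exception class; not the class from the PR.
class DemoTopicException extends RuntimeException {

    // Version of the serialized form. If the class loaded by the reading JVM
    // declares a different value than the one recorded in the stream,
    // readObject() fails with java.io.InvalidClassException.
    private static final long serialVersionUID = 1L;

    DemoTopicException(final String message) {
        super(message);
    }
}

class SerialVersionDemo {

    // Serializes the exception to a byte array and reads it back.
    static DemoTopicException roundTrip(final DemoTopicException original) {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoTopicException) in.readObject();
            }
        } catch (final IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(final String[] args) {
        final DemoTopicException copy = roundTrip(new DemoTopicException("source topic missing"));
        System.out.println(copy.getMessage());
    }
}
```

Here writer and reader use the same class, so the IDs match and the round trip succeeds. Declaring the field explicitly keeps the ID stable when the class is recompiled, instead of relying on a value the JVM computes from the class structure.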







[GitHub] [kafka] guozhangwang commented on a change in pull request #9191: [WIP] KAFKA-10355: PoC

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r472368768



##########
File path: streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class MissingSourceTopicException extends StreamsException {

Review comment:
       I agree with @cadonna here that all exceptions thrown from inside Streams should inherit from `StreamsException`. That may include: 1) environmental issues such as timeouts and I/O errors. Note that after @mjsax's KIP we would no longer throw `TimeoutException` (which is a `KafkaException`, not a `StreamsException`) but a new exception that inherits from `StreamsException`; `IOException`s are likewise wrapped as `StateStoreException`, which inherits from `StreamsException`. 2) User bugs in `process` etc. that get caught by Streams code.
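
The convention described above can be sketched as follows. All class names are hypothetical stand-ins prefixed with `Demo`; the code only illustrates wrapping failures into a single base exception type and is not the Kafka Streams implementation.

```java
// Stand-in for a library-wide base exception; not the Kafka Streams class.
class DemoStreamsException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    DemoStreamsException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

// Stand-in for a specific error that already belongs to the hierarchy.
class DemoMissingSourceTopicException extends DemoStreamsException {
    private static final long serialVersionUID = 1L;

    DemoMissingSourceTopicException(final String message) {
        super(message, null);
    }
}

class WrappingDemo {

    // Runs user-supplied code. Exceptions that already belong to the
    // hierarchy pass through unchanged; any other runtime failure is
    // wrapped so that callers only ever see DemoStreamsException.
    static void runUserCode(final Runnable userCode) {
        try {
            userCode.run();
        } catch (final DemoStreamsException e) {
            throw e;
        } catch (final RuntimeException e) {
            throw new DemoStreamsException("User code failed", e);
        }
    }

    public static void main(final String[] args) {
        try {
            runUserCode(() -> {
                throw new IllegalArgumentException("bad record");
            });
        } catch (final DemoStreamsException e) {
            System.out.println(e.getMessage() + ", cause: " + e.getCause());
        }
    }
}
```

With a single base type, an uncaught exception handler can distinguish errors by subclass while still catching everything the library throws in one place.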







[GitHub] [kafka] vvcephei merged pull request #9191: KAFKA-10355: Throw error when source topic was deleted

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9191:
URL: https://github.com/apache/kafka/pull/9191


   





[GitHub] [kafka] cadonna commented on pull request #9191: KAFKA-10355: Throw error when source topic was deleted

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#issuecomment-686393019


   I added unit tests. 





[GitHub] [kafka] ableegoldman commented on pull request #9191: [WIP] KAFKA-10355: PoC

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#issuecomment-677852192


   Tests didn't run; possibly a checkstyle issue? Unfortunately, the results have already been cleaned up.





[GitHub] [kafka] vvcephei commented on a change in pull request #9191: KAFKA-10355: Throw error when source topic was deleted

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r482250207



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionTest.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category({IntegrationTest.class})
+public class HandlingSourceTopicDeletionTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final int NUM_THREADS = 2;
+    private static final long TIMEOUT = 60000;
+    private static final String INPUT_TOPIC = "inputTopic";
+    private static final String OUTPUT_TOPIC = "outputTopic";
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void before() throws InterruptedException {
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC);
+    }
+
+    @After
+    public void after() throws InterruptedException {
+        CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
+    }
+
+    @Test
+    public void shouldThrowErrorAfterSourceTopicDeleted() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));
+
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String appId = "app-" + safeTestName;
+
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+        streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);
+
+        final Topology topology = builder.build();
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+
+        final AtomicBoolean calledUncaughtExceptionHandler = new AtomicBoolean(false);
+        kafkaStreams.setUncaughtExceptionHandler((thread, exception) -> calledUncaughtExceptionHandler.set(true));
+        kafkaStreams.start();
+        TestUtils.waitForCondition(
+            () -> kafkaStreams.state() == State.RUNNING,
+            TIMEOUT,
+            () -> "Kafka Streams application did not reach state RUNNING"
+        );
+
+        CLUSTER.deleteTopicAndWait(INPUT_TOPIC);
+
+        TestUtils.waitForCondition(
+            () -> kafkaStreams.state() == State.ERROR,
+            TIMEOUT,
+            () -> "Kafka Streams application did not reach state ERROR"
+        );

Review comment:
       What do you think about expanding the test to verify that all members really get shut down, not just all the ones in an instance? It seems simple enough: just create and start two instances and then verify they both wind up in ERROR state.
   
   WDYT?
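
The idea of checking several instances can be sketched without a broker. The helper below is a hypothetical, self-contained illustration: `DemoState` and `allReachError` are invented names, and the suppliers stand in for calls such as `kafkaStreams.state()` on two separate clients. An actual test would more likely reuse `TestUtils.waitForCondition` as in the diff above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class AllInstancesDemo {

    // Stand-in for the client state; not KafkaStreams.State.
    enum DemoState { RUNNING, ERROR }

    // Polls until every instance reports ERROR, or returns false once the
    // timeout has elapsed (or the waiting thread is interrupted).
    static boolean allReachError(final List<Supplier<DemoState>> instances, final long timeoutMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (instances.stream().allMatch(instance -> instance.get() == DemoState.ERROR)) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(final String[] args) {
        final List<Supplier<DemoState>> instances = Arrays.asList(
            () -> DemoState.ERROR,
            () -> DemoState.ERROR
        );
        System.out.println(allReachError(instances, 1000));
    }
}
```

Requiring all suppliers to match in the same poll means a single instance that stays in `RUNNING` fails the check, which is the broader guarantee the comment asks for.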







[GitHub] [kafka] cadonna commented on pull request #9191: [WIP] KAFKA-10355: PoC

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#issuecomment-674933287


   FYI: @guozhangwang @ableegoldman @vvcephei @mjsax @abbccdda 

