Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/14 20:58:29 UTC

[GitHub] [kafka] jnh5y opened a new pull request, #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

jnh5y opened a new pull request, #12408:
URL: https://github.com/apache/kafka/pull/12408

   * Addresses issues with `KafkaStreams.close(CloseOptions)`.
   * Adds an integration test for this new functionality.
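   For context, the API under test can be exercised like this (a sketch based on the integration test later in this thread; the timeout value is arbitrary):

```java
// Close the Streams client and have its threads leave the consumer group
// immediately, instead of waiting for session.timeout.ms to expire.
streams.close(new KafkaStreams.CloseOptions()
    .timeout(Duration.ofSeconds(30))
    .leaveGroup(true));
```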
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925862925


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 20000;

Review Comment:
   Oh, good find. I thought/expected that the threads were closed first, not afterwards...
   
   But there is only one admin client shared across all threads, so why can't we keep the admin client open, send the "remove" request, and close it afterwards (to avoid creating a new admin client)?
   
   I am also fine with each thread removing only itself, after it calls `consumer.close()`.
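   For reference, a minimal sketch of the "keep the shared admin client open, send the remove request, then close it" idea (the class and method names are placeholders, not the PR's code):

```java
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

// Hypothetical helper, not the PR's code: evict all static members from the
// Streams consumer group via the single shared admin client, then close it.
final class LeaveGroupSketch {
    static void leaveGroupThenCloseAdmin(final Admin adminClient,
                                         final String applicationId,
                                         final Collection<String> groupInstanceIds)
            throws ExecutionException, InterruptedException {
        final RemoveMembersFromConsumerGroupOptions options =
            new RemoveMembersFromConsumerGroupOptions(
                groupInstanceIds.stream().map(MemberToRemove::new).collect(Collectors.toSet()));
        // Explicit removal, instead of waiting for session.timeout.ms to expire.
        adminClient.removeMembersFromConsumerGroup(applicationId, options).all().get();
        adminClient.close();
    }
}
```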





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925894827


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+    [... snip: license header, imports, and class preamble identical to the quote in the previous message ...]
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 20000;

Review Comment:
   Ok, I moved leaving the consumer group into the shutdown helper. I think that'll do it...
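   A rough sketch of the ordering being discussed (hypothetical; the real shutdown helper in `KafkaStreams` does much more, and the `Runnable`s here merely stand in for its internal steps):

```java
// Hypothetical sketch of the close ordering, not the actual shutdownHelper:
// threads (and their consumers) close first, the leave-group request goes out
// while the shared admin client is still open, and that client closes last.
final class ShutdownOrderSketch {
    static Thread shutdownHelperSketch(final boolean leaveGroup,
                                       final Runnable closeStreamThreads,
                                       final Runnable sendLeaveGroupRequest,
                                       final Runnable closeClients) {
        return new Thread(() -> {
            closeStreamThreads.run();        // stream threads close their consumers first
            if (leaveGroup) {
                sendLeaveGroupRequest.run(); // e.g. Admin#removeMembersFromConsumerGroup
            }
            closeClients.run();              // shared admin client is closed last
        }, "kafka-streams-close-thread");
    }
}
```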





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925808463


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+    [... snip: license header and imports, as in the earlier quotes ...]
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+    protected static final int TIMEOUT_MULTIPLIER = 15;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

Review Comment:
   Actually, I just changed my mind and set it to 0.
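   Presumably meaning the wait in the test becomes something like (a paraphrase, not the exact diff):

```java
// With leaveGroup(true) removing members explicitly, the group should already
// be empty by the time close() returns, so no grace period is needed.
waitForEmptyConsumerGroup(adminClient, appID, 0);
```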





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926217527


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+    [... snip: license header, imports, and test setup identical to the quote in the previous message ...]
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map((key, value) -> new KeyValue<>(key, key))

Review Comment:
   Changing the result consumer config fixed it.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925966739


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1474,7 +1482,7 @@ private void closeToError() {
         if (!setState(State.PENDING_ERROR)) {
             log.info("Skipping shutdown since we are already in " + state());
         } else {
-            final Thread shutdownThread = shutdownHelper(true);
+            final Thread shutdownThread = shutdownHelper(true, Long.MAX_VALUE, false);

Review Comment:
   What `timeoutMs`?  
   
   There's no `timeoutMs` in scope.
   
   The method `closeToError` is called in a few places; I'm not sure we could add a timeout to each of them...



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1521,43 +1529,35 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
+        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
+        return close(timeoutMs, options.leaveGroup);

Review Comment:
   +1.





[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r923925602


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+    [... snip: license header and imports, as in the earlier quotes ...]
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {

Review Comment:
   nit: remove comment



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+    [... snip: license header, imports, and test setup identical to the earlier quotes ...]
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

Review Comment:
   Because we actively remove members from the group, why do we need a timeout? Could we set the timeout to zero?
   
   In the end, a static member would be removed by the broker only after `session.timeout.ms` has passed -- we could also set a very high/infinite (`MAX_VALUE`) session timeout to ensure members are removed not by the broker but by the explicit "remove" command we send.
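   A hypothetical version of that setup, sketching the suggestion (one assumption worth checking: brokers bound client session timeouts via `group.max.session.timeout.ms`, so the embedded cluster would likely need that raised as well):

```java
// Make session expiry effectively unreachable, so the group can only become
// empty through the explicit remove-members request sent by close().
streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));

// ... start the topology and produce/consume as in the test above ...

streams.close(new CloseOptions().leaveGroup(true));
// No grace period: the members were removed explicitly, not by timeout.
waitForEmptyConsumerGroup(adminClient, appID, 0);
```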



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+    [... snip: license header, imports, and test setup identical to the earlier quotes ...]
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

Review Comment:
   Might be good to add a comment explaining why we want to test with 2 threads (I understand why, but somebody else might miss the context).
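   E.g. something along these lines (the wording is just a suggestion):

```java
// Two threads, so that leaving the group on close() has to remove more than
// one member via the single admin client shared across all stream threads.
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
```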



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+    [... snip: license header, imports, and test setup identical to the earlier quotes ...]
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map((key, value) -> new KeyValue<>(key, key))

Review Comment:
   I guess we could remove the `map()` step and just copy data from input to output? In the end, we just want to verify that we are running.
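   A hypothetical passthrough variant of `setupTopologyWithoutIntermediateUserTopic()` along those lines (note the result consumer's value deserializer would then need to match the `String` values, which lines up with the config fix mentioned in an earlier message):

```java
// No map()/groupByKey, hence no internal repartition topic; the test only
// needs to observe records flowing from input to output.
protected Topology setupPassthroughTopology() {
    final StreamsBuilder builder = new StreamsBuilder();
    builder.stream(INPUT_TOPIC).to(OUTPUT_TOPIC); // default Long/String serdes apply
    return builder.build();
}
```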



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+    [... snip: quoted context identical to the earlier quotes; the remainder of this message is cut off in the archive ...]
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+    }
+
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey

Review Comment:
   Seems there is no `groupByKey()`? The comment above mentions re-partitioning before `groupByKey`, but the topology never groups.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076: Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926562692


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);

Review Comment:
   Added a comment.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r924785404


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+    protected static final int TIMEOUT_MULTIPLIER = 15;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+    }
+
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map((key, value) -> new KeyValue<>(key, key))

Review Comment:
   I tried `input.to(OUTPUT_TOPIC, ...)`, but that broke the test.
   
   Is this OK as-is?
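
   A plausible reason the straight copy broke the test, inferred from the quoted config rather than stated in the thread: this revision's result consumer deserializes values as Longs, and it is the `map()` step that rewrites the String values into Long keys. A hedged sketch of the mismatch, reusing `input` and `resultConsumerConfig` from the test above:

   ```java
   // The result consumer in this revision expects Long values:
   resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

   // Alternative A: map() replaces the String value with the Long key,
   // which matches that deserializer.
   input.map((key, value) -> new KeyValue<>(key, key))
        .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));

   // Alternative B: a plain copy keeps the String values, which the Long
   // value deserializer cannot decode, so the result consumer fails.
   input.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
   ```

   (The newer revision quoted elsewhere in this thread switches the value deserializer to `StringDeserializer`, which fits this reading.)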





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925804187


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+    protected static final int TIMEOUT_MULTIPLIER = 15;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

Review Comment:
   Added a comment.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925809931


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 20000;

Review Comment:
   OK, I think I figured it out.
   
   There was some non-determinism in the `close()` call.
   
   The first approach I was using removed the consumers from the CG before the StreamThreads had shut down. I *think* that if the StreamThreads closed quickly enough, everything was fine. If not, the StreamThreads re-subscribed to the CG, and then we had to wait out the SESSION_TIMEOUT before the CG became empty again.
   
   I've updated the `close(CloseOptions)` method to close the StreamThreads first. This means `close` has to create a new Admin client in order to remove the members from the CG.
   
   The only other option I can see would be pushing the work of leaving the CG down to each StreamThread. Maybe that's the better approach. What do you think?
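
   A rough sketch of that ordering (an illustration of the approach described above, not the actual `KafkaStreams` internals; the method name, parameters, and use of the internal `StreamThread` handle are assumptions):

   ```java
   import java.time.Duration;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Properties;
   import java.util.concurrent.TimeUnit;

   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.MemberToRemove;
   import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

   private void closeThreadsThenLeaveGroup(final Collection<StreamThread> threads,
                                           final Properties adminConfig,
                                           final String applicationId,
                                           final String groupInstanceId,
                                           final Duration timeout) throws Exception {
       // 1. Shut the StreamThreads down first, so they cannot re-subscribe
       //    and rejoin the group after their members have been removed.
       for (final StreamThread thread : threads) {
           thread.shutdown();
       }

       // 2. Only then remove the static member from the consumer group, via
       //    a freshly created Admin client ("close has to get a new Admin").
       try (final Admin admin = Admin.create(adminConfig)) {
           admin.removeMembersFromConsumerGroup(
                   applicationId,
                   new RemoveMembersFromConsumerGroupOptions(
                       Collections.singleton(new MemberToRemove(groupInstanceId))))
                .all()
                .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
       }
   }
   ```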





[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926285132


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);

Review Comment:
   Would be worth a comment to explain why this setting is important
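
   For example, one possible wording (an assumption about the intent, based on the session-timeout discussion earlier in the thread):

   ```java
   // Effectively disable session-timeout-based eviction (the broker's
   // group.max.session.timeout.ms is raised to match in the CLUSTER setup),
   // so the group can only become empty via an explicit leave. This lets
   // the test verify that close(CloseOptions) itself removed the static
   // members, rather than the broker expiring them.
   streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
   ```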



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120_000L);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(2));

Review Comment:
   ```suggestion
           IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(30));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120_000L);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(2));
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofMillis(30_000L)));

Review Comment:
   ```suggestion
        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120_000L);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(2));

Review Comment:
   Jenkins can be slow; 2 seconds is very low and might make the test flaky. We usually use 30 seconds, for example:
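   A sketch of the suggested change (same helper as in the diff, just a larger timeout):
   ```java
   // Allow up to 30 seconds for the application to reach RUNNING on a slow CI machine.
   IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(30));
   ```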





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926216992


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));

Review Comment:
   Nicely done.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r921566920


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {

Review Comment:
   I'm not wild about this IT as written. I copied it from `AbstractResetIntegrationTest`, and I'd be happy to hear a suggestion on how to make a more minimal test.
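   For reference, one pared-down direction could look like the sketch below (hypothetical; note that a later comment in this thread reports that a plain pass-through topology hung, so the repartitioning step may matter):
   ```java
   // Hypothetical minimal topology: forward the input as-is and let the test
   // assert only on the CloseOptions behavior.
   final StreamsBuilder builder = new StreamsBuilder();
   final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
   input.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
   streams = new KafkaStreams(builder.build(), streamsConfig);
   ```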





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r924957846


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 20000;

Review Comment:
   @mjsax until we figure this out, we shouldn't merge this PR.





[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926113971


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(2));
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofMillis(1000)));

Review Comment:
   We should give a little more head room for the "remove" request to finish; 1 second might make the test flaky on Jenkins. For example:
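   A sketch with more head room (same call as in the diff, larger timeout):
   ```java
   // Leave extra time for the leave-group ("remove members") request on slow CI.
   streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
   ```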





[GitHub] [kafka] mjsax commented on pull request #12408: KAFKA-14076: Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax commented on PR #12408:
URL: https://github.com/apache/kafka/pull/12408#issuecomment-1191570949

   Thanks for the fix. Merged to `trunk` and cherry-picked to the `3.3` branch.




[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925804504


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {

Review Comment:
   +1; done.



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+    protected static final int TIMEOUT_MULTIPLIER = 15;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+    }
+
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey

Review Comment:
   Removed the comment.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925895182


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1474,7 +1482,7 @@ private void closeToError() {
         if (!setState(State.PENDING_ERROR)) {
             log.info("Skipping shutdown since we are already in " + state());
         } else {
-            final Thread shutdownThread = shutdownHelper(true);
+            final Thread shutdownThread = shutdownHelper(true, Long.MAX_VALUE, false);

Review Comment:
   Of all the refactoring, this is probably the "scariest". @mjsax, thoughts?
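   For context, one reading of the new arguments (the parameter names here are assumptions; the diff only shows the values):
   ```java
   // Assumed signature: shutdownHelper(error, timeoutMs, leaveGroup).
   // closeToError() keeps the old behavior: an error shutdown that never gives up
   // on a caller-supplied deadline and does not leave the consumer group.
   final Thread shutdownThread = shutdownHelper(true, Long.MAX_VALUE, false);
   ```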





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925960794


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1431,6 +1434,10 @@ private Thread shutdownHelper(final boolean error) {
     }
 
     private boolean close(final long timeoutMs) {

Review Comment:
   Good catch.  Removing it.





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925972749


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+    protected static final int TIMEOUT_MULTIPLIER = 15;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+    }
+
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map((key, value) -> new KeyValue<>(key, key))

Review Comment:
   I tried changing it to
   ```java
   input.to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
   ```
   and the test hung on my laptop.
   
   Am I doing something silly?





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925805004


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+    protected static final int TIMEOUT_MULTIPLIER = 15;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);

Review Comment:
   I've set it to MAX_VALUE.





[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925862925


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 20000;

Review Comment:
   Oh. Good find. I thought/expected that the threads were closed first, not afterwards...
   
   But there is only one admin client, shared across all threads, so why can't we keep the admin client open, send the "remove" request, and close it afterwards (to avoid creating a new admin client)? Guess it's not ideal to "split" the logic and close the admin client outside of the centralized "private close()" method, though.
   
   I am also fine if each thread removes only itself after it calls consumer.close(). Might even be more elegant, if it's not too clumsy to implement.
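   
   For reference, a minimal sketch of what per-member removal via the shared admin client could look like, assuming a static member whose `group.instance.id` is known (the names and the blocking `get()` here are illustrative, not the PR's actual code):
   ```
   import java.util.Collections;
   import org.apache.kafka.clients.admin.MemberToRemove;
   import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
   
   // remove one static member from the consumer group after its consumer has closed
   final MemberToRemove member = new MemberToRemove("someGroupInstance");
   adminClient.removeMembersFromConsumerGroup(
       applicationId,  // the group.id of a Streams app is its application.id
       new RemoveMembersFromConsumerGroupOptions(Collections.singleton(member))
   ).all().get();      // block until the broker acknowledges the removal
   ```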





[GitHub] [kafka] mjsax merged pull request #12408: KAFKA-14076: Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax merged PR #12408:
URL: https://github.com/apache/kafka/pull/12408




[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925971253


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(2));
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));

Review Comment:
   Ok, we need to pick a value that allows the states to transition and the blocking call to leave the CG to return. If the value is too low, the test may be flaky.
   
   Does 1 second seem reasonable? Or should it be higher?
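   
   For what it's worth, a sketch of the call with a more generous bound (the exact value is a judgment call):
   ```
   // leave the group and allow ample time for the state transitions and the
   // blocking "remove members" call to complete
   streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
   ```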





[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926113246


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1516,48 +1520,41 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument
      * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
      */
     public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
+        Objects.requireNonNull(options);

Review Comment:
   ```suggestion
           Objects.requireNonNull(options, "options cannot be null");
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);

Review Comment:
   ```suggestion
           CLUSTER.deleteAllTopicsAndWait(120_000L);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+    protected static final int TIMEOUT_MULTIPLIER = 15;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+    }
+
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map((key, value) -> new KeyValue<>(key, key))

Review Comment:
   Where does it hang? Did you also change the _result_ consumer config so it can deserialize the output data on read?
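   
   (For context: the result consumer's deserializers have to match the serialized types on the output topic. A sketch, assuming the topology ends in a `count()` and therefore emits `<Long, Long>` records; the actual output type depends on the topology:)
   ```
   resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
   resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
   ```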



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1474,7 +1482,7 @@ private void closeToError() {
         if (!setState(State.PENDING_ERROR)) {
             log.info("Skipping shutdown since we are already in " + state());
         } else {
-            final Thread shutdownThread = shutdownHelper(true);
+            final Thread shutdownThread = shutdownHelper(true, Long.MAX_VALUE, false);

Review Comment:
   Ah. My bad. I thought this was `close(timeout)`...
   
   But if we `closeToError` we don't send a "remove" request, and thus it should be fine as it's unused? Could we make it an `Optional` or a `Long` (and pass `null`)?
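   
   A sketch of that variant, assuming the timeout is only dereferenced on the leave-group path (the signature and body here are illustrative):
   ```
   private Thread shutdownHelper(final boolean error, final Long timeoutMs, final boolean leaveGroup) {
       return new Thread(() -> {
           if (leaveGroup) {
               // timeoutMs is only read on this path, so callers that do not
               // leave the group (e.g. closeToError()) may pass null
               final long remainingTimeMs = timeoutMs;
               // ... issue the "remove members" request within remainingTimeMs ...
           }
           // ... existing shutdown logic ...
       }, "kafka-streams-close-thread");
   }
   
   // closeToError() then no longer needs a Long.MAX_VALUE sentinel:
   final Thread shutdownThread = shutdownHelper(true, null, false);
   ```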



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));

Review Comment:
   The error message says that we violate a broker-side max config setting. We should add
   ```
           brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
   ```
   inside the `static` block that creates the `EmbeddedKafkaCluster`.
   
   This allows us to set `MAX_VALUE` here (btw: no need to call `toString()`).
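   
   A sketch of the resulting static block (assuming the `KafkaConfig` accessor named above):
   ```
   static {
       final Properties brokerProps = new Properties();
       // raise the broker-side cap so the client-side session timeout
       // can be set to Integer.MAX_VALUE without violating broker limits
       brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
       CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
   }
   ```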



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(2));
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofMillis(1000)));

Review Comment:
   We should give a little more headroom for the "remove" request to finish; 1 second might make the test flaky on Jenkins. -- Guess we need to align this with `session.timeout.ms`, though (which should be larger, to ensure the group members are not removed by the broker first...)



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(2));
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofMillis(1000)));

Review Comment:
   ```suggestion
           streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofMillis(30_000L)));
   ```





[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925952015


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1431,6 +1434,10 @@ private Thread shutdownHelper(final boolean error) {
     }
 
     private boolean close(final long timeoutMs) {

Review Comment:
   Do we still need this helper? It seems it's only called from a single place.



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1521,43 +1529,35 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
+        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
+        return close(timeoutMs, options.leaveGroup);

Review Comment:
   Seems we are missing a null check for `options`? (Unrelated to this PR, but we should just fix it on the side -- it should be the first line of code in this method.)



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));

Review Comment:
   Should we set this to `MAX_VALUE`?



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+    protected static final int TIMEOUT_MULTIPLIER = 15;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+    }
+
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map((key, value) -> new KeyValue<>(key, key))

Review Comment:
   What about this minor cleanup?



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1474,7 +1482,7 @@ private void closeToError() {
         if (!setState(State.PENDING_ERROR)) {
             log.info("Skipping shutdown since we are already in " + state());
         } else {
-            final Thread shutdownThread = shutdownHelper(true);
+            final Thread shutdownThread = shutdownHelper(true, Long.MAX_VALUE, false);

Review Comment:
   Why can't we pass `timeoutMs`?
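   E.g., something along these lines (hypothetical; assumes a `timeoutMs` value is in scope inside `closeToError()`):
   ```
   final Thread shutdownThread = shutdownHelper(true, timeoutMs, false);
   ```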



##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
+        CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        // Test with two threads to show that each of the threads is being called to remove clients from the CG.
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(2));
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));

Review Comment:
   I think we should pass in a timeout via `CloseOptions` to override the default `MAX_VALUE`?
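   For example (a sketch, assuming `CloseOptions` exposes a `timeout(Duration)` setter):
   ```
   streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
   ```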





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r925969588


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 10000;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));

Review Comment:
   That change breaks the test on my laptop.
   
   The KafkaStreams application doesn't start in time...





[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r924838158


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+    private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+    protected static final int TIMEOUT_MULTIPLIER = 15;
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT));
+        streamsConfig.putAll(commonClientConfig);
+
+        producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.putAll(commonClientConfig);
+
+        resultConsumerConfig = new Properties();
+        resultConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        resultConsumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        resultConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        resultConsumerConfig.putAll(commonClientConfig);
+
+        if (adminClient == null) {
+            adminClient = Admin.create(commonClientConfig);
+        }
+
+        CLUSTER.deleteAllTopicsAndWait(120000);
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);
+
+        add10InputElements();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (streams != null) {
+            streams.close(Duration.ofSeconds(30));
+        }
+    }
+
+    @Test
+    public void testCloseOptions() throws Exception {
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance");
+        streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
+
+        // RUN
+        streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
+        streams.start();
+        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
+
+        streams.close(new CloseOptions().leaveGroup(true));
+        waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
+    }
+
+    protected Topology setupTopologyWithoutIntermediateUserTopic() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
+
+        // use map to trigger internal re-partitioning before groupByKey
+        input.map((key, value) -> new KeyValue<>(key, key))

Review Comment:
   Did you update the serdes?
   ```
   // String for the value instead of Long
   .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));
   
   // also for the `resultConsumerConfig` value L133 above:
   resultConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
   ```
   





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

Posted by GitBox <gi...@apache.org>.
jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r924957060


##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 20000;

Review Comment:
   The test takes at least 20 seconds to run. This makes me wonder whether the removal from the consumer group is working properly...
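   A hypothetical direct check, reusing the test's `adminClient` and `appID` (plus
   `org.apache.kafka.clients.admin.ConsumerGroupDescription` and JUnit's `assertTrue`):
   ```
   final ConsumerGroupDescription description = adminClient
       .describeConsumerGroups(singletonList(appID))
       .describedGroups()
       .get(appID)
       .get();
   // if leaveGroup(true) worked, the group should have no members right after close()
   assertTrue(description.members().isEmpty());
   ```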


