You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/21 17:57:25 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12035: KAFKA-13217: Reconsider skipping the LeaveGroup on close() or add an overload that does so

guozhangwang commented on code in PR #12035:
URL: https://github.com/apache/kafka/pull/12035#discussion_r878743871


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1498,6 +1516,61 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument
         return close(timeoutMs);
     }
 
+    /**
+     * Shutdown this {@code KafkaStreams} by signaling all the threads to stop, and then wait up to the timeout for the
+     * threads to join.
+     * @param options  contains timeout to specify how long to wait for the threads to shutdown, and a flag leaveGroup to
+     *                 trigger consumer leave call
+     * @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
+     * before all threads stopped
+     * Note that this method must not be called in the {@link StateListener#onChange(KafkaStreams.State, KafkaStreams.State)} callback of {@link StateListener}.
+     * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
+     */
+    public synchronized boolean close(final CloseOptions options) throws IllegalArgumentException {
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(options.timeout, "timeout");
+        final long timeoutMs = validateMillisecondDuration(options.timeout, msgPrefix);
+        if (timeoutMs < 0) {
+            throw new IllegalArgumentException("Timeout can't be negative.");
+        }
+
+        final long startMs = time.milliseconds();
+
+        final boolean closeStatus = close(timeoutMs);
+
+        final Optional<String> groupInstanceId = clientSupplier
+                .getConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId))
+                .groupMetadata()
+                .groupInstanceId();
+
+        final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);

Review Comment:
   It's rare but possible that the `remainingTimeMs` here could be negative, let's guard it with `max(0, ...)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org