You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/09 18:38:14 UTC

[kafka] branch 2.1 updated: KAFKA-7477: Improve Streams close timeout semantics (#5747)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2e500bd  KAFKA-7477: Improve Streams close timeout semantics (#5747)
2e500bd is described below

commit 2e500bd40e2caca5e72dfa3d1ba3d438587e1f89
Author: Nikolay <ni...@apache.org>
AuthorDate: Tue Oct 9 21:37:08 2018 +0300

    KAFKA-7477: Improve Streams close timeout semantics (#5747)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 39 +++++++++++++++-------
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 28 ++++++++++++++++
 2 files changed, 55 insertions(+), 12 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 49a701a..819732a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -218,13 +218,7 @@ public class KafkaStreams {
         synchronized (stateLock) {
             long elapsedMs = 0L;
             while (state != targetState) {
-                if (waitMs == 0) {
-                    try {
-                        stateLock.wait();
-                    } catch (final InterruptedException e) {
-                        // it is ok: just move on to the next iteration
-                    }
-                } else if (waitMs > elapsedMs) {
+                if (waitMs > elapsedMs) {
                     final long remainingMs = waitMs - elapsedMs;
                     try {
                         stateLock.wait(remainingMs);
@@ -825,17 +819,30 @@ public class KafkaStreams {
      * threads to join.
      * A {@code timeout} of 0 means to wait forever.
      *
-     * @param timeout  how long to wait for the threads to shutdown
+     * @param timeout  how long to wait for the threads to shutdown. Can't be negative. If {@code timeout=0} just checking the state and return immediately.
      * @param timeUnit unit of time used for timeout
      * @return {@code true} if all threads were successfully stopped&mdash;{@code false} if the timeout was reached
      * before all threads stopped
      * Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
-     * @deprecated Use {@link #close(Duration)} instead
+     * @deprecated Use {@link #close(Duration)} instead; note, that {@link #close(Duration)} has different semantics and does not block on zero, e.g., `Duration.ofMillis(0)`.
      */
     @Deprecated
     public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
-        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
+        long timeoutMs = timeUnit.toMillis(timeout);
+
+        log.debug("Stopping Streams client with timeoutMillis = {} ms. You are using deprecated method. " +
+            "Please, consider update your code.", timeoutMs);
+
+        if (timeoutMs < 0) {
+            timeoutMs = 0;
+        } else if (timeoutMs == 0) {
+            timeoutMs = Long.MAX_VALUE;
+        }
+
+        return close(timeoutMs);
+    }
 
+    private boolean close(final long timeoutMs) {
         if (!setState(State.PENDING_SHUTDOWN)) {
             // if transition failed, it means it was either in PENDING_SHUTDOWN
             // or NOT_RUNNING already; just check that all threads have been stopped
@@ -891,7 +898,7 @@ public class KafkaStreams {
             shutdownThread.start();
         }
 
-        if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
+        if (waitOnState(State.NOT_RUNNING, timeoutMs)) {
             log.info("Streams client stopped completely");
             return true;
         } else {
@@ -913,7 +920,15 @@ public class KafkaStreams {
      */
     public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
         ApiUtils.validateMillisecondDuration(timeout, "timeout");
-        return close(timeout.toMillis(), TimeUnit.MILLISECONDS);
+
+        final long timeoutMs = timeout.toMillis();
+        if (timeoutMs < 0) {
+            throw new IllegalArgumentException("Timeout can't be negative.");
+        }
+
+        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
+
+        return close(timeoutMs);
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index abc4cb9..b9d542b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -548,6 +548,34 @@ public class KafkaStreamsTest {
         }
     }
 
+    @Test
+    public void shouldThrowOnNegativeTimeoutForClose() {
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        try {
+            streams.close(Duration.ofMillis(-1L));
+            fail("should not accept negative close parameter");
+        } catch (final IllegalArgumentException e) {
+            // expected
+        } finally {
+            streams.close();
+        }
+    }
+
+    @Test
+    public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException {
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        final Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L)));
+
+        th.start();
+
+        try {
+            th.join(30_000L);
+            assertFalse(th.isAlive());
+        } finally {
+            streams.close();
+        }
+    }
+
     private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
         final File taskDir = new File(appDir, "0_0");
         TestUtils.waitForCondition(