You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/10/09 18:38:14 UTC
[kafka] branch 2.1 updated: KAFKA-7477: Improve Streams close timeout semantics (#5747)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 2e500bd KAFKA-7477: Improve Streams close timeout semantics (#5747)
2e500bd is described below
commit 2e500bd40e2caca5e72dfa3d1ba3d438587e1f89
Author: Nikolay <ni...@apache.org>
AuthorDate: Tue Oct 9 21:37:08 2018 +0300
KAFKA-7477: Improve Streams close timeout semantics (#5747)
Reviewers: Matthias J. Sax <ma...@confluent.io>, John Roesler <jo...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../org/apache/kafka/streams/KafkaStreams.java | 39 +++++++++++++++-------
.../org/apache/kafka/streams/KafkaStreamsTest.java | 28 ++++++++++++++++
2 files changed, 55 insertions(+), 12 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 49a701a..819732a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -218,13 +218,7 @@ public class KafkaStreams {
synchronized (stateLock) {
long elapsedMs = 0L;
while (state != targetState) {
- if (waitMs == 0) {
- try {
- stateLock.wait();
- } catch (final InterruptedException e) {
- // it is ok: just move on to the next iteration
- }
- } else if (waitMs > elapsedMs) {
+ if (waitMs > elapsedMs) {
final long remainingMs = waitMs - elapsedMs;
try {
stateLock.wait(remainingMs);
@@ -825,17 +819,30 @@ public class KafkaStreams {
* threads to join.
* A {@code timeout} of 0 means to wait forever.
*
- * @param timeout how long to wait for the threads to shutdown
+ * @param timeout how long to wait for the threads to shut down. A {@code timeout} of 0 means to wait forever; a negative {@code timeout} is treated as a zero-millisecond wait, i.e. the state is checked without blocking.
* @param timeUnit unit of time used for timeout
* @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached
* before all threads stopped
* Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
- * @deprecated Use {@link #close(Duration)} instead
+ * @deprecated Use {@link #close(Duration)} instead; note that {@link #close(Duration)} has different semantics and does not block on zero, e.g., {@code Duration.ofMillis(0)}.
*/
@Deprecated
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
- log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
+ long timeoutMs = timeUnit.toMillis(timeout);
+
+ log.debug("Stopping Streams client with timeoutMillis = {} ms. You are using deprecated method. " +
+ "Please, consider update your code.", timeoutMs);
+
+ if (timeoutMs < 0) {
+ timeoutMs = 0;
+ } else if (timeoutMs == 0) {
+ timeoutMs = Long.MAX_VALUE;
+ }
+
+ return close(timeoutMs);
+ }
+ private boolean close(final long timeoutMs) {
if (!setState(State.PENDING_SHUTDOWN)) {
// if transition failed, it means it was either in PENDING_SHUTDOWN
// or NOT_RUNNING already; just check that all threads have been stopped
@@ -891,7 +898,7 @@ public class KafkaStreams {
shutdownThread.start();
}
- if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
+ if (waitOnState(State.NOT_RUNNING, timeoutMs)) {
log.info("Streams client stopped completely");
return true;
} else {
@@ -913,7 +920,15 @@ public class KafkaStreams {
*/
public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
ApiUtils.validateMillisecondDuration(timeout, "timeout");
- return close(timeout.toMillis(), TimeUnit.MILLISECONDS);
+
+ final long timeoutMs = timeout.toMillis();
+ if (timeoutMs < 0) {
+ throw new IllegalArgumentException("Timeout can't be negative.");
+ }
+
+ log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs);
+
+ return close(timeoutMs);
}
/**
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index abc4cb9..b9d542b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -548,6 +548,34 @@ public class KafkaStreamsTest {
}
}
+ @Test
+ public void shouldThrowOnNegativeTimeoutForClose() {
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+ try {
+ streams.close(Duration.ofMillis(-1L));
+ fail("should not accept negative close parameter");
+ } catch (final IllegalArgumentException e) {
+ // expected
+ } finally {
+ streams.close();
+ }
+ }
+
+ @Test
+ public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException {
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+ final Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L)));
+
+ th.start();
+
+ try {
+ th.join(30_000L);
+ assertFalse(th.isAlive());
+ } finally {
+ streams.close();
+ }
+ }
+
private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
final File taskDir = new File(appDir, "0_0");
TestUtils.waitForCondition(