You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/30 04:21:38 UTC
kafka git commit: KAFKA-4387;
KafkaConsumer should properly handle interrupts
Repository: kafka
Updated Branches:
refs/heads/trunk 5819b06fa -> 1503f7603
KAFKA-4387; KafkaConsumer should properly handle interrupts
See https://issues.apache.org/jira/browse/KAFKA-4387
Author: Stig Rohde D�ssing <sd...@it-minds.dk>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #2110 from srdo/KAFKA-4387
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1503f760
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1503f760
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1503f760
Branch: refs/heads/trunk
Commit: 1503f7603c1d5e0511bf71175d84bde555c80e9a
Parents: 5819b06
Author: Stig Rohde D�ssing <sd...@it-minds.dk>
Authored: Tue Nov 29 19:44:57 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Nov 29 19:57:13 2016 -0800
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 32 ++++++++++++++-
.../clients/consumer/OffsetCommitCallback.java | 2 +
.../consumer/internals/AbstractCoordinator.java | 4 +-
.../consumer/internals/ConsumerCoordinator.java | 7 ++--
.../internals/ConsumerNetworkClient.java | 13 ++++++
.../clients/consumer/KafkaConsumerTest.java | 43 ++++++++++++++++++++
docs/upgrade.html | 11 +++++
7 files changed, 106 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ae68e02..e273a04 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -63,6 +63,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+import org.apache.kafka.common.errors.InterruptException;
/**
* A client that consumes records from a Kafka cluster.
@@ -453,6 +454,12 @@ import java.util.regex.Pattern;
* </pre>
*
* <p>
+ * Note that while it is possible to use thread interrupts instead of {@link #wakeup()} to abort a blocking operation
+ * (in which case, {@link InterruptException} will be raised), we discourage their use since they may cause a clean
+ * shutdown of the consumer to be aborted. Interrupts are mainly supported for those cases where using {@link #wakeup()}
+ * is impossible, e.g. when a consumer thread is managed by code that is unaware of the Kafka client.
+ *
+ * <p>
* We have intentionally avoided implementing a particular threading model for processing. This leaves several
* options for implementing multi-threaded processing of records.
*
@@ -954,6 +961,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* partitions is undefined or out of range and no offset reset policy has been configured
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
* topics or to the configured groupId
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
@@ -1060,6 +1069,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* or if there is an active group with the same groupId which is using group management.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
@@ -1092,6 +1103,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* or if there is an active group with the same groupId which is using group management.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
@@ -1228,6 +1241,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* the partition
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
@@ -1259,6 +1274,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The last committed offset and metadata or null if there was no prior commit
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
@@ -1301,6 +1318,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The list of partitions
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic
* @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
* expiration of the configured request timeout
@@ -1329,6 +1348,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The map of topics and its partitions
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
* @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
* expiration of the configured request timeout
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
@@ -1461,6 +1482,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is enabled, this
* will commit the current offsets. Note that {@link #wakeup()} cannot be use to interrupt close.
+ *
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
+ * before or while this function is called
*/
@Override
public void close() {
@@ -1504,8 +1528,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
log.debug("The Kafka consumer has closed.");
- if (firstException.get() != null && !swallowException) {
- throw new KafkaException("Failed to close kafka consumer", firstException.get());
+ Throwable exception = firstException.get();
+ if (exception != null && !swallowException) {
+ if (exception instanceof InterruptException) {
+ throw (InterruptException) exception;
+ }
+ throw new KafkaException("Failed to close kafka consumer", exception);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
index dfa8391..918087d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
@@ -35,6 +35,8 @@ public interface OffsetCommitCallback {
* or if there is an active group with the same groupId which is using group management.
* @throws org.apache.kafka.common.errors.WakeupException if {@link KafkaConsumer#wakeup()} is called before or while this
* function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c205273..c9888a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -55,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.kafka.common.errors.InterruptException;
/**
* AbstractCoordinator implements group management for a single group member by interacting with
@@ -886,7 +887,8 @@ public abstract class AbstractCoordinator implements Closeable {
}
}
}
- } catch (InterruptedException e) {
+ } catch (InterruptedException | InterruptException e) {
+ Thread.interrupted();
log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
this.failed.set(new RuntimeException(e));
} catch (RuntimeException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 56f6951..a122575 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -55,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.kafka.common.errors.InterruptException;
/**
* This class manages the coordination process with the consumer coordinator.
@@ -227,7 +228,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
try {
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsAssigned(assigned);
- } catch (WakeupException e) {
+ } catch (WakeupException | InterruptException e) {
throw e;
} catch (Exception e) {
log.error("User provided listener {} for group {} failed on partition assignment",
@@ -335,7 +336,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
try {
Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsRevoked(revoked);
- } catch (WakeupException e) {
+ } catch (WakeupException | InterruptException e) {
throw e;
} catch (Exception e) {
log.error("User provided listener {} for group {} failed on partition revocation",
@@ -559,7 +560,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
if (autoCommitEnabled) {
try {
commitOffsetsSync(subscriptions.allConsumed());
- } catch (WakeupException e) {
+ } catch (WakeupException | InterruptException e) {
// rethrow wakeups since they are triggered by the user
throw e;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index d9baa56..b09b787 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.common.errors.InterruptException;
/**
* Higher level consumer access to the network layer with basic support for request futures. This class
@@ -173,6 +174,7 @@ public class ConsumerNetworkClient implements Closeable {
* Block indefinitely until the given request future has finished.
* @param future The request future to await.
* @throws WakeupException if {@link #wakeup()} is called from another thread
+ * @throws InterruptException if the calling thread is interrupted
*/
public void poll(RequestFuture<?> future) {
while (!future.isDone())
@@ -185,6 +187,7 @@ public class ConsumerNetworkClient implements Closeable {
* @param timeout The maximum duration (in ms) to wait for the request
* @return true if the future is done, false otherwise
* @throws WakeupException if {@link #wakeup()} is called from another thread
+ * @throws InterruptException if the calling thread is interrupted
*/
public boolean poll(RequestFuture<?> future, long timeout) {
long begin = time.milliseconds();
@@ -203,6 +206,7 @@ public class ConsumerNetworkClient implements Closeable {
* Poll for any network IO.
* @param timeout The maximum time to wait for an IO event.
* @throws WakeupException if {@link #wakeup()} is called from another thread
+ * @throws InterruptException if the calling thread is interrupted
*/
public void poll(long timeout) {
poll(timeout, time.milliseconds(), null);
@@ -242,6 +246,9 @@ public class ConsumerNetworkClient implements Closeable {
// trigger wakeups after checking for disconnects so that the callbacks will be ready
// to be fired on the next call to poll()
maybeTriggerWakeup();
+
+ // throw InterruptException if this thread is interrupted
+ maybeThrowInterruptException();
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll
@@ -403,6 +410,12 @@ public class ConsumerNetworkClient implements Closeable {
throw new WakeupException();
}
}
+
+ private void maybeThrowInterruptException() {
+ if (Thread.interrupted()) {
+ throw new InterruptException(new InterruptedException());
+ }
+ }
public void disableWakeups() {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e34f438..ad6c127 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -87,6 +87,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import org.apache.kafka.common.errors.InterruptException;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
public class KafkaConsumerTest {
private final String topic = "test";
private final TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -97,6 +101,9 @@ public class KafkaConsumerTest {
private final String topic3 = "test3";
private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
@Test
public void testConstructorClose() throws Exception {
@@ -678,6 +685,42 @@ public class KafkaConsumerTest {
ConsumerRecords<String, String> records = consumer.poll(0);
assertEquals(5, records.count());
}
+
+ @Test
+ public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
+ int rebalanceTimeoutMs = 60000;
+ int sessionTimeoutMs = 30000;
+ int heartbeatIntervalMs = 3000;
+
+ final Time time = new MockTime();
+ Cluster cluster = TestUtils.singletonCluster(topic, 1);
+ final Node node = cluster.nodes().get(0);
+
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ metadata.update(cluster, time.milliseconds());
+
+ final MockClient client = new MockClient(time, metadata);
+ client.setNode(node);
+ final PartitionAssignor assignor = new RoundRobinAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 0);
+
+ consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+ prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+
+ consumer.poll(0);
+
+ // interrupt the thread and call poll
+ try {
+ Thread.currentThread().interrupt();
+ expectedException.expect(InterruptException.class);
+ consumer.poll(0);
+ } finally {
+ // clear interrupted state again since this thread may be reused by JUnit
+ Thread.interrupted();
+ }
+ }
@Test
public void fetchResponseWithUnexpectedPartitionIsIgnored() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index e6b9747..c63487d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -14,6 +14,17 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
+<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.0 to 0.10.2.0</a></h4>
+Users upgrading from versions prior to 0.10.1.0 should follow the upgrade guide <a href="#upgrade_10_1">here</a>. Users upgrading from 0.10.1.0
+can upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
+<br>
+0.10.2.0 has <a href="#upgrade_10_2_0_breaking">Potential breaking changes</a> (Please review before upgrading).
+
+<h5><a id="upgrade_10_2_0_breaking" href="#upgrade_10_2_0_breaking">Potential breaking changes in 0.10.2.0</a></h5>
+<ul>
+ <li>Several methods on the Java consumer may now throw <code>InterruptException</code> if the calling thread is interrupted.
+ Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
+</ul>
<h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4>
0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.