You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/30 04:21:38 UTC

kafka git commit: KAFKA-4387; KafkaConsumer should properly handle interrupts

Repository: kafka
Updated Branches:
  refs/heads/trunk 5819b06fa -> 1503f7603


KAFKA-4387; KafkaConsumer should properly handle interrupts

See https://issues.apache.org/jira/browse/KAFKA-4387

Author: Stig Rohde D�ssing <sd...@it-minds.dk>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2110 from srdo/KAFKA-4387


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1503f760
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1503f760
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1503f760

Branch: refs/heads/trunk
Commit: 1503f7603c1d5e0511bf71175d84bde555c80e9a
Parents: 5819b06
Author: Stig Rohde D�ssing <sd...@it-minds.dk>
Authored: Tue Nov 29 19:44:57 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Nov 29 19:57:13 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 32 ++++++++++++++-
 .../clients/consumer/OffsetCommitCallback.java  |  2 +
 .../consumer/internals/AbstractCoordinator.java |  4 +-
 .../consumer/internals/ConsumerCoordinator.java |  7 ++--
 .../internals/ConsumerNetworkClient.java        | 13 ++++++
 .../clients/consumer/KafkaConsumerTest.java     | 43 ++++++++++++++++++++
 docs/upgrade.html                               | 11 +++++
 7 files changed, 106 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ae68e02..e273a04 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -63,6 +63,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
+import org.apache.kafka.common.errors.InterruptException;
 
 /**
  * A client that consumes records from a Kafka cluster.
@@ -453,6 +454,12 @@ import java.util.regex.Pattern;
  * </pre>
  *
  * <p>
+ * Note that while it is possible to use thread interrupts instead of {@link #wakeup()} to abort a blocking operation
+ * (in which case, {@link InterruptException} will be raised), we discourage their use since they may cause a clean
+ * shutdown of the consumer to be aborted. Interrupts are mainly supported for those cases where using {@link #wakeup()}
+ * is impossible, e.g. when a consumer thread is managed by code that is unaware of the Kafka client.
+ *
+ * <p>
  * We have intentionally avoided implementing a particular threading model for processing. This leaves several
  * options for implementing multi-threaded processing of records.
  *
@@ -954,6 +961,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             partitions is undefined or out of range and no offset reset policy has been configured
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
      *             topics or to the configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
@@ -1060,6 +1069,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             or if there is an active group with the same groupId which is using group management.
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
@@ -1092,6 +1103,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             or if there is an active group with the same groupId which is using group management.
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
@@ -1228,6 +1241,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             the partition
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
@@ -1259,6 +1274,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The last committed offset and metadata or null if there was no prior commit
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
@@ -1301,6 +1318,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The list of partitions
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic
      * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
      *             expiration of the configured request timeout
@@ -1329,6 +1348,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The map of topics and its partitions
      * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
      *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
      * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
      *             expiration of the configured request timeout
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
@@ -1461,6 +1482,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Close the consumer, waiting indefinitely for any needed cleanup. If auto-commit is enabled, this
      * will commit the current offsets. Note that {@link #wakeup()} cannot be use to interrupt close.
+     * 
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
+     * before or while this function is called
      */
     @Override
     public void close() {
@@ -1504,8 +1528,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
         log.debug("The Kafka consumer has closed.");
-        if (firstException.get() != null && !swallowException) {
-            throw new KafkaException("Failed to close kafka consumer", firstException.get());
+        Throwable exception = firstException.get();
+        if (exception != null && !swallowException) {
+            if (exception instanceof InterruptException) {
+                throw (InterruptException) exception;
+            }
+            throw new KafkaException("Failed to close kafka consumer", exception);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
index dfa8391..918087d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetCommitCallback.java
@@ -35,6 +35,8 @@ public interface OffsetCommitCallback {
      *             or if there is an active group with the same groupId which is using group management.
      * @throws org.apache.kafka.common.errors.WakeupException if {@link KafkaConsumer#wakeup()} is called before or while this
      *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata

http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c205273..c9888a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.kafka.common.errors.InterruptException;
 
 /**
  * AbstractCoordinator implements group management for a single group member by interacting with
@@ -886,7 +887,8 @@ public abstract class AbstractCoordinator implements Closeable {
                         }
                     }
                 }
-            } catch (InterruptedException e) {
+            } catch (InterruptedException | InterruptException e) {
+                Thread.interrupted();
                 log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
                 this.failed.set(new RuntimeException(e));
             } catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 56f6951..a122575 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -55,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.kafka.common.errors.InterruptException;
 
 /**
  * This class manages the coordination process with the consumer coordinator.
@@ -227,7 +228,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         try {
             Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsAssigned(assigned);
-        } catch (WakeupException e) {
+        } catch (WakeupException | InterruptException e) {
             throw e;
         } catch (Exception e) {
             log.error("User provided listener {} for group {} failed on partition assignment",
@@ -335,7 +336,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         try {
             Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
             listener.onPartitionsRevoked(revoked);
-        } catch (WakeupException e) {
+        } catch (WakeupException | InterruptException e) {
             throw e;
         } catch (Exception e) {
             log.error("User provided listener {} for group {} failed on partition revocation",
@@ -559,7 +560,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (autoCommitEnabled) {
             try {
                 commitOffsetsSync(subscriptions.allConsumed());
-            } catch (WakeupException e) {
+            } catch (WakeupException | InterruptException e) {
                 // rethrow wakeups since they are triggered by the user
                 throw e;
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index d9baa56..b09b787 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.kafka.common.errors.InterruptException;
 
 /**
  * Higher level consumer access to the network layer with basic support for request futures. This class
@@ -173,6 +174,7 @@ public class ConsumerNetworkClient implements Closeable {
      * Block indefinitely until the given request future has finished.
      * @param future The request future to await.
      * @throws WakeupException if {@link #wakeup()} is called from another thread
+     * @throws InterruptException if the calling thread is interrupted
      */
     public void poll(RequestFuture<?> future) {
         while (!future.isDone())
@@ -185,6 +187,7 @@ public class ConsumerNetworkClient implements Closeable {
      * @param timeout The maximum duration (in ms) to wait for the request
      * @return true if the future is done, false otherwise
      * @throws WakeupException if {@link #wakeup()} is called from another thread
+     * @throws InterruptException if the calling thread is interrupted
      */
     public boolean poll(RequestFuture<?> future, long timeout) {
         long begin = time.milliseconds();
@@ -203,6 +206,7 @@ public class ConsumerNetworkClient implements Closeable {
      * Poll for any network IO.
      * @param timeout The maximum time to wait for an IO event.
      * @throws WakeupException if {@link #wakeup()} is called from another thread
+     * @throws InterruptException if the calling thread is interrupted
      */
     public void poll(long timeout) {
         poll(timeout, time.milliseconds(), null);
@@ -242,6 +246,9 @@ public class ConsumerNetworkClient implements Closeable {
             // trigger wakeups after checking for disconnects so that the callbacks will be ready
             // to be fired on the next call to poll()
             maybeTriggerWakeup();
+            
+            // throw InterruptException if this thread is interrupted
+            maybeThrowInterruptException();
 
             // try again to send requests since buffer space may have been
             // cleared or a connect finished in the poll
@@ -403,6 +410,12 @@ public class ConsumerNetworkClient implements Closeable {
             throw new WakeupException();
         }
     }
+    
+    private void maybeThrowInterruptException() {
+        if (Thread.interrupted()) {
+            throw new InterruptException(new InterruptedException());
+        }
+    }
 
     public void disableWakeups() {
         synchronized (this) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index e34f438..ad6c127 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -87,6 +87,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.kafka.common.errors.InterruptException;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
 public class KafkaConsumerTest {
     private final String topic = "test";
     private final TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -97,6 +101,9 @@ public class KafkaConsumerTest {
 
     private final String topic3 = "test3";
     private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
+    
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
 
     @Test
     public void testConstructorClose() throws Exception {
@@ -678,6 +685,42 @@ public class KafkaConsumerTest {
         ConsumerRecords<String, String> records = consumer.poll(0);
         assertEquals(5, records.count());
     }
+    
+    @Test
+    public void testPollThrowsInterruptExceptionIfInterrupted() throws Exception {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+
+        final Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        final Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+
+        final MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        final PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, 0);
+        
+        consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+        prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+
+        consumer.poll(0);
+
+        // interrupt the thread and call poll
+        try {
+            Thread.currentThread().interrupt();
+            expectedException.expect(InterruptException.class);
+            consumer.poll(0);
+        } finally {
+            // clear interrupted state again since this thread may be reused by JUnit
+            Thread.interrupted();
+        }
+    }
 
     @Test
     public void fetchResponseWithUnexpectedPartitionIsIgnored() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1503f760/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index e6b9747..c63487d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -14,6 +14,17 @@
  See the License for the specific language governing permissions and
  limitations under the License.
 -->
+<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.0 to 0.10.2.0</a></h4>
+Users upgrading from versions prior to 0.10.1.0 should follow the upgrade guide <a href="#upgrade_10_1">here</a>. Users upgrading from 0.10.1.0
+can upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
+<br>
+0.10.2.0 has <a href="#upgrade_10_2_0_breaking">Potential breaking changes</a> (Please review before upgrading).
+
+<h5><a id="upgrade_10_2_0_breaking" href="#upgrade_10_2_0_breaking">Potential breaking changes in 0.10.2.0</a></h5>
+<ul>
+    <li>Several methods on the Java consumer may now throw <code>InterruptException</code> if the calling thread is interrupted. 
+        Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
+</ul>
 
 <h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4>
 0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.