You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/09/17 16:22:46 UTC

[kafka] branch trunk updated: MINOR: log and fail on missing task in Streams (#5655)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 607dea2  MINOR: log and fail on missing task in Streams (#5655)
607dea2 is described below

commit 607dea234f30a0c40d5a48f518a51372e02fa0a0
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Sep 17 11:22:36 2018 -0500

    MINOR: log and fail on missing task in Streams (#5655)
    
    Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/streams/processor/internals/StreamThread.java       | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b43177d..3a19aa7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -979,9 +979,16 @@ public class StreamThread extends Thread {
         for (final TopicPartition partition : records.partitions()) {
             final StreamTask task = taskManager.activeTask(partition);
 
-            if (task.isClosed()) {
+            if (task == null) {
+                log.error(
+                    "Unable to locate active task for received-record partition {}. Current tasks: {}",
+                    partition,
+                    taskManager.toString(">")
+                );
+                throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
+            } else if (task.isClosed()) {
                 log.info("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
-                    "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                             "Notifying the thread to trigger a new rebalance immediately.", task.id());
                 throw new TaskMigratedException(task);
             }