You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/09/17 16:22:46 UTC
[kafka] branch trunk updated: MINOR: log and fail on missing task in Streams (#5655)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 607dea2 MINOR: log and fail on missing task in Streams (#5655)
607dea2 is described below
commit 607dea234f30a0c40d5a48f518a51372e02fa0a0
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Sep 17 11:22:36 2018 -0500
MINOR: log and fail on missing task in Streams (#5655)
Reviewers: Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
.../kafka/streams/processor/internals/StreamThread.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index b43177d..3a19aa7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -979,9 +979,16 @@ public class StreamThread extends Thread {
for (final TopicPartition partition : records.partitions()) {
final StreamTask task = taskManager.activeTask(partition);
- if (task.isClosed()) {
+ if (task == null) {
+ log.error(
+ "Unable to locate active task for received-record partition {}. Current tasks: {}",
+ partition,
+ taskManager.toString(">")
+ );
+ throw new NullPointerException("Task was unexpectedly missing for partition " + partition);
+ } else if (task.isClosed()) {
log.info("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
- "Notifying the thread to trigger a new rebalance immediately.", task.id());
+ "Notifying the thread to trigger a new rebalance immediately.", task.id());
throw new TaskMigratedException(task);
}