You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/09/05 23:07:16 UTC

kafka git commit: MINOR: logging improvements on StreamThread

Repository: kafka
Updated Branches:
  refs/heads/0.11.0 90b6d978e -> 426057094


MINOR: logging improvements on StreamThread

This is a manual cherry-pick of https://github.com/apache/kafka/pull/3769 onto the 0.11.0 branch.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>

Closes #3771 from guozhangwang/KMinor-logging-improvements


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/42605709
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/42605709
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/42605709

Branch: refs/heads/0.11.0
Commit: 426057094cc2211578b8546708c7a9b8818aeb05
Parents: 90b6d97
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Sep 5 16:07:13 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Sep 5 16:07:13 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      |  4 +-
 .../processor/internals/StreamThread.java       | 42 +++++++++-----------
 .../processor/internals/StreamThreadTest.java   |  5 +--
 3 files changed, 21 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/42605709/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 815d80a..1e9ec60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -62,7 +62,7 @@ class AssignedTasks<T extends AbstractTask> {
     }
 
     void addNewTask(final T task) {
-        log.trace("{} add new {} {}", logPrefix, taskTypeName, task.id());
+        log.trace("{} Add newly created {} {} with assigned partitions {}", logPrefix,  taskTypeName, task.id(), task.partitions());
         created.put(task.id(), task);
     }
 
@@ -219,7 +219,7 @@ class AssignedTasks<T extends AbstractTask> {
             final T task = suspended.get(taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
-                log.trace("{} Resuming suspended {} {}", logPrefix, taskTypeName, taskId);
+                log.trace("{} Resuming suspended {} {} with assigned partitions {}", logPrefix, taskTypeName, taskId, partitions);
                 task.resume();
                 transitionToRunning(task);
                 return true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/42605709/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 40d741f..69cb328 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -189,8 +189,9 @@ public class StreamThread extends Thread {
                 log.trace("{} pausing partitions: {}", logPrefix, partitions);
                 consumer.pause(partitions);
             } catch (final Throwable t) {
+                log.error("{} Error caught during partition assignment, " +
+                        "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
                 rebalanceException = t;
-                throw t;
             } finally {
                 log.info("{} partition assignment took {} ms.\n" +
                                  "\tcurrent active tasks: {}\n" +
@@ -220,8 +221,9 @@ public class StreamThread extends Thread {
                 // suspend active tasks
                 suspendTasksAndState();
             } catch (final Throwable t) {
+                log.error("{} Error caught during partition revocation, " +
+                        "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
                 rebalanceException = t;
-                throw t;
             } finally {
                 streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata());
                 standbyRecords.clear();
@@ -1074,23 +1076,19 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
         streamsMetrics.taskCreatedSensor.record();
 
-        try {
-            return new StreamTask(
-                    id,
-                    applicationId,
-                    partitions,
-                    builder.build(id.topicGroupId),
-                    consumer,
-                    storeChangelogReader,
-                    config,
-                    streamsMetrics,
-                    stateDirectory,
-                    cache,
-                    time,
-                    createProducer(id));
-        } finally {
-            log.trace("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions);
-        }
+        return new StreamTask(
+                id,
+                applicationId,
+                partitions,
+                builder.build(id.topicGroupId),
+                consumer,
+                storeChangelogReader,
+                config,
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                time,
+                createProducer(id));
     }
 
     private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -1154,11 +1152,7 @@ public class StreamThread extends Thread {
         final ProcessorTopology topology = builder.build(id.topicGroupId);
 
         if (!topology.stateStores().isEmpty()) {
-            try {
-                return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
-            } finally {
-                log.trace("{} Created standby task {} with assigned partitions {}", logPrefix, id, partitions);
-            }
+            return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
         } else {
             log.trace("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, id, partitions);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/42605709/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 07849c3..b13613a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1347,10 +1347,7 @@ public class StreamThreadTest {
 
         thread.rebalanceListener.onPartitionsRevoked(null);
         clientSupplier.producers.get(0).fenceProducer();
-        try {
-            thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
-            fail("should have thrown " + ProducerFencedException.class.getSimpleName());
-        } catch (final ProducerFencedException e) { }
+        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
 
         assertTrue(thread.tasks().isEmpty());
     }