You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/09/05 23:07:16 UTC
kafka git commit: MINOR: logging improvements on StreamThread
Repository: kafka
Updated Branches:
refs/heads/0.11.0 90b6d978e -> 426057094
MINOR: logging improvements on StreamThread
This is a manual cherry-pick of https://github.com/apache/kafka/pull/3769 for 0.11.0
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
Closes #3771 from guozhangwang/KMinor-logging-improvements
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/42605709
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/42605709
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/42605709
Branch: refs/heads/0.11.0
Commit: 426057094cc2211578b8546708c7a9b8818aeb05
Parents: 90b6d97
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Sep 5 16:07:13 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Sep 5 16:07:13 2017 -0700
----------------------------------------------------------------------
.../processor/internals/AssignedTasks.java | 4 +-
.../processor/internals/StreamThread.java | 42 +++++++++-----------
.../processor/internals/StreamThreadTest.java | 5 +--
3 files changed, 21 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/42605709/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 815d80a..1e9ec60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -62,7 +62,7 @@ class AssignedTasks<T extends AbstractTask> {
}
void addNewTask(final T task) {
- log.trace("{} add new {} {}", logPrefix, taskTypeName, task.id());
+ log.trace("{} Add newly created {} {} with assigned partitions {}", logPrefix, taskTypeName, task.id(), task.partitions());
created.put(task.id(), task);
}
@@ -219,7 +219,7 @@ class AssignedTasks<T extends AbstractTask> {
final T task = suspended.get(taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
- log.trace("{} Resuming suspended {} {}", logPrefix, taskTypeName, taskId);
+ log.trace("{} Resuming suspended {} {} with assigned partitions {}", logPrefix, taskTypeName, taskId, partitions);
task.resume();
transitionToRunning(task);
return true;
http://git-wip-us.apache.org/repos/asf/kafka/blob/42605709/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 40d741f..69cb328 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -189,8 +189,9 @@ public class StreamThread extends Thread {
log.trace("{} pausing partitions: {}", logPrefix, partitions);
consumer.pause(partitions);
} catch (final Throwable t) {
+ log.error("{} Error caught during partition assignment, " +
+ "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
rebalanceException = t;
- throw t;
} finally {
log.info("{} partition assignment took {} ms.\n" +
"\tcurrent active tasks: {}\n" +
@@ -220,8 +221,9 @@ public class StreamThread extends Thread {
// suspend active tasks
suspendTasksAndState();
} catch (final Throwable t) {
+ log.error("{} Error caught during partition revocation, " +
+ "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
rebalanceException = t;
- throw t;
} finally {
streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata());
standbyRecords.clear();
@@ -1074,23 +1076,19 @@ public class StreamThread extends Thread {
protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) {
streamsMetrics.taskCreatedSensor.record();
- try {
- return new StreamTask(
- id,
- applicationId,
- partitions,
- builder.build(id.topicGroupId),
- consumer,
- storeChangelogReader,
- config,
- streamsMetrics,
- stateDirectory,
- cache,
- time,
- createProducer(id));
- } finally {
- log.trace("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions);
- }
+ return new StreamTask(
+ id,
+ applicationId,
+ partitions,
+ builder.build(id.topicGroupId),
+ consumer,
+ storeChangelogReader,
+ config,
+ streamsMetrics,
+ stateDirectory,
+ cache,
+ time,
+ createProducer(id));
}
private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -1154,11 +1152,7 @@ public class StreamThread extends Thread {
final ProcessorTopology topology = builder.build(id.topicGroupId);
if (!topology.stateStores().isEmpty()) {
- try {
- return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
- } finally {
- log.trace("{} Created standby task {} with assigned partitions {}", logPrefix, id, partitions);
- }
+ return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
} else {
log.trace("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, id, partitions);
http://git-wip-us.apache.org/repos/asf/kafka/blob/42605709/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 07849c3..b13613a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1347,10 +1347,7 @@ public class StreamThreadTest {
thread.rebalanceListener.onPartitionsRevoked(null);
clientSupplier.producers.get(0).fenceProducer();
- try {
- thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
- fail("should have thrown " + ProducerFencedException.class.getSimpleName());
- } catch (final ProducerFencedException e) { }
+ thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
assertTrue(thread.tasks().isEmpty());
}