You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/15 17:47:00 UTC
kafka git commit: MINOR: Improve log4j on stream thread and stream process
Repository: kafka
Updated Branches:
refs/heads/trunk 962c378cc -> b9f812491
MINOR: Improve log4j on stream thread and stream process
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Eno Thereska <en...@confluent.io>, Damian Guy <da...@gmail.com>, Jason Gustafson <ja...@confluent.io>
Closes #2685 from guozhangwang/KMinor-improve-log4j
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9f81249
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9f81249
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9f81249
Branch: refs/heads/trunk
Commit: b9f812491f5eb06ffe5b68f5e53df4302d2f68a8
Parents: 962c378
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Mar 15 10:46:57 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Mar 15 10:46:57 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 24 ++++++++----
.../processor/internals/StreamThread.java | 39 ++++++++------------
2 files changed, 32 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9f81249/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 8d8626d..b23d244 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -131,6 +131,7 @@ public class KafkaStreams {
// of the co-location of stream thread's consumers. It is for internal
// usage only and should not be exposed to users at all.
private final UUID processId;
+ private final String logPrefix;
private final StreamsMetadataState streamsMetadataState;
private final StreamsConfig config;
@@ -217,9 +218,13 @@ public class KafkaStreams {
private synchronized void setState(final State newState) {
final State oldState = state;
if (!state.isValidTransition(newState)) {
- log.warn("Unexpected state transition from {} to {}.", oldState, newState);
+ log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+ } else {
+ log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
}
+
state = newState;
+
if (stateListener != null) {
stateListener.onChange(state, oldState);
}
@@ -310,6 +315,8 @@ public class KafkaStreams {
if (clientId.length() <= 0)
clientId = applicationId + "-" + processId;
+ this.logPrefix = String.format("stream-client [%s]", clientId);
+
final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
@@ -329,7 +336,7 @@ public class KafkaStreams {
final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
- log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
+ log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix);
}
final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
@@ -395,7 +402,7 @@ public class KafkaStreams {
try {
client.close();
} catch (final IOException e) {
- log.warn("Could not close StreamKafkaClient.", e);
+ log.warn("{} Could not close StreamKafkaClient.", logPrefix, e);
}
}
@@ -411,7 +418,7 @@ public class KafkaStreams {
* @throws StreamsException if the Kafka brokers have version 0.10.0.x
*/
public synchronized void start() throws IllegalStateException, StreamsException {
- log.debug("Starting Kafka Stream process.");
+ log.debug("{} Starting Kafka Stream process.", logPrefix);
if (state == State.CREATED) {
checkBrokerVersionCompatibility();
@@ -425,7 +432,7 @@ public class KafkaStreams {
thread.start();
}
- log.info("Started Kafka Stream process");
+ log.info("{} Started Kafka Stream process", logPrefix);
} else {
throw new IllegalStateException("Cannot start again.");
}
@@ -450,7 +457,7 @@ public class KafkaStreams {
* before all threads stopped
*/
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
- log.debug("Stopping Kafka Stream process.");
+ log.debug("{} Stopping Kafka Stream process.", logPrefix);
if (state.isCreatedOrRunning()) {
setState(State.PENDING_SHUTDOWN);
// save the current thread so that if it is a stream thread
@@ -486,7 +493,7 @@ public class KafkaStreams {
}
metrics.close();
- log.info("Stopped Kafka Streams process.");
+ log.info("{} Stopped Kafka Streams process.", logPrefix);
}
}, "kafka-streams-close-thread");
shutdown.setDaemon(true);
@@ -556,7 +563,8 @@ public class KafkaStreams {
final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG);
final String localApplicationDir = stateDir + File.separator + appId;
- log.debug("Removing local Kafka Streams application data in {} for application {}.",
+ log.debug("{} Removing local Kafka Streams application data in {} for application {}.",
+ logPrefix,
localApplicationDir,
appId);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9f81249/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9a2c3fa..6a6b508 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -156,8 +156,11 @@ public class StreamThread extends Thread {
private synchronized void setState(State newState) {
State oldState = state;
if (!state.isValidTransition(newState)) {
- log.warn("Unexpected state transition from " + state + " to " + newState);
+ log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+ } else {
+ log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
}
+
state = newState;
if (stateListener != null) {
stateListener.onChange(this, state, oldState);
@@ -296,7 +299,7 @@ public class StreamThread extends Thread {
this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
this.lastCommitMs = timerStartedMs;
this.rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
- setState(state.RUNNING);
+ setState(State.RUNNING);
}
@@ -366,7 +369,7 @@ public class StreamThread extends Thread {
try {
partitionAssignor.close();
} catch (Throwable e) {
- log.error("stream-thread [{}] Failed to close KafkaStreamClient: ", this.getName(), e);
+ log.error("{} Failed to close KafkaStreamClient: ", logPrefix, e);
}
removeStreamTasks();
@@ -393,7 +396,7 @@ public class StreamThread extends Thread {
@SuppressWarnings("ThrowableNotThrown")
private void shutdownTasksAndState() {
- log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix,
+ log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix,
activeTasks.keySet(), standbyTasks.keySet());
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
@@ -418,7 +421,7 @@ public class StreamThread extends Thread {
* soon the tasks will be assigned again
*/
private void suspendTasksAndState() {
- log.debug("{} suspendTasksAndState: suspending all active tasks [{}] and standby tasks [{}]", logPrefix,
+ log.debug("{} suspendTasksAndState: suspending all active tasks {} and standby tasks {}", logPrefix,
activeTasks.keySet(), standbyTasks.keySet());
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
// Close all topology nodes
@@ -709,7 +712,7 @@ public class StreamThread extends Thread {
* Commit the states of all its tasks
*/
private void commitAll() {
- log.trace("stream-thread [{}] Committing all its owned tasks", this.getName());
+ log.trace("{} Committing all its owned tasks", logPrefix);
for (StreamTask task : activeTasks.values()) {
commitOne(task);
}
@@ -775,7 +778,7 @@ public class StreamThread extends Thread {
}
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
- log.info("{} Creating active task {} with assigned partitions [{}]", logPrefix, id, partitions);
+ log.info("{} Creating active task {} with assigned partitions {}", logPrefix, id, partitions);
streamsMetrics.taskCreatedSensor.record();
@@ -894,7 +897,7 @@ public class StreamThread extends Thread {
}
StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
- log.info("{} Creating new standby task {} with assigned partitions [{}]", logPrefix, id, partitions);
+ log.info("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions);
streamsMetrics.taskCreatedSensor.record();
@@ -965,14 +968,14 @@ public class StreamThread extends Thread {
}
private void updateSuspendedTasks() {
- log.info("{} Updating suspended tasks to contain active tasks [{}]", logPrefix, activeTasks.keySet());
+ log.info("{} Updating suspended tasks to contain active tasks {}", logPrefix, activeTasks.keySet());
suspendedTasks.clear();
suspendedTasks.putAll(activeTasks);
suspendedStandbyTasks.putAll(standbyTasks);
}
private void removeStreamTasks() {
- log.info("{} Removing all active tasks [{}]", logPrefix, activeTasks.keySet());
+ log.info("{} Removing all active tasks {}", logPrefix, activeTasks.keySet());
try {
prevActiveTasks.clear();
@@ -987,7 +990,7 @@ public class StreamThread extends Thread {
}
private void removeStandbyTasks() {
- log.info("{} Removing all standby tasks [{}]", logPrefix, standbyTasks.keySet());
+ log.info("{} Removing all standby tasks {}", logPrefix, standbyTasks.keySet());
standbyTasks.clear();
standbyTasksByPartition.clear();
@@ -1197,12 +1200,7 @@ public class StreamThread extends Thread {
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
final long start = time.milliseconds();
try {
- if (state == State.PENDING_SHUTDOWN) {
- log.info("stream-thread [{}] New partitions [{}] assigned while shutting down.",
- StreamThread.this.getName(), assignment);
- }
- log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.",
- StreamThread.this.getName(), assignment);
+ log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.", logPrefix, state, assignment);
storeChangelogReader = new StoreChangelogReader(restoreConsumer, time, requestTimeOut);
setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
// do this first as we may have suspended standby tasks that
@@ -1226,12 +1224,7 @@ public class StreamThread extends Thread {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
try {
- if (state == State.PENDING_SHUTDOWN) {
- log.info("stream-thread [{}] New partitions [{}] revoked while shutting down.",
- StreamThread.this.getName(), assignment);
- }
- log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.",
- StreamThread.this.getName(), assignment);
+ log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.", logPrefix, state, assignment);
setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
// suspend active tasks