You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/15 17:47:00 UTC

kafka git commit: MINOR: Improve log4j on stream thread and stream process

Repository: kafka
Updated Branches:
  refs/heads/trunk 962c378cc -> b9f812491


MINOR: Improve log4j on stream thread and stream process

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Eno Thereska <en...@confluent.io>, Damian Guy <da...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #2685 from guozhangwang/KMinor-improve-log4j


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9f81249
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9f81249
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9f81249

Branch: refs/heads/trunk
Commit: b9f812491f5eb06ffe5b68f5e53df4302d2f68a8
Parents: 962c378
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Mar 15 10:46:57 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Mar 15 10:46:57 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 24 ++++++++----
 .../processor/internals/StreamThread.java       | 39 ++++++++------------
 2 files changed, 32 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b9f81249/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 8d8626d..b23d244 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -131,6 +131,7 @@ public class KafkaStreams {
     // of the co-location of stream thread's consumers. It is for internal
     // usage only and should not be exposed to users at all.
     private final UUID processId;
+    private final String logPrefix;
     private final StreamsMetadataState streamsMetadataState;
 
     private final StreamsConfig config;
@@ -217,9 +218,13 @@ public class KafkaStreams {
     private synchronized void setState(final State newState) {
         final State oldState = state;
         if (!state.isValidTransition(newState)) {
-            log.warn("Unexpected state transition from {} to {}.", oldState, newState);
+            log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+        } else {
+            log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
         }
+
         state = newState;
+
         if (stateListener != null) {
             stateListener.onChange(state, oldState);
         }
@@ -310,6 +315,8 @@ public class KafkaStreams {
         if (clientId.length() <= 0)
             clientId = applicationId + "-" + processId;
 
+        this.logPrefix = String.format("stream-client [%s]", clientId);
+
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
             MetricsReporter.class);
         reporters.add(new JmxReporter(JMX_PREFIX));
@@ -329,7 +336,7 @@ public class KafkaStreams {
         final ProcessorTopology globalTaskTopology = builder.buildGlobalStateTopology();
 
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
-            log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
+            log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix);
         }
 
         final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
@@ -395,7 +402,7 @@ public class KafkaStreams {
         try {
             client.close();
         } catch (final IOException e) {
-            log.warn("Could not close StreamKafkaClient.", e);
+            log.warn("{} Could not close StreamKafkaClient.", logPrefix, e);
         }
 
     }
@@ -411,7 +418,7 @@ public class KafkaStreams {
      * @throws StreamsException if the Kafka brokers have version 0.10.0.x
      */
     public synchronized void start() throws IllegalStateException, StreamsException {
-        log.debug("Starting Kafka Stream process.");
+        log.debug("{} Starting Kafka Stream process.", logPrefix);
 
         if (state == State.CREATED) {
             checkBrokerVersionCompatibility();
@@ -425,7 +432,7 @@ public class KafkaStreams {
                 thread.start();
             }
 
-            log.info("Started Kafka Stream process");
+            log.info("{} Started Kafka Stream process", logPrefix);
         } else {
             throw new IllegalStateException("Cannot start again.");
         }
@@ -450,7 +457,7 @@ public class KafkaStreams {
      * before all threads stopped
      */
     public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
-        log.debug("Stopping Kafka Stream process.");
+        log.debug("{} Stopping Kafka Stream process.", logPrefix);
         if (state.isCreatedOrRunning()) {
             setState(State.PENDING_SHUTDOWN);
             // save the current thread so that if it is a stream thread
@@ -486,7 +493,7 @@ public class KafkaStreams {
                     }
 
                     metrics.close();
-                    log.info("Stopped Kafka Streams process.");
+                    log.info("{} Stopped Kafka Streams process.", logPrefix);
                 }
             }, "kafka-streams-close-thread");
             shutdown.setDaemon(true);
@@ -556,7 +563,8 @@ public class KafkaStreams {
         final String stateDir = config.getString(StreamsConfig.STATE_DIR_CONFIG);
 
         final String localApplicationDir = stateDir + File.separator + appId;
-        log.debug("Removing local Kafka Streams application data in {} for application {}.",
+        log.debug("{} Removing local Kafka Streams application data in {} for application {}.",
+            logPrefix,
             localApplicationDir,
             appId);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9f81249/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9a2c3fa..6a6b508 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -156,8 +156,11 @@ public class StreamThread extends Thread {
     private synchronized void setState(State newState) {
         State oldState = state;
         if (!state.isValidTransition(newState)) {
-            log.warn("Unexpected state transition from " + state + " to " + newState);
+            log.warn("Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+        } else {
+            log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
         }
+
         state = newState;
         if (stateListener != null) {
             stateListener.onChange(this, state, oldState);
@@ -296,7 +299,7 @@ public class StreamThread extends Thread {
         this.lastCleanMs = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
         this.lastCommitMs = timerStartedMs;
         this.rebalanceListener = new RebalanceListener(time, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
-        setState(state.RUNNING);
+        setState(State.RUNNING);
 
     }
 
@@ -366,7 +369,7 @@ public class StreamThread extends Thread {
         try {
             partitionAssignor.close();
         } catch (Throwable e) {
-            log.error("stream-thread [{}] Failed to close KafkaStreamClient: ", this.getName(), e);
+            log.error("{} Failed to close KafkaStreamClient: ", logPrefix, e);
         }
 
         removeStreamTasks();
@@ -393,7 +396,7 @@ public class StreamThread extends Thread {
 
     @SuppressWarnings("ThrowableNotThrown")
     private void shutdownTasksAndState() {
-        log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix,
+        log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix,
             activeTasks.keySet(), standbyTasks.keySet());
 
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
@@ -418,7 +421,7 @@ public class StreamThread extends Thread {
      * soon the tasks will be assigned again
      */
     private void suspendTasksAndState()  {
-        log.debug("{} suspendTasksAndState: suspending all active tasks [{}] and standby tasks [{}]", logPrefix,
+        log.debug("{} suspendTasksAndState: suspending all active tasks {} and standby tasks {}", logPrefix,
             activeTasks.keySet(), standbyTasks.keySet());
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
         // Close all topology nodes
@@ -709,7 +712,7 @@ public class StreamThread extends Thread {
      * Commit the states of all its tasks
      */
     private void commitAll() {
-        log.trace("stream-thread [{}] Committing all its owned tasks", this.getName());
+        log.trace("{} Committing all its owned tasks", logPrefix);
         for (StreamTask task : activeTasks.values()) {
             commitOne(task);
         }
@@ -775,7 +778,7 @@ public class StreamThread extends Thread {
     }
 
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
-        log.info("{} Creating active task {} with assigned partitions [{}]", logPrefix, id, partitions);
+        log.info("{} Creating active task {} with assigned partitions {}", logPrefix, id, partitions);
 
         streamsMetrics.taskCreatedSensor.record();
 
@@ -894,7 +897,7 @@ public class StreamThread extends Thread {
     }
 
     StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
-        log.info("{} Creating new standby task {} with assigned partitions [{}]", logPrefix, id, partitions);
+        log.info("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions);
 
         streamsMetrics.taskCreatedSensor.record();
 
@@ -965,14 +968,14 @@ public class StreamThread extends Thread {
     }
 
     private void updateSuspendedTasks() {
-        log.info("{} Updating suspended tasks to contain active tasks [{}]", logPrefix, activeTasks.keySet());
+        log.info("{} Updating suspended tasks to contain active tasks {}", logPrefix, activeTasks.keySet());
         suspendedTasks.clear();
         suspendedTasks.putAll(activeTasks);
         suspendedStandbyTasks.putAll(standbyTasks);
     }
 
     private void removeStreamTasks() {
-        log.info("{} Removing all active tasks [{}]", logPrefix, activeTasks.keySet());
+        log.info("{} Removing all active tasks {}", logPrefix, activeTasks.keySet());
 
         try {
             prevActiveTasks.clear();
@@ -987,7 +990,7 @@ public class StreamThread extends Thread {
     }
 
     private void removeStandbyTasks() {
-        log.info("{} Removing all standby tasks [{}]", logPrefix, standbyTasks.keySet());
+        log.info("{} Removing all standby tasks {}", logPrefix, standbyTasks.keySet());
 
         standbyTasks.clear();
         standbyTasksByPartition.clear();
@@ -1197,12 +1200,7 @@ public class StreamThread extends Thread {
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
             final long start = time.milliseconds();
             try {
-                if (state == State.PENDING_SHUTDOWN) {
-                    log.info("stream-thread [{}] New partitions [{}] assigned while shutting down.",
-                        StreamThread.this.getName(), assignment);
-                }
-                log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.",
-                    StreamThread.this.getName(), assignment);
+                log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.", logPrefix, state, assignment);
                 storeChangelogReader = new StoreChangelogReader(restoreConsumer, time, requestTimeOut);
                 setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
                 // do this first as we may have suspended standby tasks that
@@ -1226,12 +1224,7 @@ public class StreamThread extends Thread {
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
             try {
-                if (state == State.PENDING_SHUTDOWN) {
-                    log.info("stream-thread [{}] New partitions [{}] revoked while shutting down.",
-                             StreamThread.this.getName(), assignment);
-                }
-                log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.",
-                         StreamThread.this.getName(), assignment);
+                log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.", logPrefix, state, assignment);
                 setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
                 lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
                 // suspend active tasks