You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/18 08:53:27 UTC

[3/3] kafka git commit: KAFKA-5754; Refactor Streams to use LogContext

KAFKA-5754; Refactor Streams to use LogContext

This PR utilizes `org.apache.kafka.common.utils.LogContext` for logging in `KafkaStreams`. hachikuji, ijuma please review this and let me know your thoughts.

Author: umesh chaudhary <um...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>

Closes #3727 from umesh9794/KAFKA-5754


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f305dd68
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f305dd68
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f305dd68

Branch: refs/heads/trunk
Commit: f305dd68f6524abc25c4ed88983f0e78b4e6c243
Parents: 6055c74
Author: umesh chaudhary <um...@gmail.com>
Authored: Mon Sep 18 09:53:27 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Mon Sep 18 09:53:27 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  43 +++---
 .../processor/internals/AbstractTask.java       |  31 ++--
 .../processor/internals/AssignedTasks.java      |  66 ++++-----
 .../processor/internals/GlobalStreamThread.java |  37 ++---
 .../internals/ProcessorStateManager.java        |  46 +++---
 .../internals/RecordCollectorImpl.java          |  29 ++--
 .../processor/internals/StandbyContextImpl.java |   3 +-
 .../processor/internals/StandbyTask.java        |  13 +-
 .../internals/StoreChangelogReader.java         |  32 ++--
 .../internals/StreamPartitionAssignor.java      |  46 +++---
 .../streams/processor/internals/StreamTask.java |  51 +++----
 .../processor/internals/StreamThread.java       | 145 +++++++++----------
 .../processor/internals/TaskManager.java        |  40 ++---
 .../streams/state/internals/ThreadCache.java    |  15 +-
 .../apache/kafka/streams/KafkaStreamsTest.java  |   1 +
 ...reamSessionWindowAggregateProcessorTest.java |   3 +-
 .../internals/AbstractProcessorContextTest.java |   3 +-
 .../processor/internals/AbstractTaskTest.java   |   3 +-
 .../processor/internals/AssignedTasksTest.java  |   3 +-
 .../processor/internals/ProcessorNodeTest.java  |   3 +-
 .../internals/ProcessorStateManagerTest.java    |  51 +++++--
 .../internals/RecordCollectorTest.java          |  25 +++-
 .../processor/internals/RecordQueueTest.java    |   3 +-
 .../processor/internals/SinkNodeTest.java       |   3 +-
 .../processor/internals/StandbyTaskTest.java    |   3 +-
 .../processor/internals/StateConsumerTest.java  |   6 +-
 .../internals/StoreChangelogReaderTest.java     |   6 +-
 .../processor/internals/StreamTaskTest.java     |  11 +-
 .../streams/state/KeyValueStoreTestDriver.java  |   5 +-
 .../internals/CachingKeyValueStoreTest.java     |   3 +-
 .../internals/CachingSessionStoreTest.java      |   3 +-
 .../state/internals/CachingWindowStoreTest.java |   3 +-
 .../ChangeLoggingKeyValueBytesStoreTest.java    |   3 +-
 .../ChangeLoggingKeyValueStoreTest.java         |   3 +-
 ...rtedCacheKeyValueBytesStoreIteratorTest.java |   5 +-
 ...rtedCacheWrappedWindowStoreIteratorTest.java |   3 +-
 .../state/internals/MeteredWindowStoreTest.java |   3 +-
 .../RocksDBKeyValueStoreSupplierTest.java       |   3 +-
 .../RocksDBSegmentedBytesStoreTest.java         |   3 +-
 .../RocksDBSessionStoreSupplierTest.java        |   3 +-
 .../internals/RocksDBSessionStoreTest.java      |   3 +-
 .../state/internals/RocksDBStoreTest.java       |   5 +-
 .../RocksDBWindowStoreSupplierTest.java         |   3 +-
 .../state/internals/RocksDBWindowStoreTest.java |   5 +-
 .../state/internals/SegmentIteratorTest.java    |   3 +-
 .../streams/state/internals/SegmentsTest.java   |   3 +-
 .../state/internals/StoreChangeLoggerTest.java  |   3 +-
 .../StreamThreadStateStoreProviderTest.java     |   3 +-
 .../state/internals/ThreadCacheTest.java        |  54 +++----
 .../apache/kafka/test/KStreamTestDriver.java    |   8 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   6 +-
 51 files changed, 466 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 7698f39..b31a3e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -57,7 +58,6 @@ import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
 import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -124,7 +124,6 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG
 @InterfaceStability.Evolving
 public class KafkaStreams {
 
-    private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
     private static final String JMX_PREFIX = "kafka.streams";
     private static final int DEFAULT_CLOSE_TIMEOUT = 0;
 
@@ -132,10 +131,10 @@ public class KafkaStreams {
     // in userData of the subscription request to allow assignor be aware
     // of the co-location of stream thread's consumers. It is for internal
     // usage only and should not be exposed to users at all.
+    private final Logger log;
+    private final String logPrefix;
     private final UUID processId;
-
     private final Metrics metrics;
-    private final String logPrefix;
     private final StreamsConfig config;
     private final StreamThread[] threads;
     private final StateDirectory stateDirectory;
@@ -230,7 +229,7 @@ public class KafkaStreams {
                         // it is ok: just move on to the next iteration
                     }
                 } else {
-                    log.debug("{} Cannot transit to {} within {}ms", logPrefix, targetState, waitMs);
+                    log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
                     return false;
                 }
                 elapsedMs = System.currentTimeMillis() - begin;
@@ -256,10 +255,10 @@ public class KafkaStreams {
                 // will be refused but we do not throw exception here, to allow idempotent close calls
                 return false;
             } else if (!state.isValidTransition(newState)) {
-                log.error("{} Unexpected state transition from {} to {}", logPrefix, oldState, newState);
-                throw new IllegalStateException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState);
+                log.error("Unexpected state transition from {} to {}", oldState, newState);
+                throw new IllegalStateException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
             } else {
-                log.info("{} State transition from {} to {}", logPrefix, oldState, newState);
+                log.info("State transition from {} to {}", oldState, newState);
             }
             state = newState;
             stateLock.notifyAll();
@@ -406,7 +405,7 @@ public class KafkaStreams {
             }
 
             if (setState(State.ERROR)) {
-                log.warn("{} All stream threads have died. The instance will be in error state and should be closed.", logPrefix);
+                log.warn("All stream threads have died. The instance will be in error state and should be closed.");
             }
         }
 
@@ -453,7 +452,7 @@ public class KafkaStreams {
 
                 // special case when global thread is dead
                 if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR && setState(State.ERROR)) {
-                    log.warn("{} Global thread has died. The instance will be in error state and should be closed.", logPrefix);
+                    log.warn("Global thread has died. The instance will be in error state and should be closed.");
                 }
             }
         }
@@ -542,7 +541,11 @@ public class KafkaStreams {
         if (clientId.length() <= 0)
             clientId = applicationId + "-" + processId;
 
-        this.logPrefix = String.format("stream-client [%s]", clientId);
+        this.logPrefix = String.format("stream-client [%s] ", clientId);
+
+        final LogContext logContext = new LogContext(logPrefix);
+
+        this.log = logContext.logger(getClass());
 
         final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
             MetricsReporter.class);
@@ -565,7 +568,7 @@ public class KafkaStreams {
         final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
 
         if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
-            log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix);
+            log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
         }
 
         final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
@@ -673,7 +676,7 @@ public class KafkaStreams {
         try {
             client.close();
         } catch (final IOException e) {
-            log.warn("{} Could not close StreamKafkaClient.", logPrefix, e);
+            log.warn("Could not close StreamKafkaClient.", e);
         }
 
     }
@@ -690,7 +693,7 @@ public class KafkaStreams {
      * @throws StreamsException if the Kafka brokers have version 0.10.0.x
      */
     public synchronized void start() throws IllegalStateException, StreamsException {
-        log.debug("{} Starting Streams client", logPrefix);
+        log.debug("Starting Streams client");
 
         // first set state to RUNNING before kicking off the threads,
         // making sure the state will always transit to RUNNING before REBALANCING
@@ -715,12 +718,12 @@ public class KafkaStreams {
                 }
             }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
 
-            log.info("{} Started Streams client", logPrefix);
+            log.info("Started Streams client");
         } else {
             // if transition failed but no exception is thrown; currently it is not possible
             // since we do not allow calling start multiple times whether or not it is already shutdown.
             // TODO: In the future if we lift this restriction this code path could then be triggered and be updated
-            log.error("{} Already stopped, cannot re-start", logPrefix);
+            log.error("Already stopped, cannot re-start");
         }
     }
 
@@ -744,12 +747,12 @@ public class KafkaStreams {
      * Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
      */
     public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
-        log.debug("{} Stopping Streams client with timeoutMillis = {} ms.", logPrefix, timeUnit.toMillis(timeout));
+        log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
 
         if (!setState(State.PENDING_SHUTDOWN)) {
             // if transition failed, it means it was either in PENDING_SHUTDOWN
             // or NOT_RUNNING already; just check that all threads have been stopped
-            log.info("{} Already in the pending shutdown state, wait to complete shutdown", logPrefix);
+            log.info("Already in the pending shutdown state, wait to complete shutdown");
         } else {
             stateDirCleaner.shutdownNow();
 
@@ -798,10 +801,10 @@ public class KafkaStreams {
         }
 
         if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
-            log.info("{} Streams client stopped completely", logPrefix);
+            log.info("Streams client stopped completely");
             return true;
         } else {
-            log.info("{} Streams client cannot stop completely within the timeout", logPrefix);
+            log.info("Streams client cannot stop completely within the timeout");
             return false;
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index b4c8c16..5ed9aae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -30,7 +31,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -40,7 +40,6 @@ import java.util.Map;
 import java.util.Set;
 
 public abstract class AbstractTask implements Task {
-    private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);
 
     final TaskId id;
     final String applicationId;
@@ -50,6 +49,8 @@ public abstract class AbstractTask implements Task {
     final Consumer consumer;
     final String logPrefix;
     final boolean eosEnabled;
+    final Logger log;
+    final LogContext logContext;
     boolean taskInitialized;
     private final StateDirectory stateDirectory;
 
@@ -75,7 +76,9 @@ public abstract class AbstractTask implements Task {
         this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
         this.stateDirectory = stateDirectory;
 
-        logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", id());
+        this.logPrefix = String.format("%s [%s] ", isStandby ? "standby-task" : "task", id());
+        this.logContext = new LogContext(logPrefix);
+        this.log = logContext.logger(getClass());
 
         // create the processor state manager
         try {
@@ -86,9 +89,10 @@ public abstract class AbstractTask implements Task {
                 stateDirectory,
                 topology.storeToChangelogTopic(),
                 changelogReader,
-                eosEnabled);
+                eosEnabled,
+                    logContext);
         } catch (final IOException e) {
-            throw new ProcessorStateException(String.format("%s Error while creating the state manager", logPrefix), e);
+            throw new ProcessorStateException(String.format("%sError while creating the state manager", logPrefix), e);
         }
     }
 
@@ -173,7 +177,7 @@ public abstract class AbstractTask implements Task {
                 stateMgr.putOffsetLimit(partition, offset);
 
                 if (log.isTraceEnabled()) {
-                    log.trace("{} Updating store offset limits {} for changelog {}", logPrefix, offset, partition);
+                    log.trace("Updating store offset limits {} for changelog {}", offset, partition);
                 }
             } catch (final AuthorizationException e) {
                 throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e);
@@ -199,21 +203,20 @@ public abstract class AbstractTask implements Task {
 
         try {
             if (!stateDirectory.lock(id, 5)) {
-                throw new LockException(String.format("%s Failed to lock the state directory for task %s",
+                throw new LockException(String.format("%sFailed to lock the state directory for task %s",
                                                       logPrefix, id));
             }
         } catch (IOException e) {
-            throw new StreamsException(String.format("%s fatal error while trying to lock the state directory for task %s",
-                                                     logPrefix,
-                                                     id));
+            throw new StreamsException(String.format("%sFatal error while trying to lock the state directory for task %s",
+                                                     logPrefix, id));
         }
-        log.trace("{} Initializing state stores", logPrefix);
+        log.trace("Initializing state stores");
 
         // set initial offset limits
         updateOffsetLimits();
 
         for (final StateStore store : topology.stateStores()) {
-            log.trace("{} Initializing store {}", logPrefix, store.name());
+            log.trace("Initializing store {}", store.name());
             store.init(processorContext, store);
         }
     }
@@ -225,7 +228,7 @@ public abstract class AbstractTask implements Task {
      */
     void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
         ProcessorStateException exception = null;
-        log.trace("{} Closing state manager", logPrefix);
+        log.trace("Closing state manager");
         try {
             stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
         } catch (final ProcessorStateException e) {
@@ -235,7 +238,7 @@ public abstract class AbstractTask implements Task {
                 stateDirectory.unlock(id);
             } catch (IOException e) {
                 if (exception == null) {
-                    exception = new ProcessorStateException(String.format("%s Failed to release state dir lock", logPrefix), e);
+                    exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", logPrefix), e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a1966b1..2d886b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -20,10 +20,10 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -38,8 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 class AssignedTasks {
-    private static final Logger log = LoggerFactory.getLogger(AssignedTasks.class);
-    private final String logPrefix;
+    private final Logger log;
     private final String taskTypeName;
     private final TaskAction maybeCommitAction;
     private final TaskAction commitAction;
@@ -54,11 +53,12 @@ class AssignedTasks {
     private int committed = 0;
 
 
-    AssignedTasks(final String logPrefix,
+    AssignedTasks(final LogContext logContext,
                   final String taskTypeName) {
-        this.logPrefix = logPrefix;
         this.taskTypeName = taskTypeName;
 
+        this.log = logContext.logger(getClass());
+
         maybeCommitAction = new TaskAction() {
             @Override
             public String name() {
@@ -71,8 +71,7 @@ class AssignedTasks {
                     committed++;
                     task.commit();
                     if (log.isDebugEnabled()) {
-                        log.debug("{} Committed active task {} per user request in",
-                                  logPrefix, task.id());
+                        log.debug("Committed active task {} per user request in", task.id());
                     }
                 }
             }
@@ -110,13 +109,13 @@ class AssignedTasks {
 
     void initializeNewTasks() {
         if (!created.isEmpty()) {
-            log.trace("{} Initializing {}s {}", logPrefix, taskTypeName, created.keySet());
+            log.trace("Initializing {}s {}", taskTypeName, created.keySet());
         }
         for (final Iterator<Map.Entry<TaskId, Task>> it = created.entrySet().iterator(); it.hasNext(); ) {
             final Map.Entry<TaskId, Task> entry = it.next();
             try {
                 if (!entry.getValue().initialize()) {
-                    log.debug("{} transitioning {} {} to restoring", logPrefix, taskTypeName, entry.getKey());
+                    log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     restoring.put(entry.getKey(), entry.getValue());
                 } else {
                     transitionToRunning(entry.getValue());
@@ -124,7 +123,7 @@ class AssignedTasks {
                 it.remove();
             } catch (final LockException e) {
                 // made this trace as it will spam the logs in the poll loop.
-                log.trace("{} Could not create {} {} due to {}; will retry in the next run loop", logPrefix, taskTypeName, entry.getKey(), e.getMessage());
+                log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
             }
         }
     }
@@ -133,7 +132,7 @@ class AssignedTasks {
         if (restored.isEmpty()) {
             return Collections.emptySet();
         }
-        log.trace("{} {} partitions restored for {}", logPrefix, taskTypeName, restored);
+        log.trace("{} partitions restored for {}", taskTypeName, restored);
         final Set<TopicPartition> resume = new HashSet<>();
         restoredPartitions.addAll(restored);
         for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
@@ -147,8 +146,7 @@ class AssignedTasks {
                 if (log.isTraceEnabled()) {
                     final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
                     outstandingPartitions.removeAll(restoredPartitions);
-                    log.trace("{} partition restoration not complete for {} {} partitions: {}",
-                              logPrefix,
+                    log.trace("partition restoration not complete for {} {} partitions: {}",
                               taskTypeName,
                               task.id(),
                               task.changelogPartitions());
@@ -173,11 +171,11 @@ class AssignedTasks {
 
     RuntimeException suspend() {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
-        log.trace("{} Suspending running {} {}", logPrefix, taskTypeName, runningTaskIds());
+        log.trace("Suspending running {} {}", taskTypeName, runningTaskIds());
         firstException.compareAndSet(null, suspendTasks(running.values()));
-        log.trace("{} Close restoring {} {}", logPrefix, taskTypeName, restoring.keySet());
+        log.trace("Close restoring {} {}", taskTypeName, restoring.keySet());
         firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
-        log.trace("{} Close created {} {}", logPrefix, taskTypeName, created.keySet());
+        log.trace("Close created {} {}", taskTypeName, created.keySet());
         firstException.compareAndSet(null, closeNonRunningTasks(created.values()));
         previousActiveTasks.clear();
         previousActiveTasks.addAll(running.keySet());
@@ -194,7 +192,7 @@ class AssignedTasks {
             try {
                 task.close(false, false);
             } catch (final RuntimeException e) {
-                log.error("{} Failed to close {}, {}", logPrefix, taskTypeName, task.id(), e);
+                log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
                 if (exception == null) {
                     exception = e;
                 }
@@ -213,16 +211,16 @@ class AssignedTasks {
             } catch (final CommitFailedException e) {
                 suspended.put(task.id(), task);
                 // commit failed during suspension. Just log it.
-                log.warn("{} Failed to commit {} {} state when suspending due to CommitFailedException", logPrefix, taskTypeName, task.id());
+                log.warn("Failed to commit {} {} state when suspending due to CommitFailedException", taskTypeName, task.id());
             } catch (final ProducerFencedException e) {
                 closeZombieTask(task);
                 it.remove();
             } catch (final RuntimeException e) {
-                log.error("{} Suspending {} {} failed due to the following error:", logPrefix, taskTypeName, task.id(), e);
+                log.error("Suspending {} {} failed due to the following error:", taskTypeName, task.id(), e);
                 try {
                     task.close(false, false);
                 } catch (final Exception f) {
-                    log.error("{} After suspending failed, closing the same {} {} failed again due to the following error:", logPrefix, taskTypeName, task.id(), f);
+                    log.error("After suspending failed, closing the same {} {} failed again due to the following error:", taskTypeName, task.id(), f);
                 }
                 if (exception == null) {
                     exception = e;
@@ -233,11 +231,11 @@ class AssignedTasks {
     }
 
     private void closeZombieTask(final Task task) {
-        log.warn("{} Producer of task {} fenced; closing zombie task", logPrefix, task.id());
+        log.warn("Producer of task {} fenced; closing zombie task", task.id());
         try {
             task.close(false, true);
         } catch (final Exception e) {
-            log.warn("{} Failed to close zombie {} due to {}, ignore and proceed", taskTypeName, logPrefix, e);
+            log.warn("{} Failed to close zombie due to {}, ignore and proceed", taskTypeName, e);
         }
     }
 
@@ -248,22 +246,22 @@ class AssignedTasks {
     boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
         if (suspended.containsKey(taskId)) {
             final Task task = suspended.get(taskId);
-            log.trace("{} found suspended {} {}", logPrefix, taskTypeName, taskId);
+            log.trace("found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
                 task.resume();
                 transitionToRunning(task);
-                log.trace("{} resuming suspended {} {}", logPrefix, taskTypeName, task.id());
+                log.trace("resuming suspended {} {}", taskTypeName, task.id());
                 return true;
             } else {
-                log.trace("{} couldn't resume task {} assigned partitions {}, task partitions", logPrefix, taskId, partitions, task.partitions());
+                log.trace("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
             }
         }
         return false;
     }
 
     private void transitionToRunning(final Task task) {
-        log.debug("{} transitioning {} {} to running", logPrefix, taskTypeName, task.id());
+        log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
@@ -357,7 +355,7 @@ class AssignedTasks {
                     processed++;
                 }
             } catch (RuntimeException e) {
-                log.error("{} Failed to process {} {} due to the following error:", logPrefix, taskTypeName, task.id(), e);
+                log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e);
                 throw e;
             }
         }
@@ -375,7 +373,7 @@ class AssignedTasks {
                     punctuated++;
                 }
             } catch (KafkaException e) {
-                log.error("{} Failed to punctuate {} {} due to the following error:", logPrefix, taskTypeName, task.id(), e);
+                log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e);
                 throw e;
             }
         }
@@ -391,13 +389,12 @@ class AssignedTasks {
                 action.apply(task);
             } catch (final CommitFailedException e) {
                 // commit failed. This is already logged inside the task as WARN and we can just log it again here.
-                log.warn("{} Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", logPrefix, taskTypeName, task.id(), action.name());
+                log.warn("Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", taskTypeName, task.id(), action.name());
             } catch (final ProducerFencedException e) {
                 closeZombieTask(task);
                 it.remove();
             } catch (final RuntimeException t) {
-                log.error("{} Failed to {} {} {} due to the following error:",
-                          logPrefix,
+                log.error("Failed to {} {} {} due to the following error:",
                           action.name(),
                           taskTypeName,
                           task.id(),
@@ -418,11 +415,11 @@ class AssignedTasks {
         while (standByTaskIterator.hasNext()) {
             final Task suspendedTask = standByTaskIterator.next();
             if (!newAssignment.containsKey(suspendedTask.id()) || !suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) {
-                log.debug("{} Closing suspended and not re-assigned {} {}", logPrefix, taskTypeName, suspendedTask.id());
+                log.debug("Closing suspended and not re-assigned {} {}", taskTypeName, suspendedTask.id());
                 try {
                     suspendedTask.closeSuspended(true, false, null);
                 } catch (final Exception e) {
-                    log.error("{} Failed to remove suspended {} {} due to the following error:", logPrefix, taskTypeName, suspendedTask.id(), e);
+                    log.error("Failed to remove suspended {} {} due to the following error:", taskTypeName, suspendedTask.id(), e);
                 } finally {
                     standByTaskIterator.remove();
                 }
@@ -441,8 +438,7 @@ class AssignedTasks {
             try {
                 task.close(clean, false);
             } catch (final Throwable t) {
-                log.error("{} Failed while closing {} {} due to the following error:",
-                          logPrefix,
+                log.error("Failed while closing {} {} due to the following error:",
                           task.getClass().getSimpleName(),
                           task.id(),
                           t);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 32e8330..41ebcca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -28,7 +29,6 @@ import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
@@ -46,8 +46,8 @@ import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.St
  */
 public class GlobalStreamThread extends Thread {
 
-    private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class);
-
+    private final Logger log;
+    private final LogContext logContext;
     private final StreamsConfig config;
     private final Consumer<byte[], byte[]> consumer;
     private final StateDirectory stateDirectory;
@@ -147,10 +147,10 @@ public class GlobalStreamThread extends Thread {
                 // will be refused but we do not throw exception here
                 return false;
             } else if (!state.isValidTransition(newState)) {
-                log.error("{} Unexpected state transition from {} to {}", logPrefix, oldState, newState);
-                throw new StreamsException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState);
+                log.error("Unexpected state transition from {} to {}", oldState, newState);
+                throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
             } else {
-                log.info("{} State transition from {} to {}", logPrefix, oldState, newState);
+                log.info("State transition from {} to {}", oldState, newState);
             }
 
             state = newState;
@@ -185,8 +185,11 @@ public class GlobalStreamThread extends Thread {
         long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
                 (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
         this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
-        this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
-        this.logPrefix = String.format("global-stream-thread [%s]", threadClientId);
+        this.logPrefix = String.format("global-stream-thread [%s] ", threadClientId);
+        this.logContext = new LogContext(logPrefix);
+        this.log = logContext.logger(getClass());
+        this.cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
+
     }
 
     static class StateConsumer {
@@ -195,17 +198,17 @@ public class GlobalStreamThread extends Thread {
         private final Time time;
         private final long pollMs;
         private final long flushInterval;
-        private final String logPrefix;
+        private final Logger log;
 
         private long lastFlush;
 
-        StateConsumer(final String logPrefix,
+        StateConsumer(final LogContext logContext,
                       final Consumer<byte[], byte[]> consumer,
                       final GlobalStateMaintainer stateMaintainer,
                       final Time time,
                       final long pollMs,
                       final long flushInterval) {
-            this.logPrefix = logPrefix;
+            this.log = logContext.logger(getClass());
             this.consumer = consumer;
             this.stateMaintainer = stateMaintainer;
             this.time = time;
@@ -240,7 +243,7 @@ public class GlobalStreamThread extends Thread {
             } catch (Exception e) {
                 // just log an error if the consumer throws an exception during close
                 // so we can always attempt to close the state stores.
-                log.error("{} Failed to close consumer due to the following error:", logPrefix, e);
+                log.error("Failed to close consumer due to the following error:", e);
             }
 
             stateMaintainer.close();
@@ -260,7 +263,7 @@ public class GlobalStreamThread extends Thread {
             setState(State.PENDING_SHUTDOWN);
             setState(State.DEAD);
 
-            log.warn("{} Error happened during initialization of the global state store; this thread has shutdown", logPrefix);
+            log.warn("Error happened during initialization of the global state store; this thread has shutdown");
 
             return;
         }
@@ -276,16 +279,16 @@ public class GlobalStreamThread extends Thread {
             // intentionally do not check the returned flag
             setState(State.PENDING_SHUTDOWN);
 
-            log.info("{} Shutting down", logPrefix);
+            log.info("Shutting down");
 
             try {
                 stateConsumer.close();
             } catch (IOException e) {
-                log.error("{} Failed to close state maintainer due to the following error:", logPrefix, e);
+                log.error("Failed to close state maintainer due to the following error:", e);
             }
             setState(DEAD);
 
-            log.info("{} Shutdown complete", logPrefix);
+            log.info("Shutdown complete");
         }
     }
 
@@ -293,7 +296,7 @@ public class GlobalStreamThread extends Thread {
         try {
             final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
             final StateConsumer stateConsumer
-                    = new StateConsumer(logPrefix,
+                    = new StateConsumer(this.logContext,
                                         consumer,
                                         new GlobalStateUpdateTask(topology,
                                                                   new GlobalProcessorContextImpl(

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index acd7674..942e41a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -28,7 +29,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -42,11 +42,11 @@ import java.util.Map;
 
 public class ProcessorStateManager implements StateManager {
 
-    private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
 
     public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
 
+    private final Logger log;
     private final File baseDir;
     private final TaskId taskId;
     private final String logPrefix;
@@ -77,10 +77,12 @@ public class ProcessorStateManager implements StateManager {
                                  final StateDirectory stateDirectory,
                                  final Map<String, String> storeToChangelogTopic,
                                  final ChangelogReader changelogReader,
-                                 final boolean eosEnabled) throws IOException {
+                                 final boolean eosEnabled,
+                                 final LogContext logContext) throws IOException {
         this.taskId = taskId;
         this.changelogReader = changelogReader;
-        logPrefix = String.format("task [%s]", taskId);
+        logPrefix = String.format("task [%s] ", taskId);
+        this.log = logContext.logger(getClass());
 
         partitionForTopic = new HashMap<>();
         for (final TopicPartition source : sources) {
@@ -100,7 +102,7 @@ public class ProcessorStateManager implements StateManager {
         try {
             baseDir = stateDirectory.directoryForTask(taskId);
         } catch (final ProcessorStateException e) {
-            throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s",
+            throw new LockException(String.format("%sFailed to get the directory for task %s. Exception %s",
                 logPrefix, taskId, e));
         }
 
@@ -114,7 +116,7 @@ public class ProcessorStateManager implements StateManager {
             checkpoint = null;
         }
 
-        log.debug("{} Created state store manager for task {} with the acquired state dir lock", logPrefix, taskId);
+        log.debug("Created state store manager for task {} with the acquired state dir lock", taskId);
     }
 
 
@@ -139,14 +141,14 @@ public class ProcessorStateManager implements StateManager {
     public void register(final StateStore store,
                          final boolean loggingEnabled,
                          final StateRestoreCallback stateRestoreCallback) {
-        log.debug("{} Registering state store {} to its state manager", logPrefix, store.name());
+        log.debug("Registering state store {} to its state manager", store.name());
 
         if (store.name().equals(CHECKPOINT_FILE_NAME)) {
-            throw new IllegalArgumentException(String.format("%s Illegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME));
+            throw new IllegalArgumentException(String.format("%sIllegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME));
         }
 
         if (stores.containsKey(store.name())) {
-            throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", logPrefix, store.name()));
+            throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", logPrefix, store.name()));
         }
 
         // check that the underlying change log topic exist or not
@@ -160,12 +162,12 @@ public class ProcessorStateManager implements StateManager {
 
         if (isStandby) {
             if (store.persistent()) {
-                log.trace("{} Preparing standby replica of persistent state store {} with changelog topic {}", logPrefix, store.name(), topic);
+                log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", store.name(), topic);
 
                 restoreCallbacks.put(topic, stateRestoreCallback);
             }
         } else {
-            log.trace("{} Restoring state store {} from changelog topic {}", logPrefix, store.name(), topic);
+            log.trace("Restoring state store {} from changelog topic {}", store.name(), topic);
             final StateRestorer restorer = new StateRestorer(storePartition,
                                                              new CompositeRestoreListener(stateRestoreCallback),
                                                              checkpointedOffsets.get(storePartition),
@@ -227,7 +229,7 @@ public class ProcessorStateManager implements StateManager {
             try {
                 restoreCallback.restoreAll(restoreRecords);
             } catch (final Exception e) {
-                throw new ProcessorStateException(String.format("%s exception caught while trying to restore state from %s", logPrefix, storePartition), e);
+                throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", logPrefix, storePartition), e);
             }
         }
 
@@ -238,7 +240,7 @@ public class ProcessorStateManager implements StateManager {
     }
 
     void putOffsetLimit(final TopicPartition partition, final long limit) {
-        log.trace("{} Updating store offset limit for partition {} to {}", logPrefix, partition, limit);
+        log.trace("Updating store offset limit for partition {} to {}", partition, limit);
         offsetLimits.put(partition, limit);
     }
 
@@ -255,13 +257,13 @@ public class ProcessorStateManager implements StateManager {
     @Override
     public void flush() {
         if (!stores.isEmpty()) {
-            log.debug("{} Flushing all stores registered in the state manager", logPrefix);
+            log.debug("Flushing all stores registered in the state manager");
             for (final StateStore store : stores.values()) {
                 try {
-                    log.trace("{} Flushing store={}", logPrefix, store.name());
+                    log.trace("Flushing store={}", store.name());
                     store.flush();
                 } catch (final Exception e) {
-                    throw new ProcessorStateException(String.format("%s Failed to flush state store %s", logPrefix, store.name()), e);
+                    throw new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e);
                 }
             }
         }
@@ -278,16 +280,16 @@ public class ProcessorStateManager implements StateManager {
         // attempting to close the stores, just in case they
         // are not closed by a ProcessorNode yet
         if (!stores.isEmpty()) {
-            log.debug("{} Closing its state manager and all the registered state stores", logPrefix);
+            log.debug("Closing its state manager and all the registered state stores");
             for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
-                log.debug("{} Closing storage engine {}", logPrefix, entry.getKey());
+                log.debug("Closing storage engine {}", entry.getKey());
                 try {
                     entry.getValue().close();
                 } catch (final Exception e) {
                     if (firstException == null) {
-                        firstException = new ProcessorStateException(String.format("%s Failed to close state store %s", logPrefix, entry.getKey()), e);
+                        firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, entry.getKey()), e);
                     }
-                    log.error("{} Failed to close state store {}: ", logPrefix, entry.getKey(), e);
+                    log.error("Failed to close state store {}: ", entry.getKey(), e);
                 }
             }
 
@@ -305,7 +307,7 @@ public class ProcessorStateManager implements StateManager {
     // write the checkpoint
     @Override
     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
-        log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets);
+        log.trace("Writing checkpoint: {}", ackedOffsets);
         checkpointedOffsets.putAll(changelogReader.restoredOffsets());
         for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
             final String storeName = entry.getKey();
@@ -339,7 +341,7 @@ public class ProcessorStateManager implements StateManager {
     }
 
     void registerGlobalStateStores(final List<StateStore> stateStores) {
-        log.debug("{} Register global stores {}", logPrefix, stateStores);
+        log.debug("Register global stores {}", stateStores);
         for (final StateStore stateStore : stateStores) {
             globalStores.put(stateStore.name(), stateStore);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 79e3350..4eec2d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -26,11 +26,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
@@ -40,18 +40,19 @@ public class RecordCollectorImpl implements RecordCollector {
     private static final int MAX_SEND_ATTEMPTS = 3;
     private static final long SEND_RETRY_BACKOFF = 100L;
 
-    private static final Logger log = LoggerFactory.getLogger(RecordCollectorImpl.class);
-    
+
+    private final Logger log;
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
     private final String logPrefix;
 
     private volatile KafkaException sendException;
 
-    public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId) {
+    public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId, final LogContext logContext) {
         this.producer = producer;
-        offsets = new HashMap<>();
-        logPrefix = String.format("task [%s]", streamTaskId);
+        this.offsets = new HashMap<>();
+        this.logPrefix = String.format("task [%s] ", streamTaskId);
+        this.log = logContext.logger(getClass());
     }
 
     @Override
@@ -107,14 +108,14 @@ public class RecordCollectorImpl implements RecordCollector {
                             offsets.put(tp, metadata.offset());
                         } else {
                             if (sendException == null) {
-                                log.error("{} Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
+                                log.error("Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
                                                 "No more records will be sent and no more offsets will be recorded for this task.",
-                                        logPrefix, key, value, timestamp, topic, exception);
+                                        key, value, timestamp, topic, exception);
                                 if (exception instanceof ProducerFencedException) {
-                                    sendException = new ProducerFencedException(String.format("%s Abort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s",
+                                    sendException = new ProducerFencedException(String.format("%sAbort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s",
                                             logPrefix, key, value, timestamp, topic, exception.getMessage()));
                                 } else {
-                                    sendException = new StreamsException(String.format("%s Abort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.",
+                                    sendException = new StreamsException(String.format("%sAbort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.",
                                             logPrefix, key, value, timestamp, topic, exception), exception);
                                 }
                             }
@@ -124,9 +125,9 @@ public class RecordCollectorImpl implements RecordCollector {
                 return;
             } catch (final TimeoutException e) {
                 if (attempt == MAX_SEND_ATTEMPTS) {
-                    throw new StreamsException(String.format("%s Failed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt));
+                    throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt));
                 }
-                log.warn("{} Timeout exception caught when sending record to topic {}; retrying with {} attempt", logPrefix, topic, attempt);
+                log.warn("Timeout exception caught when sending record to topic {}; retrying with {} attempt", topic, attempt);
                 Utils.sleep(SEND_RETRY_BACKOFF);
             }
         }
@@ -140,14 +141,14 @@ public class RecordCollectorImpl implements RecordCollector {
 
     @Override
     public void flush() {
-        log.debug("{} Flushing producer", logPrefix);
+        log.debug("Flushing producer");
         producer.flush();
         checkForException();
     }
 
     @Override
     public void close() {
-        log.debug("{} Closing producer", logPrefix);
+        log.debug("Closing producer");
         producer.close();
         checkForException();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 812a4ab..a9a03ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
@@ -70,7 +71,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
                        final StreamsConfig config,
                        final ProcessorStateManager stateMgr,
                        final StreamsMetrics metrics) {
-        super(id, applicationId, config, metrics, stateMgr, new ThreadCache("zeroCache", 0, metrics));
+        super(id, applicationId, config, metrics, stateMgr, new ThreadCache(new LogContext("zeroCache "), 0, metrics));
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 75151a8..033af24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -22,8 +22,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -36,7 +34,6 @@ import java.util.Map;
  */
 public class StandbyTask extends AbstractTask {
 
-    private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
     private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
     /**
@@ -73,7 +70,7 @@ public class StandbyTask extends AbstractTask {
      */
     @Override
     public void resume() {
-        log.debug("{} Resuming", logPrefix);
+        log.debug("Resuming");
         updateOffsetLimits();
     }
 
@@ -86,7 +83,7 @@ public class StandbyTask extends AbstractTask {
      */
     @Override
     public void commit() {
-        log.trace("{} Committing", logPrefix);
+        log.trace("Committing");
         flushAndCheckpointState();
         // reinitialize offset limits
         updateOffsetLimits();
@@ -100,7 +97,7 @@ public class StandbyTask extends AbstractTask {
      */
     @Override
     public void suspend() {
-        log.debug("{} Suspending", logPrefix);
+        log.debug("Suspending");
         flushAndCheckpointState();
     }
 
@@ -124,7 +121,7 @@ public class StandbyTask extends AbstractTask {
         if (!taskInitialized) {
             return;
         }
-        log.debug("{} Closing", logPrefix);
+        log.debug("Closing");
         boolean committedSuccessfully = false;
         try {
             commit();
@@ -163,7 +160,7 @@ public class StandbyTask extends AbstractTask {
      */
     public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition,
                                                        final List<ConsumerRecord<byte[], byte[]>> records) {
-        log.trace("{} Updating standby replicas of its state store for partition [{}]", logPrefix, partition);
+        log.trace("Updating standby replicas of its state store for partition [{}]", partition);
         return stateMgr.updateStandbyStates(partition, records);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 57dff64..8ecc7e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -22,10 +22,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -38,9 +38,8 @@ import java.util.Map;
 import java.util.Set;
 
 public class StoreChangelogReader implements ChangelogReader {
-    private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class);
 
-    private final String logPrefix;
+    private final Logger log;
     private final Consumer<byte[], byte[]> consumer;
     private final StateRestoreListener stateRestoreListener;
     private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
@@ -51,16 +50,17 @@ public class StoreChangelogReader implements ChangelogReader {
 
     public StoreChangelogReader(final String threadId,
                                 final Consumer<byte[], byte[]> consumer,
-                                final StateRestoreListener stateRestoreListener) {
+                                final StateRestoreListener stateRestoreListener,
+                                final LogContext logContext) {
         this.consumer = consumer;
-
-        this.logPrefix = String.format("stream-thread [%s]", threadId);
+        this.log = logContext.logger(getClass());
         this.stateRestoreListener = stateRestoreListener;
     }
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
-                                final StateRestoreListener stateRestoreListener) {
-        this("", consumer, stateRestoreListener);
+                                final StateRestoreListener stateRestoreListener,
+                                final LogContext logContext) {
+        this("", consumer, stateRestoreListener, logContext);
     }
 
     @Override
@@ -116,7 +116,7 @@ public class StoreChangelogReader implements ChangelogReader {
             endOffsets.putAll(consumer.endOffsets(initializable.keySet()));
         } catch (final TimeoutException e) {
             // if timeout exception gets thrown we just give up this time and retry in the next run loop
-            log.debug("{} Could not fetch end offset for {}; will fall back to partition by partition fetching", logPrefix, initializable);
+            log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
             return;
         }
 
@@ -140,7 +140,7 @@ public class StoreChangelogReader implements ChangelogReader {
                 }
                 needsInitializing.remove(topicPartition);
             } else {
-                log.info("{} End offset cannot be found form the returned metadata; removing this partition from the current run loop", logPrefix);
+                log.info("End offset cannot be found form the returned metadata; removing this partition from the current run loop");
                 iter.remove();
             }
         }
@@ -152,7 +152,7 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
-        log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet());
+        log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
         assignment.addAll(initialized.keySet());
@@ -186,8 +186,7 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) {
-        log.debug("{} Restoring partition {} from offset {} to endOffset {}",
-                  logPrefix,
+        log.debug("Restoring partition {} from offset {} to endOffset {}",
                   partition,
                   startingOffset,
                   endOffset);
@@ -196,7 +195,7 @@ public class StoreChangelogReader implements ChangelogReader {
     private Collection<TopicPartition> completed() {
         final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
         completed.removeAll(needsRestoring.keySet());
-        log.debug("{} completed partitions {}", logPrefix, completed);
+        log.debug("completed partitions {}", completed);
         return completed;
     }
 
@@ -204,7 +203,7 @@ public class StoreChangelogReader implements ChangelogReader {
         try {
             partitionInfo.putAll(consumer.listTopics());
         } catch (final TimeoutException e) {
-            log.debug("{} Could not fetch topic metadata within the timeout, will retry in the next run loop", logPrefix);
+            log.debug("Could not fetch topic metadata within the timeout, will retry in the next run loop");
         }
     }
 
@@ -244,8 +243,7 @@ public class StoreChangelogReader implements ChangelogReader {
                                       pos));
             }
 
-            log.debug("{} Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
-                      logPrefix,
+            log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
                       topicPartition,
                       restorer.restoredNumRecords(),
                       restorer.startingOffset(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index d479a72..34e9e8a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
@@ -35,7 +36,6 @@ import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssigno
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -55,11 +55,11 @@ import static org.apache.kafka.streams.processor.internals.InternalTopicManager.
 
 public class StreamPartitionAssignor implements PartitionAssignor, Configurable, ThreadMetadataProvider {
 
-    private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
     private Time time = Time.SYSTEM;
     private final static int UNKNOWN = -1;
     public final static int NOT_AVAILABLE = -2;
 
+    private Logger log;
     private String logPrefix;
 
     private static class AssignedPartition implements Comparable<AssignedPartition> {
@@ -200,6 +200,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
+        //Initializing the logger without threadDataProvider name because provider name is not known/verified at this point
+        logPrefix = String.format("stream-thread ");
+        LogContext logContext = new LogContext(logPrefix);
+        this.log = logContext.logger(getClass());
+
         Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
         if (o == null) {
             KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -216,7 +221,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
         threadDataProvider = (ThreadDataProvider) o;
         threadDataProvider.setThreadMetadataProvider(this);
 
-        logPrefix = String.format("stream-thread [%s]", threadDataProvider.name());
+        //Reassigning the logger with threadDataProvider name
+        logPrefix = String.format("stream-thread [%s] ", threadDataProvider.name());
+        logContext = new LogContext(logPrefix);
+        this.log = logContext.logger(getClass());
 
         String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
         if (userEndPoint != null && !userEndPoint.isEmpty()) {
@@ -273,7 +281,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
 
     private void updateSubscribedTopics(Set<String> topics) {
         SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
-        log.debug("{} found {} topics possibly matching regex", logPrefix, topics);
+        log.debug("found {} topics possibly matching regex", topics);
         // update the topic groups with the returned subscription set for regex pattern subscriptions
         subscriptionUpdates.updateTopics(topics);
         threadDataProvider.builder().updateSubscriptions(subscriptionUpdates, threadDataProvider.name());
@@ -322,7 +330,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
             clientMetadata.addConsumer(consumerId, info);
         }
 
-        log.debug("{} Constructed client metadata {} from the member subscriptions.", logPrefix, clientsMetadata);
+        log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata);
 
         // ---------------- Step Zero ---------------- //
 
@@ -409,7 +417,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
 
         metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
 
-        log.debug("{} Created repartition topics {} from the parsed topology.", logPrefix, allRepartitionTopicPartitions.values());
+        log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
 
         // ---------------- Step One ---------------- //
 
@@ -431,7 +439,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
             Set<TopicPartition> partitions = entry.getValue();
             for (TopicPartition partition : partitions) {
                 if (allAssignedPartitions.contains(partition)) {
-                    log.warn("{} Partition {} is assigned to more than one tasks: {}", logPrefix, partition, partitionsForTask);
+                    log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask);
                 }
             }
             allAssignedPartitions.addAll(partitions);
@@ -450,11 +458,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
                 for (PartitionInfo partitionInfo : partitionInfoList) {
                     TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                     if (!allAssignedPartitions.contains(partition)) {
-                        log.warn("{} Partition {} is not assigned to any tasks: {}", logPrefix, partition, partitionsForTask);
+                        log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask);
                     }
                 }
             } else {
-                log.warn("{} No partitions found for topic {}", logPrefix, topic);
+                log.warn("No partitions found for topic {}", topic);
             }
         }
 
@@ -477,14 +485,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
 
                     changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
                 } else {
-                    log.debug("{} No tasks found for topic group {}", logPrefix, topicGroupId);
+                    log.debug("No tasks found for topic group {}", topicGroupId);
                 }
             }
         }
 
         prepareTopic(changelogTopicMetadata);
 
-        log.debug("{} Created state changelog topics {} from the parsed topology.", logPrefix, changelogTopicMetadata.values());
+        log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values());
 
         // ---------------- Step Two ---------------- //
 
@@ -494,13 +502,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
             states.put(entry.getKey(), entry.getValue().state);
         }
 
-        log.debug("{} Assigning tasks {} to clients {} with number of replicas {}",
+        log.debug("Assigning tasks {} to clients {} with number of replicas {}",
                 logPrefix, partitionsForTask.keySet(), states, numStandbyReplicas);
 
         final StickyTaskAssignor<UUID> taskAssignor = new StickyTaskAssignor<>(states, partitionsForTask.keySet());
         taskAssignor.assign(numStandbyReplicas);
 
-        log.info("{} Assigned tasks to clients as {}.", logPrefix, states);
+        log.info("Assigned tasks to clients as {}.", states);
 
         // ---------------- Step Three ---------------- //
 
@@ -591,7 +599,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
         // could be duplicated if one task has more than one assigned partitions
         if (partitions.size() != info.activeTasks.size()) {
             throw new TaskAssignmentException(
-                    String.format("%s Number of assigned partitions %d is not equal to the number of active taskIds %d" +
+                    String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" +
                             ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString())
             );
         }
@@ -646,7 +654,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
      */
     @SuppressWarnings("deprecation")
     private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
-        log.debug("{} Starting to validate internal topics in partition assignor.", logPrefix);
+        log.debug("Starting to validate internal topics in partition assignor.");
 
         // first construct the topics to make ready
         Map<InternalTopicConfig, Integer> topicsToMakeReady = new HashMap<>();
@@ -660,7 +668,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
                 continue;
             }
             if (numPartitions < 0) {
-                throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic [%s] number of partitions not defined", logPrefix, topic.name()));
+                throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
             }
 
             topicsToMakeReady.put(topic, numPartitions);
@@ -680,7 +688,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
             }
         }
 
-        log.debug("{} Completed validating internal topics in partition assignor", logPrefix);
+        log.debug("Completed validating internal topics in partition assignor");
     }
 
     private boolean allTopicsCreated(final Set<String> topicNamesToMakeReady, final Map<InternalTopicConfig, Integer> topicsToMakeReady) {
@@ -785,7 +793,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
                     final Integer partitions = metadata.partitionCountForTopic(topic);
 
                     if (partitions == null) {
-                        throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic not found: %s", logPrefix, topic));
+                        throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic not found: %s", logPrefix, topic));
                     }
 
                     if (numPartitions == UNKNOWN) {
@@ -793,7 +801,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
                     } else if (numPartitions != partitions) {
                         final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                         Arrays.sort(topics);
-                        throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
+                        throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
                     }
                 } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
                     numPartitions = NOT_AVAILABLE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 288a597..084a991 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
@@ -37,8 +38,6 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -53,8 +52,6 @@ import static java.util.Collections.singleton;
  */
 public class StreamTask extends AbstractTask implements ProcessorNodePunctuator {
 
-    private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
-
     private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
 
     private final PartitionGroup partitionGroup;
@@ -129,7 +126,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         consumedOffsets = new HashMap<>();
 
         this.producer = producer;
-        recordCollector = createRecordCollector();
+        recordCollector = createRecordCollector(logContext);
 
         // initialize the topology with its own context
         processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
@@ -162,7 +159,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      */
     @Override
     public void resume() {
-        log.debug("{} Resuming", logPrefix);
+        log.debug("Resuming");
         if (eosEnabled) {
             producer.beginTransaction();
             transactionInFlight = true;
@@ -190,12 +187,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             final ProcessorNode currNode = recordInfo.node();
             final TopicPartition partition = recordInfo.partition();
 
-            log.trace("{} Start processing one record [{}]", logPrefix, record);
+            log.trace("Start processing one record [{}]", record);
 
             updateProcessorContext(record, currNode);
             currNode.process(record.key(), record.value());
 
-            log.trace("{} Completed processing one record [{}]", logPrefix, record);
+            log.trace("Completed processing one record [{}]", record);
 
             // update the consumed offset map after processing is done
             consumedOffsets.put(partition, record.offset());
@@ -227,19 +224,19 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     @Override
     public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
         if (processorContext.currentNode() != null) {
-            throw new IllegalStateException(String.format("%s Current node is not null", logPrefix));
+            throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
         }
 
         updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);
 
         if (log.isTraceEnabled()) {
-            log.trace("{} Punctuating processor {} with timestamp {} and punctuation type {}", logPrefix, node.name(), timestamp, type);
+            log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
         }
 
         try {
             node.punctuate(timestamp, punctuator);
         } catch (final KafkaException e) {
-            throw new StreamsException(String.format("%s Exception caught while punctuating processor '%s'", logPrefix,  node.name()), e);
+            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix,  node.name()), e);
         } finally {
             processorContext.setCurrentNode(null);
         }
@@ -264,7 +261,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
     // visible for testing
     void commit(final boolean startNewTransaction) {
-        log.debug("{} Committing", logPrefix);
+        log.debug("Committing");
         metrics.metrics.measureLatencyNs(
             time,
             new Runnable() {
@@ -289,14 +286,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
     @Override
     protected void flushState() {
-        log.trace("{} Flushing state and producer", logPrefix);
+        log.trace("Flushing state and producer");
         super.flushState();
         recordCollector.flush();
     }
 
     private void commitOffsets(final boolean startNewTransaction) {
         if (commitOffsetNeeded) {
-            log.trace("{} Committing offsets", logPrefix);
+            log.trace("Committing offsets");
             final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
             for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
                 final TopicPartition partition = entry.getKey();
@@ -317,7 +314,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                 try {
                     consumer.commitSync(consumedOffsetsAndMetadata);
                 } catch (final CommitFailedException e) {
-                    log.warn("{} Failed offset commits {} due to CommitFailedException", logPrefix, consumedOffsetsAndMetadata);
+                    log.warn("Failed offset commits {} due to CommitFailedException", consumedOffsetsAndMetadata);
                     throw e;
                 }
             }
@@ -330,7 +327,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
     void initTopology() {
         // initialize the task by initializing all its processor nodes in the topology
-        log.trace("{} Initializing processor nodes of the topology", logPrefix);
+        log.trace("Initializing processor nodes of the topology");
         for (final ProcessorNode node : topology.processors()) {
             processorContext.setCurrentNode(node);
             try {
@@ -352,7 +349,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      */
     @Override
     public void suspend() {
-        log.debug("{} Suspending", logPrefix);
+        log.debug("Suspending");
         suspend(true);
     }
 
@@ -374,7 +371,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     private void closeTopology() {
-        log.trace("{} Closing processor topology", logPrefix);
+        log.trace("Closing processor topology");
 
         partitionGroup.clear();
 
@@ -411,7 +408,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             if (firstException == null) {
                 firstException = e;
             }
-            log.error("{} Could not close state manager due to the following error:", logPrefix, e);
+            log.error("Could not close state manager due to the following error:", e);
         }
 
         try {
@@ -434,7 +431,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                         recordCollector.close();
                     }
                 } catch (final Throwable e) {
-                    log.error("{} Failed to close producer due to the following error:", logPrefix, e);
+                    log.error("Failed to close producer due to the following error:", e);
                 }
             }
         }
@@ -467,7 +464,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     @Override
     public void close(boolean clean,
                       final boolean isZombie) {
-        log.debug("{} Closing", logPrefix);
+        log.debug("Closing");
 
         RuntimeException firstException = null;
         try {
@@ -475,7 +472,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         } catch (final RuntimeException e) {
             clean = false;
             firstException = e;
-            log.error("{} Could not close task due to the following error:", logPrefix, e);
+            log.error("Could not close task due to the following error:", e);
         }
 
         closeSuspended(clean, isZombie, firstException);
@@ -495,7 +492,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         final int newQueueSize = partitionGroup.addRawRecords(partition, records);
 
         if (log.isTraceEnabled()) {
-            log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);
+            log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
         }
 
         // if after adding these records, its partition queue's buffered size has been
@@ -516,7 +513,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      */
     public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
         if (processorContext.currentNode() == null) {
-            throw new IllegalStateException(String.format("%s Current node is null", logPrefix));
+            throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix));
         }
 
         final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), interval, punctuator);
@@ -596,12 +593,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     // visible for testing only
-    RecordCollector createRecordCollector() {
-        return new RecordCollectorImpl(producer, id.toString());
+    RecordCollector createRecordCollector(final LogContext logContext) {
+        return new RecordCollectorImpl(producer, id.toString(), logContext);
     }
 
     public boolean initialize() {
-        log.debug("{} Initializing", logPrefix);
+        log.debug("Initializing");
         initializeStateStores();
         initTopology();
         processorContext.initialized();