You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/18 08:53:27 UTC
[3/3] kafka git commit: KAFKA-5754; Refactor Streams to use LogContext
KAFKA-5754; Refactor Streams to use LogContext
This PR utilizes `org.apache.kafka.common.utils.LogContext` for logging in `KafkaStreams`. hachikuji, ijuma please review this and let me know your thoughts.
Author: umesh chaudhary <um...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>
Closes #3727 from umesh9794/KAFKA-5754
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f305dd68
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f305dd68
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f305dd68
Branch: refs/heads/trunk
Commit: f305dd68f6524abc25c4ed88983f0e78b4e6c243
Parents: 6055c74
Author: umesh chaudhary <um...@gmail.com>
Authored: Mon Sep 18 09:53:27 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Mon Sep 18 09:53:27 2017 +0100
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 43 +++---
.../processor/internals/AbstractTask.java | 31 ++--
.../processor/internals/AssignedTasks.java | 66 ++++-----
.../processor/internals/GlobalStreamThread.java | 37 ++---
.../internals/ProcessorStateManager.java | 46 +++---
.../internals/RecordCollectorImpl.java | 29 ++--
.../processor/internals/StandbyContextImpl.java | 3 +-
.../processor/internals/StandbyTask.java | 13 +-
.../internals/StoreChangelogReader.java | 32 ++--
.../internals/StreamPartitionAssignor.java | 46 +++---
.../streams/processor/internals/StreamTask.java | 51 +++----
.../processor/internals/StreamThread.java | 145 +++++++++----------
.../processor/internals/TaskManager.java | 40 ++---
.../streams/state/internals/ThreadCache.java | 15 +-
.../apache/kafka/streams/KafkaStreamsTest.java | 1 +
...reamSessionWindowAggregateProcessorTest.java | 3 +-
.../internals/AbstractProcessorContextTest.java | 3 +-
.../processor/internals/AbstractTaskTest.java | 3 +-
.../processor/internals/AssignedTasksTest.java | 3 +-
.../processor/internals/ProcessorNodeTest.java | 3 +-
.../internals/ProcessorStateManagerTest.java | 51 +++++--
.../internals/RecordCollectorTest.java | 25 +++-
.../processor/internals/RecordQueueTest.java | 3 +-
.../processor/internals/SinkNodeTest.java | 3 +-
.../processor/internals/StandbyTaskTest.java | 3 +-
.../processor/internals/StateConsumerTest.java | 6 +-
.../internals/StoreChangelogReaderTest.java | 6 +-
.../processor/internals/StreamTaskTest.java | 11 +-
.../streams/state/KeyValueStoreTestDriver.java | 5 +-
.../internals/CachingKeyValueStoreTest.java | 3 +-
.../internals/CachingSessionStoreTest.java | 3 +-
.../state/internals/CachingWindowStoreTest.java | 3 +-
.../ChangeLoggingKeyValueBytesStoreTest.java | 3 +-
.../ChangeLoggingKeyValueStoreTest.java | 3 +-
...rtedCacheKeyValueBytesStoreIteratorTest.java | 5 +-
...rtedCacheWrappedWindowStoreIteratorTest.java | 3 +-
.../state/internals/MeteredWindowStoreTest.java | 3 +-
.../RocksDBKeyValueStoreSupplierTest.java | 3 +-
.../RocksDBSegmentedBytesStoreTest.java | 3 +-
.../RocksDBSessionStoreSupplierTest.java | 3 +-
.../internals/RocksDBSessionStoreTest.java | 3 +-
.../state/internals/RocksDBStoreTest.java | 5 +-
.../RocksDBWindowStoreSupplierTest.java | 3 +-
.../state/internals/RocksDBWindowStoreTest.java | 5 +-
.../state/internals/SegmentIteratorTest.java | 3 +-
.../streams/state/internals/SegmentsTest.java | 3 +-
.../state/internals/StoreChangeLoggerTest.java | 3 +-
.../StreamThreadStateStoreProviderTest.java | 3 +-
.../state/internals/ThreadCacheTest.java | 54 +++----
.../apache/kafka/test/KStreamTestDriver.java | 8 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 6 +-
51 files changed, 466 insertions(+), 391 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 7698f39..b31a3e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.StreamsException;
@@ -57,7 +58,6 @@ import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -124,7 +124,6 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG
@InterfaceStability.Evolving
public class KafkaStreams {
- private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
private static final String JMX_PREFIX = "kafka.streams";
private static final int DEFAULT_CLOSE_TIMEOUT = 0;
@@ -132,10 +131,10 @@ public class KafkaStreams {
// in userData of the subscription request to allow assignor be aware
// of the co-location of stream thread's consumers. It is for internal
// usage only and should not be exposed to users at all.
+ private final Logger log;
+ private final String logPrefix;
private final UUID processId;
-
private final Metrics metrics;
- private final String logPrefix;
private final StreamsConfig config;
private final StreamThread[] threads;
private final StateDirectory stateDirectory;
@@ -230,7 +229,7 @@ public class KafkaStreams {
// it is ok: just move on to the next iteration
}
} else {
- log.debug("{} Cannot transit to {} within {}ms", logPrefix, targetState, waitMs);
+ log.debug("Cannot transit to {} within {}ms", targetState, waitMs);
return false;
}
elapsedMs = System.currentTimeMillis() - begin;
@@ -256,10 +255,10 @@ public class KafkaStreams {
// will be refused but we do not throw exception here, to allow idempotent close calls
return false;
} else if (!state.isValidTransition(newState)) {
- log.error("{} Unexpected state transition from {} to {}", logPrefix, oldState, newState);
- throw new IllegalStateException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState);
+ log.error("Unexpected state transition from {} to {}", oldState, newState);
+ throw new IllegalStateException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
} else {
- log.info("{} State transition from {} to {}", logPrefix, oldState, newState);
+ log.info("State transition from {} to {}", oldState, newState);
}
state = newState;
stateLock.notifyAll();
@@ -406,7 +405,7 @@ public class KafkaStreams {
}
if (setState(State.ERROR)) {
- log.warn("{} All stream threads have died. The instance will be in error state and should be closed.", logPrefix);
+ log.warn("All stream threads have died. The instance will be in error state and should be closed.");
}
}
@@ -453,7 +452,7 @@ public class KafkaStreams {
// special case when global thread is dead
if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR && setState(State.ERROR)) {
- log.warn("{} Global thread has died. The instance will be in error state and should be closed.", logPrefix);
+ log.warn("Global thread has died. The instance will be in error state and should be closed.");
}
}
}
@@ -542,7 +541,11 @@ public class KafkaStreams {
if (clientId.length() <= 0)
clientId = applicationId + "-" + processId;
- this.logPrefix = String.format("stream-client [%s]", clientId);
+ this.logPrefix = String.format("stream-client [%s] ", clientId);
+
+ final LogContext logContext = new LogContext(logPrefix);
+
+ this.log = logContext.logger(getClass());
final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
@@ -565,7 +568,7 @@ public class KafkaStreams {
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) {
- log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix);
+ log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
}
final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
@@ -673,7 +676,7 @@ public class KafkaStreams {
try {
client.close();
} catch (final IOException e) {
- log.warn("{} Could not close StreamKafkaClient.", logPrefix, e);
+ log.warn("Could not close StreamKafkaClient.", e);
}
}
@@ -690,7 +693,7 @@ public class KafkaStreams {
* @throws StreamsException if the Kafka brokers have version 0.10.0.x
*/
public synchronized void start() throws IllegalStateException, StreamsException {
- log.debug("{} Starting Streams client", logPrefix);
+ log.debug("Starting Streams client");
// first set state to RUNNING before kicking off the threads,
// making sure the state will always transit to RUNNING before REBALANCING
@@ -715,12 +718,12 @@ public class KafkaStreams {
}
}, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
- log.info("{} Started Streams client", logPrefix);
+ log.info("Started Streams client");
} else {
// if transition failed but no exception is thrown; currently it is not possible
// since we do not allow calling start multiple times whether or not it is already shutdown.
// TODO: In the future if we lift this restriction this code path could then be triggered and be updated
- log.error("{} Already stopped, cannot re-start", logPrefix);
+ log.error("Already stopped, cannot re-start");
}
}
@@ -744,12 +747,12 @@ public class KafkaStreams {
* Note that this method must not be called in the {@code onChange} callback of {@link StateListener}.
*/
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
- log.debug("{} Stopping Streams client with timeoutMillis = {} ms.", logPrefix, timeUnit.toMillis(timeout));
+ log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
if (!setState(State.PENDING_SHUTDOWN)) {
// if transition failed, it means it was either in PENDING_SHUTDOWN
// or NOT_RUNNING already; just check that all threads have been stopped
- log.info("{} Already in the pending shutdown state, wait to complete shutdown", logPrefix);
+ log.info("Already in the pending shutdown state, wait to complete shutdown");
} else {
stateDirCleaner.shutdownNow();
@@ -798,10 +801,10 @@ public class KafkaStreams {
}
if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) {
- log.info("{} Streams client stopped completely", logPrefix);
+ log.info("Streams client stopped completely");
return true;
} else {
- log.info("{} Streams client cannot stop completely within the timeout", logPrefix);
+ log.info("Streams client cannot stop completely within the timeout");
return false;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index b4c8c16..5ed9aae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -30,7 +31,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
@@ -40,7 +40,6 @@ import java.util.Map;
import java.util.Set;
public abstract class AbstractTask implements Task {
- private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);
final TaskId id;
final String applicationId;
@@ -50,6 +49,8 @@ public abstract class AbstractTask implements Task {
final Consumer consumer;
final String logPrefix;
final boolean eosEnabled;
+ final Logger log;
+ final LogContext logContext;
boolean taskInitialized;
private final StateDirectory stateDirectory;
@@ -75,7 +76,9 @@ public abstract class AbstractTask implements Task {
this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
this.stateDirectory = stateDirectory;
- logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", id());
+ this.logPrefix = String.format("%s [%s] ", isStandby ? "standby-task" : "task", id());
+ this.logContext = new LogContext(logPrefix);
+ this.log = logContext.logger(getClass());
// create the processor state manager
try {
@@ -86,9 +89,10 @@ public abstract class AbstractTask implements Task {
stateDirectory,
topology.storeToChangelogTopic(),
changelogReader,
- eosEnabled);
+ eosEnabled,
+ logContext);
} catch (final IOException e) {
- throw new ProcessorStateException(String.format("%s Error while creating the state manager", logPrefix), e);
+ throw new ProcessorStateException(String.format("%sError while creating the state manager", logPrefix), e);
}
}
@@ -173,7 +177,7 @@ public abstract class AbstractTask implements Task {
stateMgr.putOffsetLimit(partition, offset);
if (log.isTraceEnabled()) {
- log.trace("{} Updating store offset limits {} for changelog {}", logPrefix, offset, partition);
+ log.trace("Updating store offset limits {} for changelog {}", offset, partition);
}
} catch (final AuthorizationException e) {
throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e);
@@ -199,21 +203,20 @@ public abstract class AbstractTask implements Task {
try {
if (!stateDirectory.lock(id, 5)) {
- throw new LockException(String.format("%s Failed to lock the state directory for task %s",
+ throw new LockException(String.format("%sFailed to lock the state directory for task %s",
logPrefix, id));
}
} catch (IOException e) {
- throw new StreamsException(String.format("%s fatal error while trying to lock the state directory for task %s",
- logPrefix,
- id));
+ throw new StreamsException(String.format("%sFatal error while trying to lock the state directory for task %s",
+ logPrefix, id));
}
- log.trace("{} Initializing state stores", logPrefix);
+ log.trace("Initializing state stores");
// set initial offset limits
updateOffsetLimits();
for (final StateStore store : topology.stateStores()) {
- log.trace("{} Initializing store {}", logPrefix, store.name());
+ log.trace("Initializing store {}", store.name());
store.init(processorContext, store);
}
}
@@ -225,7 +228,7 @@ public abstract class AbstractTask implements Task {
*/
void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
ProcessorStateException exception = null;
- log.trace("{} Closing state manager", logPrefix);
+ log.trace("Closing state manager");
try {
stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
} catch (final ProcessorStateException e) {
@@ -235,7 +238,7 @@ public abstract class AbstractTask implements Task {
stateDirectory.unlock(id);
} catch (IOException e) {
if (exception == null) {
- exception = new ProcessorStateException(String.format("%s Failed to release state dir lock", logPrefix), e);
+ exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", logPrefix), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index a1966b1..2d886b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -20,10 +20,10 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
@@ -38,8 +38,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
class AssignedTasks {
- private static final Logger log = LoggerFactory.getLogger(AssignedTasks.class);
- private final String logPrefix;
+ private final Logger log;
private final String taskTypeName;
private final TaskAction maybeCommitAction;
private final TaskAction commitAction;
@@ -54,11 +53,12 @@ class AssignedTasks {
private int committed = 0;
- AssignedTasks(final String logPrefix,
+ AssignedTasks(final LogContext logContext,
final String taskTypeName) {
- this.logPrefix = logPrefix;
this.taskTypeName = taskTypeName;
+ this.log = logContext.logger(getClass());
+
maybeCommitAction = new TaskAction() {
@Override
public String name() {
@@ -71,8 +71,7 @@ class AssignedTasks {
committed++;
task.commit();
if (log.isDebugEnabled()) {
- log.debug("{} Committed active task {} per user request in",
- logPrefix, task.id());
+ log.debug("Committed active task {} per user request in", task.id());
}
}
}
@@ -110,13 +109,13 @@ class AssignedTasks {
void initializeNewTasks() {
if (!created.isEmpty()) {
- log.trace("{} Initializing {}s {}", logPrefix, taskTypeName, created.keySet());
+ log.trace("Initializing {}s {}", taskTypeName, created.keySet());
}
for (final Iterator<Map.Entry<TaskId, Task>> it = created.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<TaskId, Task> entry = it.next();
try {
if (!entry.getValue().initialize()) {
- log.debug("{} transitioning {} {} to restoring", logPrefix, taskTypeName, entry.getKey());
+ log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey());
restoring.put(entry.getKey(), entry.getValue());
} else {
transitionToRunning(entry.getValue());
@@ -124,7 +123,7 @@ class AssignedTasks {
it.remove();
} catch (final LockException e) {
// made this trace as it will spam the logs in the poll loop.
- log.trace("{} Could not create {} {} due to {}; will retry in the next run loop", logPrefix, taskTypeName, entry.getKey(), e.getMessage());
+ log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
}
}
}
@@ -133,7 +132,7 @@ class AssignedTasks {
if (restored.isEmpty()) {
return Collections.emptySet();
}
- log.trace("{} {} partitions restored for {}", logPrefix, taskTypeName, restored);
+ log.trace("{} partitions restored for {}", taskTypeName, restored);
final Set<TopicPartition> resume = new HashSet<>();
restoredPartitions.addAll(restored);
for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
@@ -147,8 +146,7 @@ class AssignedTasks {
if (log.isTraceEnabled()) {
final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions());
outstandingPartitions.removeAll(restoredPartitions);
- log.trace("{} partition restoration not complete for {} {} partitions: {}",
- logPrefix,
+ log.trace("partition restoration not complete for {} {} partitions: {}",
taskTypeName,
task.id(),
task.changelogPartitions());
@@ -173,11 +171,11 @@ class AssignedTasks {
RuntimeException suspend() {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
- log.trace("{} Suspending running {} {}", logPrefix, taskTypeName, runningTaskIds());
+ log.trace("Suspending running {} {}", taskTypeName, runningTaskIds());
firstException.compareAndSet(null, suspendTasks(running.values()));
- log.trace("{} Close restoring {} {}", logPrefix, taskTypeName, restoring.keySet());
+ log.trace("Close restoring {} {}", taskTypeName, restoring.keySet());
firstException.compareAndSet(null, closeNonRunningTasks(restoring.values()));
- log.trace("{} Close created {} {}", logPrefix, taskTypeName, created.keySet());
+ log.trace("Close created {} {}", taskTypeName, created.keySet());
firstException.compareAndSet(null, closeNonRunningTasks(created.values()));
previousActiveTasks.clear();
previousActiveTasks.addAll(running.keySet());
@@ -194,7 +192,7 @@ class AssignedTasks {
try {
task.close(false, false);
} catch (final RuntimeException e) {
- log.error("{} Failed to close {}, {}", logPrefix, taskTypeName, task.id(), e);
+ log.error("Failed to close {}, {}", taskTypeName, task.id(), e);
if (exception == null) {
exception = e;
}
@@ -213,16 +211,16 @@ class AssignedTasks {
} catch (final CommitFailedException e) {
suspended.put(task.id(), task);
// commit failed during suspension. Just log it.
- log.warn("{} Failed to commit {} {} state when suspending due to CommitFailedException", logPrefix, taskTypeName, task.id());
+ log.warn("Failed to commit {} {} state when suspending due to CommitFailedException", taskTypeName, task.id());
} catch (final ProducerFencedException e) {
closeZombieTask(task);
it.remove();
} catch (final RuntimeException e) {
- log.error("{} Suspending {} {} failed due to the following error:", logPrefix, taskTypeName, task.id(), e);
+ log.error("Suspending {} {} failed due to the following error:", taskTypeName, task.id(), e);
try {
task.close(false, false);
} catch (final Exception f) {
- log.error("{} After suspending failed, closing the same {} {} failed again due to the following error:", logPrefix, taskTypeName, task.id(), f);
+ log.error("After suspending failed, closing the same {} {} failed again due to the following error:", taskTypeName, task.id(), f);
}
if (exception == null) {
exception = e;
@@ -233,11 +231,11 @@ class AssignedTasks {
}
private void closeZombieTask(final Task task) {
- log.warn("{} Producer of task {} fenced; closing zombie task", logPrefix, task.id());
+ log.warn("Producer of task {} fenced; closing zombie task", task.id());
try {
task.close(false, true);
} catch (final Exception e) {
- log.warn("{} Failed to close zombie {} due to {}, ignore and proceed", taskTypeName, logPrefix, e);
+ log.warn("{} Failed to close zombie due to {}, ignore and proceed", taskTypeName, e);
}
}
@@ -248,22 +246,22 @@ class AssignedTasks {
boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
if (suspended.containsKey(taskId)) {
final Task task = suspended.get(taskId);
- log.trace("{} found suspended {} {}", logPrefix, taskTypeName, taskId);
+ log.trace("found suspended {} {}", taskTypeName, taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
task.resume();
transitionToRunning(task);
- log.trace("{} resuming suspended {} {}", logPrefix, taskTypeName, task.id());
+ log.trace("resuming suspended {} {}", taskTypeName, task.id());
return true;
} else {
- log.trace("{} couldn't resume task {} assigned partitions {}, task partitions", logPrefix, taskId, partitions, task.partitions());
+ log.trace("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
}
}
return false;
}
private void transitionToRunning(final Task task) {
- log.debug("{} transitioning {} {} to running", logPrefix, taskTypeName, task.id());
+ log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
for (TopicPartition topicPartition : task.partitions()) {
runningByPartition.put(topicPartition, task);
@@ -357,7 +355,7 @@ class AssignedTasks {
processed++;
}
} catch (RuntimeException e) {
- log.error("{} Failed to process {} {} due to the following error:", logPrefix, taskTypeName, task.id(), e);
+ log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e);
throw e;
}
}
@@ -375,7 +373,7 @@ class AssignedTasks {
punctuated++;
}
} catch (KafkaException e) {
- log.error("{} Failed to punctuate {} {} due to the following error:", logPrefix, taskTypeName, task.id(), e);
+ log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e);
throw e;
}
}
@@ -391,13 +389,12 @@ class AssignedTasks {
action.apply(task);
} catch (final CommitFailedException e) {
// commit failed. This is already logged inside the task as WARN and we can just log it again here.
- log.warn("{} Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", logPrefix, taskTypeName, task.id(), action.name());
+ log.warn("Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", taskTypeName, task.id(), action.name());
} catch (final ProducerFencedException e) {
closeZombieTask(task);
it.remove();
} catch (final RuntimeException t) {
- log.error("{} Failed to {} {} {} due to the following error:",
- logPrefix,
+ log.error("Failed to {} {} {} due to the following error:",
action.name(),
taskTypeName,
task.id(),
@@ -418,11 +415,11 @@ class AssignedTasks {
while (standByTaskIterator.hasNext()) {
final Task suspendedTask = standByTaskIterator.next();
if (!newAssignment.containsKey(suspendedTask.id()) || !suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) {
- log.debug("{} Closing suspended and not re-assigned {} {}", logPrefix, taskTypeName, suspendedTask.id());
+ log.debug("Closing suspended and not re-assigned {} {}", taskTypeName, suspendedTask.id());
try {
suspendedTask.closeSuspended(true, false, null);
} catch (final Exception e) {
- log.error("{} Failed to remove suspended {} {} due to the following error:", logPrefix, taskTypeName, suspendedTask.id(), e);
+ log.error("Failed to remove suspended {} {} due to the following error:", taskTypeName, suspendedTask.id(), e);
} finally {
standByTaskIterator.remove();
}
@@ -441,8 +438,7 @@ class AssignedTasks {
try {
task.close(clean, false);
} catch (final Throwable t) {
- log.error("{} Failed while closing {} {} due to the following error:",
- logPrefix,
+ log.error("Failed while closing {} {} due to the following error:",
task.getClass().getSimpleName(),
task.id(),
t);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 32e8330..41ebcca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@@ -28,7 +29,6 @@ import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
@@ -46,8 +46,8 @@ import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.St
*/
public class GlobalStreamThread extends Thread {
- private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class);
-
+ private final Logger log;
+ private final LogContext logContext;
private final StreamsConfig config;
private final Consumer<byte[], byte[]> consumer;
private final StateDirectory stateDirectory;
@@ -147,10 +147,10 @@ public class GlobalStreamThread extends Thread {
// will be refused but we do not throw exception here
return false;
} else if (!state.isValidTransition(newState)) {
- log.error("{} Unexpected state transition from {} to {}", logPrefix, oldState, newState);
- throw new StreamsException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState);
+ log.error("Unexpected state transition from {} to {}", oldState, newState);
+ throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
} else {
- log.info("{} State transition from {} to {}", logPrefix, oldState, newState);
+ log.info("State transition from {} to {}", oldState, newState);
}
state = newState;
@@ -185,8 +185,11 @@ public class GlobalStreamThread extends Thread {
long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
(config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
- this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
- this.logPrefix = String.format("global-stream-thread [%s]", threadClientId);
+ this.logPrefix = String.format("global-stream-thread [%s] ", threadClientId);
+ this.logContext = new LogContext(logPrefix);
+ this.log = logContext.logger(getClass());
+ this.cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
+
}
static class StateConsumer {
@@ -195,17 +198,17 @@ public class GlobalStreamThread extends Thread {
private final Time time;
private final long pollMs;
private final long flushInterval;
- private final String logPrefix;
+ private final Logger log;
private long lastFlush;
- StateConsumer(final String logPrefix,
+ StateConsumer(final LogContext logContext,
final Consumer<byte[], byte[]> consumer,
final GlobalStateMaintainer stateMaintainer,
final Time time,
final long pollMs,
final long flushInterval) {
- this.logPrefix = logPrefix;
+ this.log = logContext.logger(getClass());
this.consumer = consumer;
this.stateMaintainer = stateMaintainer;
this.time = time;
@@ -240,7 +243,7 @@ public class GlobalStreamThread extends Thread {
} catch (Exception e) {
// just log an error if the consumer throws an exception during close
// so we can always attempt to close the state stores.
- log.error("{} Failed to close consumer due to the following error:", logPrefix, e);
+ log.error("Failed to close consumer due to the following error:", e);
}
stateMaintainer.close();
@@ -260,7 +263,7 @@ public class GlobalStreamThread extends Thread {
setState(State.PENDING_SHUTDOWN);
setState(State.DEAD);
- log.warn("{} Error happened during initialization of the global state store; this thread has shutdown", logPrefix);
+ log.warn("Error happened during initialization of the global state store; this thread has shutdown");
return;
}
@@ -276,16 +279,16 @@ public class GlobalStreamThread extends Thread {
// intentionally do not check the returned flag
setState(State.PENDING_SHUTDOWN);
- log.info("{} Shutting down", logPrefix);
+ log.info("Shutting down");
try {
stateConsumer.close();
} catch (IOException e) {
- log.error("{} Failed to close state maintainer due to the following error:", logPrefix, e);
+ log.error("Failed to close state maintainer due to the following error:", e);
}
setState(DEAD);
- log.info("{} Shutdown complete", logPrefix);
+ log.info("Shutdown complete");
}
}
@@ -293,7 +296,7 @@ public class GlobalStreamThread extends Thread {
try {
final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
final StateConsumer stateConsumer
- = new StateConsumer(logPrefix,
+ = new StateConsumer(this.logContext,
consumer,
new GlobalStateUpdateTask(topology,
new GlobalProcessorContextImpl(
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index acd7674..942e41a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -28,7 +29,6 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -42,11 +42,11 @@ import java.util.Map;
public class ProcessorStateManager implements StateManager {
- private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+ private final Logger log;
private final File baseDir;
private final TaskId taskId;
private final String logPrefix;
@@ -77,10 +77,12 @@ public class ProcessorStateManager implements StateManager {
final StateDirectory stateDirectory,
final Map<String, String> storeToChangelogTopic,
final ChangelogReader changelogReader,
- final boolean eosEnabled) throws IOException {
+ final boolean eosEnabled,
+ final LogContext logContext) throws IOException {
this.taskId = taskId;
this.changelogReader = changelogReader;
- logPrefix = String.format("task [%s]", taskId);
+ logPrefix = String.format("task [%s] ", taskId);
+ this.log = logContext.logger(getClass());
partitionForTopic = new HashMap<>();
for (final TopicPartition source : sources) {
@@ -100,7 +102,7 @@ public class ProcessorStateManager implements StateManager {
try {
baseDir = stateDirectory.directoryForTask(taskId);
} catch (final ProcessorStateException e) {
- throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s",
+ throw new LockException(String.format("%sFailed to get the directory for task %s. Exception %s",
logPrefix, taskId, e));
}
@@ -114,7 +116,7 @@ public class ProcessorStateManager implements StateManager {
checkpoint = null;
}
- log.debug("{} Created state store manager for task {} with the acquired state dir lock", logPrefix, taskId);
+ log.debug("Created state store manager for task {} with the acquired state dir lock", taskId);
}
@@ -139,14 +141,14 @@ public class ProcessorStateManager implements StateManager {
public void register(final StateStore store,
final boolean loggingEnabled,
final StateRestoreCallback stateRestoreCallback) {
- log.debug("{} Registering state store {} to its state manager", logPrefix, store.name());
+ log.debug("Registering state store {} to its state manager", store.name());
if (store.name().equals(CHECKPOINT_FILE_NAME)) {
- throw new IllegalArgumentException(String.format("%s Illegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME));
+ throw new IllegalArgumentException(String.format("%sIllegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME));
}
if (stores.containsKey(store.name())) {
- throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", logPrefix, store.name()));
+ throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", logPrefix, store.name()));
}
// check that the underlying change log topic exist or not
@@ -160,12 +162,12 @@ public class ProcessorStateManager implements StateManager {
if (isStandby) {
if (store.persistent()) {
- log.trace("{} Preparing standby replica of persistent state store {} with changelog topic {}", logPrefix, store.name(), topic);
+ log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", store.name(), topic);
restoreCallbacks.put(topic, stateRestoreCallback);
}
} else {
- log.trace("{} Restoring state store {} from changelog topic {}", logPrefix, store.name(), topic);
+ log.trace("Restoring state store {} from changelog topic {}", store.name(), topic);
final StateRestorer restorer = new StateRestorer(storePartition,
new CompositeRestoreListener(stateRestoreCallback),
checkpointedOffsets.get(storePartition),
@@ -227,7 +229,7 @@ public class ProcessorStateManager implements StateManager {
try {
restoreCallback.restoreAll(restoreRecords);
} catch (final Exception e) {
- throw new ProcessorStateException(String.format("%s exception caught while trying to restore state from %s", logPrefix, storePartition), e);
+ throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", logPrefix, storePartition), e);
}
}
@@ -238,7 +240,7 @@ public class ProcessorStateManager implements StateManager {
}
void putOffsetLimit(final TopicPartition partition, final long limit) {
- log.trace("{} Updating store offset limit for partition {} to {}", logPrefix, partition, limit);
+ log.trace("Updating store offset limit for partition {} to {}", partition, limit);
offsetLimits.put(partition, limit);
}
@@ -255,13 +257,13 @@ public class ProcessorStateManager implements StateManager {
@Override
public void flush() {
if (!stores.isEmpty()) {
- log.debug("{} Flushing all stores registered in the state manager", logPrefix);
+ log.debug("Flushing all stores registered in the state manager");
for (final StateStore store : stores.values()) {
try {
- log.trace("{} Flushing store={}", logPrefix, store.name());
+ log.trace("Flushing store={}", store.name());
store.flush();
} catch (final Exception e) {
- throw new ProcessorStateException(String.format("%s Failed to flush state store %s", logPrefix, store.name()), e);
+ throw new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e);
}
}
}
@@ -278,16 +280,16 @@ public class ProcessorStateManager implements StateManager {
// attempting to close the stores, just in case they
// are not closed by a ProcessorNode yet
if (!stores.isEmpty()) {
- log.debug("{} Closing its state manager and all the registered state stores", logPrefix);
+ log.debug("Closing its state manager and all the registered state stores");
for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
- log.debug("{} Closing storage engine {}", logPrefix, entry.getKey());
+ log.debug("Closing storage engine {}", entry.getKey());
try {
entry.getValue().close();
} catch (final Exception e) {
if (firstException == null) {
- firstException = new ProcessorStateException(String.format("%s Failed to close state store %s", logPrefix, entry.getKey()), e);
+ firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, entry.getKey()), e);
}
- log.error("{} Failed to close state store {}: ", logPrefix, entry.getKey(), e);
+ log.error("Failed to close state store {}: ", entry.getKey(), e);
}
}
@@ -305,7 +307,7 @@ public class ProcessorStateManager implements StateManager {
// write the checkpoint
@Override
public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
- log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets);
+ log.trace("Writing checkpoint: {}", ackedOffsets);
checkpointedOffsets.putAll(changelogReader.restoredOffsets());
for (final Map.Entry<String, StateStore> entry : stores.entrySet()) {
final String storeName = entry.getKey();
@@ -339,7 +341,7 @@ public class ProcessorStateManager implements StateManager {
}
void registerGlobalStateStores(final List<StateStore> stateStores) {
- log.debug("{} Register global stores {}", logPrefix, stateStores);
+ log.debug("Register global stores {}", stateStores);
for (final StateStore stateStore : stateStores) {
globalStores.put(stateStore.name(), stateStore);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 79e3350..4eec2d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -26,11 +26,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
@@ -40,18 +40,19 @@ public class RecordCollectorImpl implements RecordCollector {
private static final int MAX_SEND_ATTEMPTS = 3;
private static final long SEND_RETRY_BACKOFF = 100L;
- private static final Logger log = LoggerFactory.getLogger(RecordCollectorImpl.class);
-
+
+ private final Logger log;
private final Producer<byte[], byte[]> producer;
private final Map<TopicPartition, Long> offsets;
private final String logPrefix;
private volatile KafkaException sendException;
- public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId) {
+ public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId, final LogContext logContext) {
this.producer = producer;
- offsets = new HashMap<>();
- logPrefix = String.format("task [%s]", streamTaskId);
+ this.offsets = new HashMap<>();
+ this.logPrefix = String.format("task [%s] ", streamTaskId);
+ this.log = logContext.logger(getClass());
}
@Override
@@ -107,14 +108,14 @@ public class RecordCollectorImpl implements RecordCollector {
offsets.put(tp, metadata.offset());
} else {
if (sendException == null) {
- log.error("{} Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
+ log.error("Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
"No more records will be sent and no more offsets will be recorded for this task.",
- logPrefix, key, value, timestamp, topic, exception);
+ key, value, timestamp, topic, exception);
if (exception instanceof ProducerFencedException) {
- sendException = new ProducerFencedException(String.format("%s Abort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s",
+ sendException = new ProducerFencedException(String.format("%sAbort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s",
logPrefix, key, value, timestamp, topic, exception.getMessage()));
} else {
- sendException = new StreamsException(String.format("%s Abort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.",
+ sendException = new StreamsException(String.format("%sAbort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.",
logPrefix, key, value, timestamp, topic, exception), exception);
}
}
@@ -124,9 +125,9 @@ public class RecordCollectorImpl implements RecordCollector {
return;
} catch (final TimeoutException e) {
if (attempt == MAX_SEND_ATTEMPTS) {
- throw new StreamsException(String.format("%s Failed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt));
+ throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt));
}
- log.warn("{} Timeout exception caught when sending record to topic {}; retrying with {} attempt", logPrefix, topic, attempt);
+ log.warn("Timeout exception caught when sending record to topic {}; retrying with {} attempt", topic, attempt);
Utils.sleep(SEND_RETRY_BACKOFF);
}
}
@@ -140,14 +141,14 @@ public class RecordCollectorImpl implements RecordCollector {
@Override
public void flush() {
- log.debug("{} Flushing producer", logPrefix);
+ log.debug("Flushing producer");
producer.flush();
checkForException();
}
@Override
public void close() {
- log.debug("{} Closing producer", logPrefix);
+ log.debug("Closing producer");
producer.close();
checkForException();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 812a4ab..a9a03ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.Cancellable;
@@ -70,7 +71,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
final StreamsConfig config,
final ProcessorStateManager stateMgr,
final StreamsMetrics metrics) {
- super(id, applicationId, config, metrics, stateMgr, new ThreadCache("zeroCache", 0, metrics));
+ super(id, applicationId, config, metrics, stateMgr, new ThreadCache(new LogContext("zeroCache "), 0, metrics));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 75151a8..033af24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -22,8 +22,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@@ -36,7 +34,6 @@ import java.util.Map;
*/
public class StandbyTask extends AbstractTask {
- private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
/**
@@ -73,7 +70,7 @@ public class StandbyTask extends AbstractTask {
*/
@Override
public void resume() {
- log.debug("{} Resuming", logPrefix);
+ log.debug("Resuming");
updateOffsetLimits();
}
@@ -86,7 +83,7 @@ public class StandbyTask extends AbstractTask {
*/
@Override
public void commit() {
- log.trace("{} Committing", logPrefix);
+ log.trace("Committing");
flushAndCheckpointState();
// reinitialize offset limits
updateOffsetLimits();
@@ -100,7 +97,7 @@ public class StandbyTask extends AbstractTask {
*/
@Override
public void suspend() {
- log.debug("{} Suspending", logPrefix);
+ log.debug("Suspending");
flushAndCheckpointState();
}
@@ -124,7 +121,7 @@ public class StandbyTask extends AbstractTask {
if (!taskInitialized) {
return;
}
- log.debug("{} Closing", logPrefix);
+ log.debug("Closing");
boolean committedSuccessfully = false;
try {
commit();
@@ -163,7 +160,7 @@ public class StandbyTask extends AbstractTask {
*/
public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition,
final List<ConsumerRecord<byte[], byte[]>> records) {
- log.trace("{} Updating standby replicas of its state store for partition [{}]", logPrefix, partition);
+ log.trace("Updating standby replicas of its state store for partition [{}]", partition);
return stateMgr.updateStandbyStates(partition, records);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 57dff64..8ecc7e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -22,10 +22,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
@@ -38,9 +38,8 @@ import java.util.Map;
import java.util.Set;
public class StoreChangelogReader implements ChangelogReader {
- private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class);
- private final String logPrefix;
+ private final Logger log;
private final Consumer<byte[], byte[]> consumer;
private final StateRestoreListener stateRestoreListener;
private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
@@ -51,16 +50,17 @@ public class StoreChangelogReader implements ChangelogReader {
public StoreChangelogReader(final String threadId,
final Consumer<byte[], byte[]> consumer,
- final StateRestoreListener stateRestoreListener) {
+ final StateRestoreListener stateRestoreListener,
+ final LogContext logContext) {
this.consumer = consumer;
-
- this.logPrefix = String.format("stream-thread [%s]", threadId);
+ this.log = logContext.logger(getClass());
this.stateRestoreListener = stateRestoreListener;
}
public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
- final StateRestoreListener stateRestoreListener) {
- this("", consumer, stateRestoreListener);
+ final StateRestoreListener stateRestoreListener,
+ final LogContext logContext) {
+ this("", consumer, stateRestoreListener, logContext);
}
@Override
@@ -116,7 +116,7 @@ public class StoreChangelogReader implements ChangelogReader {
endOffsets.putAll(consumer.endOffsets(initializable.keySet()));
} catch (final TimeoutException e) {
// if timeout exception gets thrown we just give up this time and retry in the next run loop
- log.debug("{} Could not fetch end offset for {}; will fall back to partition by partition fetching", logPrefix, initializable);
+ log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
return;
}
@@ -140,7 +140,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
needsInitializing.remove(topicPartition);
} else {
- log.info("{} End offset cannot be found form the returned metadata; removing this partition from the current run loop", logPrefix);
+ log.info("End offset cannot be found form the returned metadata; removing this partition from the current run loop");
iter.remove();
}
}
@@ -152,7 +152,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
- log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet());
+ log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
assignment.addAll(initialized.keySet());
@@ -186,8 +186,7 @@ public class StoreChangelogReader implements ChangelogReader {
}
private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) {
- log.debug("{} Restoring partition {} from offset {} to endOffset {}",
- logPrefix,
+ log.debug("Restoring partition {} from offset {} to endOffset {}",
partition,
startingOffset,
endOffset);
@@ -196,7 +195,7 @@ public class StoreChangelogReader implements ChangelogReader {
private Collection<TopicPartition> completed() {
final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
completed.removeAll(needsRestoring.keySet());
- log.debug("{} completed partitions {}", logPrefix, completed);
+ log.debug("completed partitions {}", completed);
return completed;
}
@@ -204,7 +203,7 @@ public class StoreChangelogReader implements ChangelogReader {
try {
partitionInfo.putAll(consumer.listTopics());
} catch (final TimeoutException e) {
- log.debug("{} Could not fetch topic metadata within the timeout, will retry in the next run loop", logPrefix);
+ log.debug("Could not fetch topic metadata within the timeout, will retry in the next run loop");
}
}
@@ -244,8 +243,7 @@ public class StoreChangelogReader implements ChangelogReader {
pos));
}
- log.debug("{} Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
- logPrefix,
+ log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
topicPartition,
restorer.restoredNumRecords(),
restorer.startingOffset(),
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index d479a72..34e9e8a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
@@ -35,7 +36,6 @@ import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssigno
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -55,11 +55,11 @@ import static org.apache.kafka.streams.processor.internals.InternalTopicManager.
public class StreamPartitionAssignor implements PartitionAssignor, Configurable, ThreadMetadataProvider {
- private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
private Time time = Time.SYSTEM;
private final static int UNKNOWN = -1;
public final static int NOT_AVAILABLE = -2;
+ private Logger log;
private String logPrefix;
private static class AssignedPartition implements Comparable<AssignedPartition> {
@@ -200,6 +200,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
public void configure(Map<String, ?> configs) {
numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+ //Initializing the logger without threadDataProvider name because provider name is not known/verified at this point
+ logPrefix = String.format("stream-thread ");
+ LogContext logContext = new LogContext(logPrefix);
+ this.log = logContext.logger(getClass());
+
Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
if (o == null) {
KafkaException ex = new KafkaException("StreamThread is not specified");
@@ -216,7 +221,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
threadDataProvider = (ThreadDataProvider) o;
threadDataProvider.setThreadMetadataProvider(this);
- logPrefix = String.format("stream-thread [%s]", threadDataProvider.name());
+ //Reassigning the logger with threadDataProvider name
+ logPrefix = String.format("stream-thread [%s] ", threadDataProvider.name());
+ logContext = new LogContext(logPrefix);
+ this.log = logContext.logger(getClass());
String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (userEndPoint != null && !userEndPoint.isEmpty()) {
@@ -273,7 +281,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
private void updateSubscribedTopics(Set<String> topics) {
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
- log.debug("{} found {} topics possibly matching regex", logPrefix, topics);
+ log.debug("found {} topics possibly matching regex", topics);
// update the topic groups with the returned subscription set for regex pattern subscriptions
subscriptionUpdates.updateTopics(topics);
threadDataProvider.builder().updateSubscriptions(subscriptionUpdates, threadDataProvider.name());
@@ -322,7 +330,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
clientMetadata.addConsumer(consumerId, info);
}
- log.debug("{} Constructed client metadata {} from the member subscriptions.", logPrefix, clientsMetadata);
+ log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata);
// ---------------- Step Zero ---------------- //
@@ -409,7 +417,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
- log.debug("{} Created repartition topics {} from the parsed topology.", logPrefix, allRepartitionTopicPartitions.values());
+ log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values());
// ---------------- Step One ---------------- //
@@ -431,7 +439,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
Set<TopicPartition> partitions = entry.getValue();
for (TopicPartition partition : partitions) {
if (allAssignedPartitions.contains(partition)) {
- log.warn("{} Partition {} is assigned to more than one tasks: {}", logPrefix, partition, partitionsForTask);
+ log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask);
}
}
allAssignedPartitions.addAll(partitions);
@@ -450,11 +458,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
for (PartitionInfo partitionInfo : partitionInfoList) {
TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
if (!allAssignedPartitions.contains(partition)) {
- log.warn("{} Partition {} is not assigned to any tasks: {}", logPrefix, partition, partitionsForTask);
+ log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask);
}
}
} else {
- log.warn("{} No partitions found for topic {}", logPrefix, topic);
+ log.warn("No partitions found for topic {}", topic);
}
}
@@ -477,14 +485,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
} else {
- log.debug("{} No tasks found for topic group {}", logPrefix, topicGroupId);
+ log.debug("No tasks found for topic group {}", topicGroupId);
}
}
}
prepareTopic(changelogTopicMetadata);
- log.debug("{} Created state changelog topics {} from the parsed topology.", logPrefix, changelogTopicMetadata.values());
+ log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values());
// ---------------- Step Two ---------------- //
@@ -494,13 +502,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
states.put(entry.getKey(), entry.getValue().state);
}
- log.debug("{} Assigning tasks {} to clients {} with number of replicas {}",
+ log.debug("Assigning tasks {} to clients {} with number of replicas {}",
logPrefix, partitionsForTask.keySet(), states, numStandbyReplicas);
final StickyTaskAssignor<UUID> taskAssignor = new StickyTaskAssignor<>(states, partitionsForTask.keySet());
taskAssignor.assign(numStandbyReplicas);
- log.info("{} Assigned tasks to clients as {}.", logPrefix, states);
+ log.info("Assigned tasks to clients as {}.", states);
// ---------------- Step Three ---------------- //
@@ -591,7 +599,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
// could be duplicated if one task has more than one assigned partitions
if (partitions.size() != info.activeTasks.size()) {
throw new TaskAssignmentException(
- String.format("%s Number of assigned partitions %d is not equal to the number of active taskIds %d" +
+ String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" +
", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString())
);
}
@@ -646,7 +654,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
*/
@SuppressWarnings("deprecation")
private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
- log.debug("{} Starting to validate internal topics in partition assignor.", logPrefix);
+ log.debug("Starting to validate internal topics in partition assignor.");
// first construct the topics to make ready
Map<InternalTopicConfig, Integer> topicsToMakeReady = new HashMap<>();
@@ -660,7 +668,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
continue;
}
if (numPartitions < 0) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic [%s] number of partitions not defined", logPrefix, topic.name()));
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
}
topicsToMakeReady.put(topic, numPartitions);
@@ -680,7 +688,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
}
}
- log.debug("{} Completed validating internal topics in partition assignor", logPrefix);
+ log.debug("Completed validating internal topics in partition assignor");
}
private boolean allTopicsCreated(final Set<String> topicNamesToMakeReady, final Map<InternalTopicConfig, Integer> topicsToMakeReady) {
@@ -785,7 +793,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
final Integer partitions = metadata.partitionCountForTopic(topic);
if (partitions == null) {
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic not found: %s", logPrefix, topic));
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic not found: %s", logPrefix, topic));
}
if (numPartitions == UNKNOWN) {
@@ -793,7 +801,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable,
} else if (numPartitions != partitions) {
final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
Arrays.sort(topics);
- throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
+ throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
}
} else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
numPartitions = NOT_AVAILABLE;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 288a597..084a991 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
@@ -37,8 +38,6 @@ import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
@@ -53,8 +52,6 @@ import static java.util.Collections.singleton;
*/
public class StreamTask extends AbstractTask implements ProcessorNodePunctuator {
- private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
-
private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
private final PartitionGroup partitionGroup;
@@ -129,7 +126,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
consumedOffsets = new HashMap<>();
this.producer = producer;
- recordCollector = createRecordCollector();
+ recordCollector = createRecordCollector(logContext);
// initialize the topology with its own context
processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
@@ -162,7 +159,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
*/
@Override
public void resume() {
- log.debug("{} Resuming", logPrefix);
+ log.debug("Resuming");
if (eosEnabled) {
producer.beginTransaction();
transactionInFlight = true;
@@ -190,12 +187,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
final ProcessorNode currNode = recordInfo.node();
final TopicPartition partition = recordInfo.partition();
- log.trace("{} Start processing one record [{}]", logPrefix, record);
+ log.trace("Start processing one record [{}]", record);
updateProcessorContext(record, currNode);
currNode.process(record.key(), record.value());
- log.trace("{} Completed processing one record [{}]", logPrefix, record);
+ log.trace("Completed processing one record [{}]", record);
// update the consumed offset map after processing is done
consumedOffsets.put(partition, record.offset());
@@ -227,19 +224,19 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
@Override
public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) {
if (processorContext.currentNode() != null) {
- throw new IllegalStateException(String.format("%s Current node is not null", logPrefix));
+ throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix));
}
updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);
if (log.isTraceEnabled()) {
- log.trace("{} Punctuating processor {} with timestamp {} and punctuation type {}", logPrefix, node.name(), timestamp, type);
+ log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type);
}
try {
node.punctuate(timestamp, punctuator);
} catch (final KafkaException e) {
- throw new StreamsException(String.format("%s Exception caught while punctuating processor '%s'", logPrefix, node.name()), e);
+ throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
} finally {
processorContext.setCurrentNode(null);
}
@@ -264,7 +261,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
// visible for testing
void commit(final boolean startNewTransaction) {
- log.debug("{} Committing", logPrefix);
+ log.debug("Committing");
metrics.metrics.measureLatencyNs(
time,
new Runnable() {
@@ -289,14 +286,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
@Override
protected void flushState() {
- log.trace("{} Flushing state and producer", logPrefix);
+ log.trace("Flushing state and producer");
super.flushState();
recordCollector.flush();
}
private void commitOffsets(final boolean startNewTransaction) {
if (commitOffsetNeeded) {
- log.trace("{} Committing offsets", logPrefix);
+ log.trace("Committing offsets");
final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
final TopicPartition partition = entry.getKey();
@@ -317,7 +314,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
try {
consumer.commitSync(consumedOffsetsAndMetadata);
} catch (final CommitFailedException e) {
- log.warn("{} Failed offset commits {} due to CommitFailedException", logPrefix, consumedOffsetsAndMetadata);
+ log.warn("Failed offset commits {} due to CommitFailedException", consumedOffsetsAndMetadata);
throw e;
}
}
@@ -330,7 +327,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
void initTopology() {
// initialize the task by initializing all its processor nodes in the topology
- log.trace("{} Initializing processor nodes of the topology", logPrefix);
+ log.trace("Initializing processor nodes of the topology");
for (final ProcessorNode node : topology.processors()) {
processorContext.setCurrentNode(node);
try {
@@ -352,7 +349,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
*/
@Override
public void suspend() {
- log.debug("{} Suspending", logPrefix);
+ log.debug("Suspending");
suspend(true);
}
@@ -374,7 +371,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
private void closeTopology() {
- log.trace("{} Closing processor topology", logPrefix);
+ log.trace("Closing processor topology");
partitionGroup.clear();
@@ -411,7 +408,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
if (firstException == null) {
firstException = e;
}
- log.error("{} Could not close state manager due to the following error:", logPrefix, e);
+ log.error("Could not close state manager due to the following error:", e);
}
try {
@@ -434,7 +431,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
recordCollector.close();
}
} catch (final Throwable e) {
- log.error("{} Failed to close producer due to the following error:", logPrefix, e);
+ log.error("Failed to close producer due to the following error:", e);
}
}
}
@@ -467,7 +464,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
@Override
public void close(boolean clean,
final boolean isZombie) {
- log.debug("{} Closing", logPrefix);
+ log.debug("Closing");
RuntimeException firstException = null;
try {
@@ -475,7 +472,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
} catch (final RuntimeException e) {
clean = false;
firstException = e;
- log.error("{} Could not close task due to the following error:", logPrefix, e);
+ log.error("Could not close task due to the following error:", e);
}
closeSuspended(clean, isZombie, firstException);
@@ -495,7 +492,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
final int newQueueSize = partitionGroup.addRawRecords(partition, records);
if (log.isTraceEnabled()) {
- log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);
+ log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
}
// if after adding these records, its partition queue's buffered size has been
@@ -516,7 +513,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
*/
public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
if (processorContext.currentNode() == null) {
- throw new IllegalStateException(String.format("%s Current node is null", logPrefix));
+ throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix));
}
final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), interval, punctuator);
@@ -596,12 +593,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
// visible for testing only
- RecordCollector createRecordCollector() {
- return new RecordCollectorImpl(producer, id.toString());
+ RecordCollector createRecordCollector(final LogContext logContext) {
+ return new RecordCollectorImpl(producer, id.toString(), logContext);
}
public boolean initialize() {
- log.debug("{} Initializing", logPrefix);
+ log.debug("Initializing");
initializeStateStores();
initTopology();
processorContext.initialized();