Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/22 21:33:37 UTC
kafka git commit: MINOR: Improvements on Streams log4j
Repository: kafka
Updated Branches:
refs/heads/trunk 763ea5aad -> 57278aa82
MINOR: Improvements on Streams log4j
1. Add thread id as a prefix in the state directory classes; also add logs for lock activities.
2. Add logging for task creation / suspension.
3. Add more information to the rebalance listener logging.
4. Add the number of restored records to the changelog reader.
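For reference, the thread-id prefix these classes now share is built once per instance and prepended to every message. The following standalone sketch shows the convention (illustrative class and method names, not the commit's exact code):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative sketch of the "stream-thread [<threadId>]" log-prefix convention
    // that StateDirectory and StoreChangelogReader adopt in this patch.
    public class PrefixedLoggingExample {
        private static final Logger log = LoggerFactory.getLogger(PrefixedLoggingExample.class);

        private final String logPrefix;

        public PrefixedLoggingExample(final String threadId) {
            // same format string introduced in the diff below
            this.logPrefix = String.format("stream-thread [%s]", threadId);
        }

        public void onLockAcquired(final String taskId) {
            // every message carries the prefix so interleaved multi-thread logs can be grepped per thread
            log.debug("{} Acquired state dir lock for task {}", logPrefix, taskId);
        }
    }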
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Eno Thereska, Damian Guy, Ewen Cheslack-Postava
Closes #2702 from guozhangwang/KMinor-streams-task-creation-log4j-improvements
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57278aa8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57278aa8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57278aa8
Branch: refs/heads/trunk
Commit: 57278aa82da5dc9d040eb3dcf4a182e0731a621b
Parents: 763ea5a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Mar 22 14:33:34 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 22 14:33:34 2017 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 7 +-
.../processor/internals/GlobalStreamThread.java | 11 +-
.../internals/ProcessorStateManager.java | 2 +
.../processor/internals/StateDirectory.java | 53 ++++++----
.../processor/internals/StateRestorer.java | 25 ++++-
.../internals/StoreChangelogReader.java | 39 +++++--
.../streams/processor/internals/StreamTask.java | 4 +-
.../processor/internals/StreamThread.java | 101 ++++++++++++-------
.../streams/state/internals/ThreadCache.java | 2 +-
.../internals/GlobalStreamThreadTest.java | 4 +-
.../processor/internals/StateRestorerTest.java | 15 +++
11 files changed, 179 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b23d244..2c116d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -344,13 +344,14 @@ public class KafkaStreams {
if (globalTaskTopology != null) {
+ final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
config,
clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(clientId + "-global")),
- new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
+ new StateDirectory(applicationId, globalThreadId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time),
metrics,
time,
- clientId);
+ globalThreadId);
}
for (int i = 0; i < threads.length; i++) {
@@ -568,7 +569,7 @@ public class KafkaStreams {
localApplicationDir,
appId);
- final StateDirectory stateDirectory = new StateDirectory(appId, stateDir, Time.SYSTEM);
+ final StateDirectory stateDirectory = new StateDirectory(appId, "cleanup", stateDir, Time.SYSTEM);
stateDirectory.cleanRemovedTasks(0);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 8745655..36a248e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -41,6 +41,7 @@ import java.util.Map;
public class GlobalStreamThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class);
+
private final StreamsConfig config;
private final Consumer<byte[], byte[]> consumer;
private final StateDirectory stateDirectory;
@@ -57,17 +58,15 @@ public class GlobalStreamThread extends Thread {
final StateDirectory stateDirectory,
final Metrics metrics,
final Time time,
- final String clientId
- ) {
- super("GlobalStreamThread");
- this.topology = topology;
+ final String threadClientId) {
+ super(threadClientId);
+ this.time = time;
this.config = config;
+ this.topology = topology;
this.consumer = globalConsumer;
this.stateDirectory = stateDirectory;
- this.time = time;
long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) /
(config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1));
- final String threadClientId = clientId + "-" + getName();
this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 840b419..0e48ddd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -100,6 +100,8 @@ public class ProcessorStateManager implements StateManager {
// load the checkpoint information
checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
this.checkpointedOffsets = new HashMap<>(checkpoint.read());
+
+ log.info("{} Created state store manager for task {} with the acquired state dir lock", logPrefix, taskId);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 771bb61..85908e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -45,6 +45,7 @@ public class StateDirectory {
private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
private final File stateDir;
+ private final String logPrefix;
private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
private final HashMap<TaskId, FileLock> locks = new HashMap<>();
private final Time time;
@@ -53,7 +54,12 @@ public class StateDirectory {
private FileLock globalStateLock;
public StateDirectory(final String applicationId, final String stateDirConfig, final Time time) {
+ this(applicationId, "", stateDirConfig, time);
+ }
+
+ public StateDirectory(final String applicationId, final String threadId, final String stateDirConfig, final Time time) {
this.time = time;
+ this.logPrefix = String.format("stream-thread [%s]", threadId);
final File baseDir = new File(stateDirConfig);
if (!baseDir.exists() && !baseDir.mkdirs()) {
throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
@@ -64,7 +70,6 @@ public class StateDirectory {
throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created",
stateDir.getPath()));
}
-
}
/**
@@ -100,6 +105,7 @@ public class StateDirectory {
boolean lock(final TaskId taskId, int retry) throws IOException {
// we already have the lock so bail out here
if (locks.containsKey(taskId)) {
+ log.trace("{} Found cached state dir lock for task {}", logPrefix, taskId);
return true;
}
final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
@@ -118,12 +124,15 @@ public class StateDirectory {
final FileLock lock = tryLock(retry, channel);
if (lock != null) {
locks.put(taskId, lock);
+
+ log.debug("{} Acquired state dir lock for task {}", logPrefix, taskId);
}
return lock != null;
}
boolean lockGlobalState(final int retry) throws IOException {
if (globalStateLock != null) {
+ log.trace("{} Found cached state dir lock for the global task", logPrefix);
return true;
}
@@ -144,6 +153,9 @@ public class StateDirectory {
}
globalStateChannel = channel;
globalStateLock = fileLock;
+
+ log.debug("{} Acquired global state dir lock", logPrefix);
+
return true;
}
@@ -155,24 +167,10 @@ public class StateDirectory {
globalStateChannel.close();
globalStateLock = null;
globalStateChannel = null;
- }
- private FileLock tryLock(int retry, final FileChannel channel) throws IOException {
- FileLock lock = tryAcquireLock(channel);
- while (lock == null && retry > 0) {
- try {
- Thread.sleep(200);
- } catch (Exception ex) {
- // do nothing
- }
- retry--;
- lock = tryAcquireLock(channel);
- }
- return lock;
+ log.debug("{} Released global state dir lock", logPrefix);
}
-
-
/**
* Unlock the state directory for the given {@link TaskId}
* @param taskId
@@ -182,6 +180,9 @@ public class StateDirectory {
final FileLock lock = locks.remove(taskId);
if (lock != null) {
lock.release();
+
+ log.debug("{} Released state dir lock for task {}", logPrefix, taskId);
+
final FileChannel fileChannel = channels.remove(taskId);
if (fileChannel != null) {
fileChannel.close();
@@ -209,19 +210,19 @@ public class StateDirectory {
try {
if (lock(id, 0)) {
if (time.milliseconds() > taskDir.lastModified() + cleanupDelayMs) {
- log.info("Deleting obsolete state directory {} for task {} as cleanup delay of {} ms has passed", dirName, id, cleanupDelayMs);
+ log.info("{} Deleting obsolete state directory {} for task {} as cleanup delay of {} ms has passed", logPrefix, dirName, id, cleanupDelayMs);
Utils.delete(taskDir);
}
}
} catch (OverlappingFileLockException e) {
// locked by another thread
} catch (IOException e) {
- log.error("Failed to lock the state directory due to an unexpected exception", e);
+ log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix, e);
} finally {
try {
unlock(id);
} catch (IOException e) {
- log.error("Failed to release the state directory lock");
+ log.error("{} Failed to release the state directory lock", logPrefix);
}
}
}
@@ -243,6 +244,20 @@ public class StateDirectory {
});
}
+ private FileLock tryLock(int retry, final FileChannel channel) throws IOException {
+ FileLock lock = tryAcquireLock(channel);
+ while (lock == null && retry > 0) {
+ try {
+ Thread.sleep(200);
+ } catch (Exception ex) {
+ // do nothing
+ }
+ retry--;
+ lock = tryAcquireLock(channel);
+ }
+ return lock;
+ }
+
private FileChannel getOrCreateFileChannel(final TaskId taskId, final Path lockPath) throws IOException {
if (!channels.containsKey(taskId)) {
channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE));
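The tryLock helper moved above keeps its retry-with-sleep behaviour unchanged; isolated from the class, the pattern is roughly the following sketch over java.nio file locks (not the commit's exact code):

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.nio.channels.OverlappingFileLockException;

    // Standalone sketch of the retry-with-sleep acquisition used by StateDirectory#tryLock.
    final class LockRetryExample {

        static FileLock tryLockWithRetries(final FileChannel channel, int retries) throws IOException {
            FileLock lock = tryAcquire(channel);
            while (lock == null && retries > 0) {
                try {
                    Thread.sleep(200);            // back off briefly before the next attempt
                } catch (final InterruptedException ignored) {
                    // swallowed here, mirroring the original helper
                }
                retries--;
                lock = tryAcquire(channel);
            }
            return lock;                          // null if the lock was never acquired
        }

        private static FileLock tryAcquire(final FileChannel channel) throws IOException {
            try {
                return channel.tryLock();         // non-blocking attempt
            } catch (final OverlappingFileLockException e) {
                return null;                      // another thread in this JVM holds the lock
            }
        }
    }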
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 4edd71c..79bfd1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -21,12 +21,15 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
public class StateRestorer {
static final int NO_CHECKPOINT = -1;
- private final TopicPartition partition;
- private final StateRestoreCallback stateRestoreCallback;
+
private final Long checkpoint;
private final long offsetLimit;
private final boolean persistent;
+ private final TopicPartition partition;
+ private final StateRestoreCallback stateRestoreCallback;
+
private long restoredOffset;
+ private long startingOffset;
StateRestorer(final TopicPartition partition,
final StateRestoreCallback stateRestoreCallback,
@@ -44,15 +47,15 @@ public class StateRestorer {
return partition;
}
- public long checkpoint() {
+ long checkpoint() {
return checkpoint == null ? NO_CHECKPOINT : checkpoint;
}
- public void restore(final byte[] key, final byte[] value) {
+ void restore(final byte[] key, final byte[] value) {
stateRestoreCallback.restore(key, value);
}
- public boolean isPersistent() {
+ boolean isPersistent() {
return persistent;
}
@@ -60,6 +63,14 @@ public class StateRestorer {
this.restoredOffset = Math.min(offsetLimit, restoredOffset);
}
+ void setStartingOffset(final long startingOffset) {
+ this.startingOffset = Math.min(offsetLimit, startingOffset);
+ }
+
+ long startingOffset() {
+ return startingOffset;
+ }
+
boolean hasCompleted(final long recordOffset, final long endOffset) {
return endOffset == 0 || recordOffset >= readTo(endOffset);
}
@@ -68,6 +79,10 @@ public class StateRestorer {
return restoredOffset;
}
+ long restoredNumRecords() {
+ return restoredOffset - startingOffset;
+ }
+
long offsetLimit() {
return offsetLimit;
}
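The new fields amount to simple offset arithmetic: the starting offset is clamped to the offset limit just like the restored offset, and the restored record count is their difference. A minimal sketch of the same bookkeeping (mirroring the diff, not the full class):

    // Minimal sketch of the offset bookkeeping StateRestorer gains in this patch.
    final class RestoreProgressExample {
        private final long offsetLimit;
        private long startingOffset;
        private long restoredOffset;

        RestoreProgressExample(final long offsetLimit) {
            this.offsetLimit = offsetLimit;
        }

        void setStartingOffset(final long startingOffset) {
            this.startingOffset = Math.min(offsetLimit, startingOffset);   // clamp, as in the diff
        }

        void setRestoredOffset(final long restoredOffset) {
            this.restoredOffset = Math.min(offsetLimit, restoredOffset);
        }

        long restoredNumRecords() {
            // e.g. starting at offset 20 and restoring up to 40 reports 20 records
            return restoredOffset - startingOffset;
        }
    }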
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 0afd6c9..8639382 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -39,18 +39,23 @@ public class StoreChangelogReader implements ChangelogReader {
private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class);
private final Consumer<byte[], byte[]> consumer;
+ private final String logPrefix;
private final Time time;
private final long partitionValidationTimeoutMs;
private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
-
- public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
- this.consumer = consumer;
+ public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
this.time = time;
+ this.consumer = consumer;
this.partitionValidationTimeoutMs = partitionValidationTimeoutMs;
+
+ this.logPrefix = String.format("stream-thread [%s]", threadId);
}
+ public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time time, final long partitionValidationTimeoutMs) {
+ this("", consumer, time, partitionValidationTimeoutMs);
+ }
@Override
public void validatePartitionExists(final TopicPartition topicPartition, final String storeName) {
@@ -60,7 +65,7 @@ public class StoreChangelogReader implements ChangelogReader {
try {
partitionInfo.putAll(consumer.listTopics());
} catch (final TimeoutException e) {
- log.warn("Could not list topics so will fall back to partition by partition fetching");
+ log.warn("{} Could not list topics so will fall back to partition by partition fetching", logPrefix);
}
}
@@ -81,7 +86,7 @@ public class StoreChangelogReader implements ChangelogReader {
throw new StreamsException(String.format("Store %s's change log (%s) does not contain partition %s",
storeName, topicPartition.topic(), topicPartition.partition()));
}
- log.debug("Took {} ms to validate that partition {} exists", time.milliseconds() - start, topicPartition);
+ log.debug("{} Took {} ms to validate that partition {} exists", logPrefix, time.milliseconds() - start, topicPartition);
}
@Override
@@ -99,7 +104,6 @@ public class StoreChangelogReader implements ChangelogReader {
}
final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(stateRestorers.keySet());
-
// remove any partitions where we already have all of the data
final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
@@ -113,6 +117,8 @@ public class StoreChangelogReader implements ChangelogReader {
}
}
+ log.info("{} Starting restoring state stores from changelog topics {}", logPrefix, needsRestoring.keySet());
+
consumer.assign(needsRestoring.keySet());
for (final StateRestorer restorer : needsRestoring.values()) {
@@ -127,6 +133,11 @@ public class StoreChangelogReader implements ChangelogReader {
consumer.position(restorer.partition()),
endOffsets.get(restorer.partition()));
}
+ // TODO: each consumer.position() call after seekToBeginning will cause
+ // a blocking round trip to reset the position for that partition; we should
+ // batch them into a single round trip to reset for all necessary partitions
+
+ restorer.setStartingOffset(consumer.position(restorer.partition()));
}
final Set<TopicPartition> partitions = new HashSet<>(needsRestoring.keySet());
@@ -139,12 +150,13 @@ public class StoreChangelogReader implements ChangelogReader {
}
} finally {
consumer.assign(Collections.<TopicPartition>emptyList());
- log.debug("Took {} ms to restore active state", time.milliseconds() - start);
+ log.debug("{} Took {} ms to restore all active states", logPrefix, time.milliseconds() - start);
}
}
private void logRestoreOffsets(final TopicPartition partition, final long checkpoint, final Long aLong) {
- log.debug("restoring partition {} from offset {} to endOffset {}",
+ log.debug("{} Restoring partition {} from offset {} to endOffset {}",
+ logPrefix,
partition,
checkpoint,
aLong);
@@ -177,7 +189,16 @@ public class StoreChangelogReader implements ChangelogReader {
endOffset,
pos));
}
+
restorer.setRestoredOffset(pos);
+
+ log.debug("{} Completed restoring state from changelog {} with {} records ranging from offset {} to {}",
+ logPrefix,
+ topicPartition,
+ restorer.restoredNumRecords(),
+ restorer.startingOffset(),
+ restorer.restoredOffset());
+
partitionIterator.remove();
}
}
@@ -209,6 +230,4 @@ public class StoreChangelogReader implements ChangelogReader {
return false;
}
-
-
}
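Taken together, the restore path now remembers where each restorer starts (right after seeking to the checkpoint or the beginning) and reports the restored record count when the partition completes. The overall flow, reduced to a single-partition sketch over the public consumer API (a simplified illustration, not the committed method):

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    // Simplified sketch of restoring one changelog partition with the new start/end bookkeeping.
    final class RestoreFlowExample {

        static long restoreFromBeginning(final Consumer<byte[], byte[]> restoreConsumer,
                                         final TopicPartition changelog,
                                         final long endOffset) {
            restoreConsumer.assign(Collections.singletonList(changelog));
            restoreConsumer.seekToBeginning(Collections.singletonList(changelog));

            // position() forces a blocking round trip per partition -- the in-line TODO in the
            // patch notes that these lookups could eventually be batched across partitions.
            final long startingOffset = restoreConsumer.position(changelog);

            long restoredOffset = startingOffset;
            while (restoredOffset < endOffset) {
                final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(100);
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    // the real code hands each record to the store's restore callback here
                    restoredOffset = record.offset() + 1;
                }
            }
            // the value reported by the new "Completed restoring ... with {} records" debug line
            return restoredOffset - startingOffset;
        }
    }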
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 609878a..7bd4be4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -68,7 +68,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
private Runnable commitDelegate = new Runnable() {
@Override
public void run() {
- log.debug("{} Committing its state", logPrefix);
// 1) flush local state
stateMgr.flush(processorContext);
@@ -89,7 +88,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param consumer the instance of {@link Consumer}
- * @param restoreConsumer the instance of {@link Consumer} used when restoring state
+ * @param changelogReader the instance of {@link ChangelogReader} used for restoring state
* @param config the {@link StreamsConfig} specified by the user
* @param metrics the {@link StreamsMetrics} created by the thread
* @param stateDirectory the {@link StateDirectory} created by the thread
@@ -126,7 +125,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
this.logPrefix = String.format("task [%s]", id);
-
this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
// initialize the consumed offset cache
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 7cd4b93..61d7d72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -259,7 +259,6 @@ public class StreamThread extends Thread {
}
this.cache = new ThreadCache(threadClientId, cacheSizeBytes, this.streamsMetrics);
-
this.logPrefix = String.format("stream-thread [%s]", threadClientId);
// set the producer and consumer clients
@@ -271,7 +270,7 @@ public class StreamThread extends Thread {
if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) {
originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
- log.info("{} custom offset resets specified updating configs original auto offset reset {}", logPrefix, originalReset);
+ log.info("{} Custom offset resets specified updating configs original auto offset reset {}", logPrefix, originalReset);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
}
@@ -292,7 +291,7 @@ public class StreamThread extends Thread {
// standby ktables
this.standbyRecords = new HashMap<>();
- this.stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
+ this.stateDirectory = new StateDirectory(applicationId, threadClientId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time);
final Object maxPollInterval = consumerConfigs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
this.rebalanceTimeoutMs = (Integer) ConfigDef.parseType(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval, Type.INT);
this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
@@ -693,7 +692,8 @@ public class StreamThread extends Thread {
protected void maybeCommit(final long now) {
if (commitTimeMs >= 0 && lastCommitMs + commitTimeMs < now) {
- log.info("{} Committing all tasks because the commit interval {}ms has elapsed", logPrefix, commitTimeMs);
+ log.info("{} Committing all active tasks {} and standby tasks {} because the commit interval {}ms has elapsed",
+ logPrefix, commitTimeMs, activeTasks, standbyTasks);
commitAll();
lastCommitMs = now;
@@ -717,7 +717,6 @@ public class StreamThread extends Thread {
* Commit the states of all its tasks
*/
private void commitAll() {
- log.trace("{} Committing all its owned tasks", logPrefix);
for (StreamTask task : activeTasks.values()) {
commitOne(task);
}
@@ -782,22 +781,6 @@ public class StreamThread extends Thread {
return tasks;
}
- protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
- log.info("{} Creating active task {} with assigned partitions {}", logPrefix, id, partitions);
-
- streamsMetrics.taskCreatedSensor.record();
-
- final ProcessorTopology topology = builder.build(id.topicGroupId);
- final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString());
- final long start = time.milliseconds();
- try {
- return new StreamTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory, cache, time, recordCollector);
- } finally {
- log.debug("{} creation of active task {} took {} ms", logPrefix, id, time.milliseconds() - start);
- }
- }
-
-
private StreamTask findMatchingSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
if (suspendedTasks.containsKey(taskId)) {
final StreamTask task = suspendedTasks.get(taskId);
@@ -826,7 +809,7 @@ public class StreamThread extends Thread {
final StreamTask task = next.getValue();
final Set<TopicPartition> assignedPartitionsForTask = newTaskAssignment.get(next.getKey());
if (!task.partitions().equals(assignedPartitionsForTask)) {
- log.debug("{} closing suspended non-assigned task", logPrefix);
+ log.debug("{} Closing suspended non-assigned active task {}", logPrefix, task.id);
try {
task.close();
task.closeStateManager(true);
@@ -846,13 +829,13 @@ public class StreamThread extends Thread {
while (standByTaskIterator.hasNext()) {
final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next();
if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) {
- log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, suspendedTask.getKey());
final StandbyTask task = suspendedTask.getValue();
+ log.debug("{} Closing suspended non-assigned standby task {}", logPrefix, task.id);
try {
task.close();
task.closeStateManager(true);
} catch (Exception e) {
- log.error("{} Failed to remove suspended task standby {}", logPrefix, suspendedTask.getKey(), e);
+ log.error("{} Failed to remove suspended standby task {}", logPrefix, task.id, e);
} finally {
standByTaskIterator.remove();
}
@@ -860,6 +843,20 @@ public class StreamThread extends Thread {
}
}
+ protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
+ log.debug("{} Creating new active task {} with assigned partitions {}", logPrefix, id, partitions);
+
+ streamsMetrics.taskCreatedSensor.record();
+
+ final ProcessorTopology topology = builder.build(id.topicGroupId);
+ final RecordCollector recordCollector = new RecordCollectorImpl(producer, id.toString());
+ try {
+ return new StreamTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory, cache, time, recordCollector);
+ } finally {
+ log.info("{} Created active task {} with assigned partitions {}", logPrefix, id, partitions);
+ }
+ }
+
private void addStreamTasks(Collection<TopicPartition> assignment, final long start) {
if (partitionAssignor == null)
throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
@@ -898,19 +895,27 @@ public class StreamThread extends Thread {
// create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedTasks(); eventually
+ log.debug("{} new active tasks to be created: {}", logPrefix, newTasks);
+
taskCreator.retryWithBackoff(newTasks, start);
}
- StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
- log.info("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions);
+ private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
+ log.debug("{} Creating new standby task {} with assigned partitions {}", logPrefix, id, partitions);
streamsMetrics.taskCreatedSensor.record();
ProcessorTopology topology = builder.build(id.topicGroupId);
if (!topology.stateStores().isEmpty()) {
- return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
+ try {
+ return new StandbyTask(id, applicationId, partitions, topology, consumer, storeChangelogReader, config, streamsMetrics, stateDirectory);
+ } finally {
+ log.info("{} Created standby task {} with assigned partitions {}", logPrefix, id, partitions);
+ }
} else {
+ log.info("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", logPrefix, id, partitions);
+
return null;
}
}
@@ -942,6 +947,8 @@ public class StreamThread extends Thread {
// create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
// -> other thread will call removeSuspendedStandbyTasks(); eventually
+ log.debug("{} new standby tasks to be created: {}", logPrefix, newStandbyTasks);
+
new StandbyTaskCreator(checkpointedOffsets).retryWithBackoff(newStandbyTasks, start);
restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
@@ -1006,7 +1013,7 @@ public class StreamThread extends Thread {
return performOnAllTasks(new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
- log.info("{} Closing a task {}", StreamThread.this.logPrefix, task.id());
+ log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id());
task.close();
streamsMetrics.tasksClosedSensor.record();
}
@@ -1017,7 +1024,7 @@ public class StreamThread extends Thread {
return performOnAllTasks(new AbstractTaskAction() {
@Override
public void apply(final AbstractTask task) {
- log.info("{} Closing a task's topology {}", StreamThread.this.logPrefix, task.id());
+ log.info("{} Closing task's topology {}", StreamThread.this.logPrefix, task.id());
task.closeTopology();
streamsMetrics.tasksClosedSensor.record();
}
@@ -1168,7 +1175,6 @@ public class StreamThread extends Thread {
class TaskCreator extends AbstractTaskCreator {
void createTask(final TaskId taskId, final Set<TopicPartition> partitions) {
- log.debug("{} creating new task {}", logPrefix, taskId);
final StreamTask task = createStreamTask(taskId, partitions);
activeTasks.put(taskId, task);
@@ -1187,7 +1193,6 @@ public class StreamThread extends Thread {
}
void createTask(final TaskId taskId, final Set<TopicPartition> partitions) {
- log.debug("{} creating new standby task {}", logPrefix, taskId);
final StandbyTask task = createStandbyTask(taskId, partitions);
updateStandByTaskMaps(checkpointedOffsets, taskId, partitions, task);
}
@@ -1204,10 +1209,19 @@ public class StreamThread extends Thread {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
+ log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.\n" +
+ "\tassigned active tasks: {}\n" +
+ "\tassigned standby tasks: {}\n" +
+ "\tcurrent suspended active tasks: {}\n" +
+ "\tcurrent suspended standby tasks: {}\n" +
+ "\tprevious active tasks: {}",
+ logPrefix, state, assignment,
+ partitionAssignor.activeTasks(), partitionAssignor.standbyTasks(),
+ suspendedTasks, suspendedStandbyTasks, prevActiveTasks);
+
final long start = time.milliseconds();
try {
- log.info("{} at state {}: new partitions {} assigned at the end of consumer rebalance.", logPrefix, state, assignment);
- storeChangelogReader = new StoreChangelogReader(restoreConsumer, time, requestTimeOut);
+ storeChangelogReader = new StoreChangelogReader(getName(), restoreConsumer, time, requestTimeOut);
setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
// do this first as we may have suspended standby tasks that
// will become active or vice versa
@@ -1223,14 +1237,24 @@ public class StreamThread extends Thread {
rebalanceException = t;
throw t;
} finally {
- log.debug("{} partition assignment took {} ms", logPrefix, time.milliseconds() - start);
+ log.info("{} partition assignment took {} ms.\n" +
+ "\tcurrent active tasks: {}\n" +
+ "\tcurrent standby tasks: {}",
+ logPrefix, time.milliseconds() - start,
+ activeTasks, standbyTasks);
}
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
+ log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.\n" +
+ "\tcurrent assigned active tasks: {}\n" +
+ "\tcurrent assigned standby tasks: {}\n",
+ logPrefix, state, assignment,
+ activeTasks, standbyTasks);
+
+ final long start = time.milliseconds();
try {
- log.info("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.", logPrefix, state, assignment);
setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
lastCleanMs = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
// suspend active tasks
@@ -1242,6 +1266,13 @@ public class StreamThread extends Thread {
streamsMetadataState.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), partitionAssignor.clusterMetadata());
removeStreamTasks();
removeStandbyTasks();
+
+ log.info("{} partition revocation took {} ms.\n" +
+ "\tsuspended active tasks: {}\n" +
+ "\tsuspended standby tasks: {}\n" +
+ "\tprevious active tasks: {}\n",
+ logPrefix, time.milliseconds() - start,
+ suspendedTasks, suspendedStandbyTasks, prevActiveTasks);
}
}
}
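The rebalance listener changes all follow one pattern: log the intent (with the current task sets) before doing the work, then log the outcome and elapsed time once it finishes. A compact sketch of that shape against the consumer's rebalance callback (illustrative only; the real listener also creates and suspends tasks):

    import java.util.Collection;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch of the "log intent up front, log outcome and timing afterwards" pattern
    // the StreamThread rebalance listener adopts in this patch.
    final class LoggingRebalanceListenerExample implements ConsumerRebalanceListener {
        private static final Logger log = LoggerFactory.getLogger(LoggingRebalanceListenerExample.class);

        private final String logPrefix;

        LoggingRebalanceListenerExample(final String threadId) {
            this.logPrefix = String.format("stream-thread [%s]", threadId);
        }

        @Override
        public void onPartitionsAssigned(final Collection<TopicPartition> assignment) {
            log.info("{} new partitions {} assigned at the end of consumer rebalance", logPrefix, assignment);
            final long start = System.currentTimeMillis();
            try {
                // create tasks for the newly assigned partitions here
            } finally {
                log.info("{} partition assignment took {} ms", logPrefix, System.currentTimeMillis() - start);
            }
        }

        @Override
        public void onPartitionsRevoked(final Collection<TopicPartition> revoked) {
            log.info("{} partitions {} revoked at the beginning of consumer rebalance", logPrefix, revoked);
            final long start = System.currentTimeMillis();
            try {
                // suspend tasks and commit their offsets here
            } finally {
                log.info("{} partition revocation took {} ms", logPrefix, System.currentTimeMillis() - start);
            }
        }
    }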
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 5281814..45a6488 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -96,7 +96,7 @@ public class ThreadCache {
}
cache.flush();
- log.debug("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
+ log.trace("Thread {} cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}",
name, puts(), gets(), evicts(), flushes());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 97372b8..30582ed 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -62,7 +62,7 @@ public class GlobalStreamThreadTest {
new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
new Metrics(),
new MockTime(),
- "client");
+ "clientId");
}
@Test
@@ -93,7 +93,7 @@ public class GlobalStreamThreadTest {
new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
new Metrics(),
new MockTime(),
- "client");
+ "clientId");
try {
globalStreamThread.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/57278aa8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index ead19c7..a847a94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -65,5 +65,20 @@ public class StateRestorerTest {
assertThat(restorer.restoredOffset(), equalTo(OFFSET_LIMIT));
}
+ @Test
+ public void shouldSetStartingOffsetToMinOfLimitAndOffset() throws Exception {
+ restorer.setStartingOffset(20);
+ assertThat(restorer.startingOffset(), equalTo(20L));
+ restorer.setRestoredOffset(100);
+ assertThat(restorer.restoredOffset(), equalTo(OFFSET_LIMIT));
+ }
+ @Test
+ public void shouldReturnCorrectNumRestoredRecords() throws Exception {
+ restorer.setStartingOffset(20);
+ restorer.setRestoredOffset(40);
+ assertThat(restorer.restoredNumRecords(), equalTo(20L));
+ restorer.setRestoredOffset(100);
+ assertThat(restorer.restoredNumRecords(), equalTo(OFFSET_LIMIT - 20));
+ }
}
\ No newline at end of file