You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/11/08 22:49:40 UTC
kafka git commit: MINOR: some trace logging for streams debugging
Repository: kafka
Updated Branches:
refs/heads/trunk 92287849e -> 11766ad31
MINOR: some trace logging for streams debugging
Author: Ubuntu <no...@confluent.io>
Reviewers: Eno Thereska <en...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1882 from norwood/streams-logging
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/11766ad3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/11766ad3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/11766ad3
Branch: refs/heads/trunk
Commit: 11766ad31861e0350178f59127e471fde2b20970
Parents: 9228784
Author: Dan Norwood <no...@confluent.io>
Authored: Tue Nov 8 14:47:08 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Nov 8 14:47:33 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/processor/internals/AbstractTask.java | 6 ++++++
.../streams/processor/internals/ProcessorStateManager.java | 1 +
.../kafka/streams/processor/internals/StreamTask.java | 2 +-
.../kafka/streams/processor/internals/StreamThread.java | 4 ++--
.../main/java/org/apache/kafka/streams/state/Stores.java | 9 ++++++++-
.../apache/kafka/streams/state/internals/RocksDBStore.java | 3 ---
.../apache/kafka/streams/state/internals/ThreadCache.java | 1 +
7 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 0981291..127a64e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -28,6 +28,8 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
@@ -37,6 +39,8 @@ import java.util.Map;
import java.util.Set;
public abstract class AbstractTask {
+ private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);
+
protected final TaskId id;
protected final String applicationId;
protected final ProcessorTopology topology;
@@ -78,6 +82,7 @@ public abstract class AbstractTask {
initializeOffsetLimits();
for (StateStore store : this.topology.stateStores()) {
+ log.trace("task [{}] Initializing store {}", id(), store.name());
store.init(this.processorContext, store);
}
}
@@ -119,6 +124,7 @@ public abstract class AbstractTask {
* @throws ProcessorStateException if there is an error while closing the state manager
*/
void closeStateManager() {
+ log.trace("task [{}] Closing", id());
try {
stateMgr.close(recordCollectorOffsets());
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 52a47d3..795949f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -326,6 +326,7 @@ public class ProcessorStateManager {
context.setCurrentNode(processorNode);
}
try {
+ log.trace("{} Flushing store={}", logPrefix, store.name());
store.flush();
} catch (Exception e) {
throw new ProcessorStateException(String.format("%s Failed to flush state store %s", logPrefix, store.name()), e);
http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 0733e5a..72b0b99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -262,10 +262,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
*/
public void commit() {
log.debug("{} Committing its state", logPrefix);
-
// 1) flush local state
stateMgr.flush(processorContext);
+ log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix);
// 2) flush produced records in the downstream and change logs of local states
recordCollector.flush();
http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 05d7126..fa5b5f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -592,6 +592,7 @@ public class StreamThread extends Thread {
* Commit the states of all its tasks
*/
private void commitAll() {
+ log.trace("stream-thread [{}] Committing all its owned tasks", this.getName());
for (StreamTask task : activeTasks.values()) {
commitOne(task);
}
@@ -604,8 +605,7 @@ public class StreamThread extends Thread {
* Commit the state of a task
*/
private void commitOne(AbstractTask task) {
- log.info("{} Committing task {}", logPrefix, task.id());
-
+ log.info("{} Committing task {} {}", logPrefix, task.getClass().getSimpleName(), task.id());
try {
task.commit();
} catch (CommitFailedException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 107f950..922f4bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -16,13 +16,15 @@
*/
package org.apache.kafka.streams.state;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -33,6 +35,8 @@ import java.util.Map;
*/
public class Stores {
+ private static final Logger log = LoggerFactory.getLogger(Stores.class);
+
/**
* Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
*
@@ -83,6 +87,7 @@ public class Stores {
@Override
public StateStoreSupplier build() {
+ log.trace("Creating InMemory Store name={} capacity={} logged={}", name, capacity, logged);
if (capacity < Integer.MAX_VALUE) {
return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
}
@@ -134,6 +139,7 @@ public class Stores {
@Override
public StateStoreSupplier build() {
+ log.trace("Creating RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
if (numSegments > 0) {
return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig, cachingEnabled);
}
@@ -412,3 +418,4 @@ public class Stores {
}
}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e27ffd8..b0f5ee4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -41,8 +41,6 @@ import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Comparator;
@@ -66,7 +64,6 @@ import java.util.Set;
*/
public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
- private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
private static final int TTL_NOT_USED = -1;
// TODO: these values should be configurable
http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index f7355d8..57ebfc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -195,6 +195,7 @@ public class ThreadCache {
private void maybeEvict(final String namespace) {
while (sizeBytes() > maxCacheSizeBytes) {
final NamedCache cache = getOrCreateCache(namespace);
+ log.trace("Thread {} evicting cache {}", name, namespace);
cache.evict();
numEvicts++;