You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/11/08 22:49:40 UTC

kafka git commit: MINOR: Add some trace logging for Streams debugging

Repository: kafka
Updated Branches:
  refs/heads/trunk 92287849e -> 11766ad31


MINOR: Add some trace logging for Streams debugging

Author: Ubuntu <no...@confluent.io>

Reviewers: Eno Thereska <en...@gmail.com>, Guozhang Wang <wa...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1882 from norwood/streams-logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/11766ad3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/11766ad3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/11766ad3

Branch: refs/heads/trunk
Commit: 11766ad31861e0350178f59127e471fde2b20970
Parents: 9228784
Author: Dan Norwood <no...@confluent.io>
Authored: Tue Nov 8 14:47:08 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Nov 8 14:47:33 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/AbstractTask.java     | 6 ++++++
 .../streams/processor/internals/ProcessorStateManager.java  | 1 +
 .../kafka/streams/processor/internals/StreamTask.java       | 2 +-
 .../kafka/streams/processor/internals/StreamThread.java     | 4 ++--
 .../main/java/org/apache/kafka/streams/state/Stores.java    | 9 ++++++++-
 .../apache/kafka/streams/state/internals/RocksDBStore.java  | 3 ---
 .../apache/kafka/streams/state/internals/ThreadCache.java   | 1 +
 7 files changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 0981291..127a64e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -28,6 +28,8 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -37,6 +39,8 @@ import java.util.Map;
 import java.util.Set;
 
 public abstract class AbstractTask {
+    private static final Logger log = LoggerFactory.getLogger(AbstractTask.class);
+
     protected final TaskId id;
     protected final String applicationId;
     protected final ProcessorTopology topology;
@@ -78,6 +82,7 @@ public abstract class AbstractTask {
         initializeOffsetLimits();
 
         for (StateStore store : this.topology.stateStores()) {
+            log.trace("task [{}] Initializing store {}", id(), store.name());
             store.init(this.processorContext, store);
         }
     }
@@ -119,6 +124,7 @@ public abstract class AbstractTask {
      * @throws ProcessorStateException if there is an error while closing the state manager
      */
     void closeStateManager() {
+        log.trace("task [{}] Closing", id());
         try {
             stateMgr.close(recordCollectorOffsets());
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 52a47d3..795949f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -326,6 +326,7 @@ public class ProcessorStateManager {
                     context.setCurrentNode(processorNode);
                 }
                 try {
+                    log.trace("{} Flushing store={}", logPrefix, store.name());
                     store.flush();
                 } catch (Exception e) {
                     throw new ProcessorStateException(String.format("%s Failed to flush state store %s", logPrefix, store.name()), e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 0733e5a..72b0b99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -262,10 +262,10 @@ public class StreamTask extends AbstractTask implements Punctuator {
      */
     public void commit() {
         log.debug("{} Committing its state", logPrefix);
-
         // 1) flush local state
         stateMgr.flush(processorContext);
 
+        log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix);
         // 2) flush produced records in the downstream and change logs of local states
         recordCollector.flush();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 05d7126..fa5b5f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -592,6 +592,7 @@ public class StreamThread extends Thread {
      * Commit the states of all its tasks
      */
     private void commitAll() {
+        log.trace("stream-thread [{}] Committing all its owned tasks", this.getName());
         for (StreamTask task : activeTasks.values()) {
             commitOne(task);
         }
@@ -604,8 +605,7 @@ public class StreamThread extends Thread {
      * Commit the state of a task
      */
     private void commitOne(AbstractTask task) {
-        log.info("{} Committing task {}", logPrefix, task.id());
-
+        log.info("{} Committing task {} {}", logPrefix, task.getClass().getSimpleName(), task.id());
         try {
             task.commit();
         } catch (CommitFailedException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 107f950..922f4bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -16,13 +16,15 @@
  */
 package org.apache.kafka.streams.state;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -33,6 +35,8 @@ import java.util.Map;
  */
 public class Stores {
 
+    private static final Logger log = LoggerFactory.getLogger(Stores.class);
+
     /**
      * Begin to create a new {@link org.apache.kafka.streams.processor.StateStoreSupplier} instance.
      *
@@ -83,6 +87,7 @@ public class Stores {
 
                                     @Override
                                     public StateStoreSupplier build() {
+                                        log.trace("Creating InMemory Store name={} capacity={} logged={}", name, capacity, logged);
                                         if (capacity < Integer.MAX_VALUE) {
                                             return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
                                         }
@@ -134,6 +139,7 @@ public class Stores {
 
                                     @Override
                                     public StateStoreSupplier build() {
+                                        log.trace("Creating RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
                                         if (numSegments > 0) {
                                             return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, windowSize, logged, logConfig, cachingEnabled);
                                         }
@@ -412,3 +418,4 @@ public class Stores {
 
     }
 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e27ffd8..b0f5ee4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -41,8 +41,6 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.Comparator;
@@ -66,7 +64,6 @@ import java.util.Set;
  */
 public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
-    private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
     private static final int TTL_NOT_USED = -1;
 
     // TODO: these values should be configurable

http://git-wip-us.apache.org/repos/asf/kafka/blob/11766ad3/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index f7355d8..57ebfc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -195,6 +195,7 @@ public class ThreadCache {
     private void maybeEvict(final String namespace) {
         while (sizeBytes() > maxCacheSizeBytes) {
             final NamedCache cache = getOrCreateCache(namespace);
+            log.trace("Thread {} evicting cache {}", name, namespace);
             cache.evict();
 
             numEvicts++;