You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/07/04 14:05:42 UTC

kafka git commit: KAFKA-5469; Created state changelog topics not logged correctly

Repository: kafka
Updated Branches:
  refs/heads/trunk afb91a02a -> ef5867dce


KAFKA-5469; Created state changelog topics not logged correctly

Fixed debug logging for the created state changelog topics.
Added toString() to InternalTopicMetadata and InternalTopicConfig to support the above debug logging.

Author: ppatierno <pp...@live.com>

Reviewers: Damian Guy <da...@gmail.com>

Closes #3368 from ppatierno/kafka-5469


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef5867dc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef5867dc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef5867dc

Branch: refs/heads/trunk
Commit: ef5867dce3894aa57a8f815aa206693e2530b576
Parents: afb91a0
Author: ppatierno <pp...@live.com>
Authored: Tue Jul 4 15:05:36 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Tue Jul 4 15:05:36 2017 +0100

----------------------------------------------------------------------
 .../streams/processor/internals/InternalTopicConfig.java  | 10 ++++++++++
 .../processor/internals/StreamPartitionAssignor.java      | 10 +++++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5867dc/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index 1c8ca6c..7931f32 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -112,4 +112,14 @@ public class InternalTopicConfig {
     public int hashCode() {
         return Objects.hash(name, logConfig, retentionMs, cleanupPolicies);
     }
+
+    @Override
+    public String toString() {
+        return "InternalTopicConfig(" +
+                "name=" + name +
+                ", logConfig=" + logConfig +
+                ", cleanupPolicies=" + cleanupPolicies +
+                ", retentionMs=" + retentionMs +
+                ")";
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef5867dc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 0a1b2ab..375f350 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -145,6 +145,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             this.config = config;
             this.numPartitions = UNKNOWN;
         }
+
+        @Override
+        public String toString() {
+            return "InternalTopicMetadata(" +
+                    "config=" + config +
+                    ", numPartitions=" + numPartitions +
+                    ")";
+        }
     }
 
     private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
@@ -474,7 +482,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         prepareTopic(changelogTopicMetadata);
 
-        log.debug("stream-thread [{}] Created state changelog topics {} from the parsed topology.", streamThread.getName(), changelogTopicMetadata);
+        log.debug("stream-thread [{}] Created state changelog topics {} from the parsed topology.", streamThread.getName(), changelogTopicMetadata.values());
 
         // ---------------- Step Two ---------------- //