You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/06 05:20:28 UTC

kafka git commit: KAFKA-5557: Using a logPrefix inside the StreamPartitionAssignor

Repository: kafka
Updated Branches:
  refs/heads/trunk 70e949d52 -> e68a94056


KAFKA-5557: Using a logPrefix inside the StreamPartitionAssignor

Added logPrefix for avoiding stream thread name formatting replicated more times

Author: ppatierno <pp...@live.com>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3488 from ppatierno/kafka-5557


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e68a9405
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e68a9405
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e68a9405

Branch: refs/heads/trunk
Commit: e68a94056fa7eb5b8d69815bcfa522247568d3e9
Parents: 70e949d
Author: Paolo Patierno <pp...@live.com>
Authored: Wed Jul 5 22:20:24 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 5 22:20:24 2017 -0700

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 53 +++++++++++---------
 1 file changed, 28 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e68a9405/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 375f350..4eadb99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -62,6 +62,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     private final static int UNKNOWN = -1;
     public final static int NOT_AVAILABLE = -2;
 
+    private String logPrefix;
+
     private static class AssignedPartition implements Comparable<AssignedPartition> {
         public final TaskId taskId;
         public final TopicPartition partition;
@@ -216,6 +218,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.setPartitionAssignor(this);
 
+        logPrefix = String.format("stream-thread [%s]", streamThread.getName());
+
         String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
         if (userEndPoint != null && !userEndPoint.isEmpty()) {
             try {
@@ -223,12 +227,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 Integer port = getPort(userEndPoint);
 
                 if (host == null || port == null)
-                    throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
+                    throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" +
                                     " but received %s",
-                            streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+                            logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
             } catch (NumberFormatException nfe) {
-                throw new ConfigException(String.format("stream-thread [%s] Invalid port supplied in %s for config %s",
-                        streamThread.getName(), userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
+                throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s",
+                        logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
             }
 
             this.userEndPoint = userEndPoint;
@@ -271,8 +275,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     private void updateSubscribedTopics(Set<String> topics) {
         SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
-        log.debug("stream-thread [{}] found {} topics possibly matching regex",
-                  streamThread.getName(), topics);
+        log.debug("{} found {} topics possibly matching regex", logPrefix, topics);
         // update the topic groups with the returned subscription set for regex pattern subscriptions
         subscriptionUpdates.updateTopics(topics);
         streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
@@ -321,7 +324,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             clientMetadata.addConsumer(consumerId, info);
         }
 
-        log.debug("stream-thread [{}] Constructed client metadata {} from the member subscriptions.", streamThread.getName(), clientsMetadata);
+        log.debug("{} Constructed client metadata {} from the member subscriptions.", logPrefix, clientsMetadata);
 
         // ---------------- Step Zero ---------------- //
 
@@ -407,7 +410,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
         metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
 
-        log.debug("stream-thread [{}] Created repartition topics {} from the parsed topology.", streamThread.getName(), allRepartitionTopicPartitions.values());
+        log.debug("{} Created repartition topics {} from the parsed topology.", logPrefix, allRepartitionTopicPartitions.values());
 
         // ---------------- Step One ---------------- //
 
@@ -429,7 +432,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             Set<TopicPartition> partitions = entry.getValue();
             for (TopicPartition partition : partitions) {
                 if (allAssignedPartitions.contains(partition)) {
-                    log.warn("stream-thread [{}] Partition {} is assigned to more than one tasks: {}", streamThread.getName(), partition, partitionsForTask);
+                    log.warn("{} Partition {} is assigned to more than one tasks: {}", logPrefix, partition, partitionsForTask);
                 }
             }
             allAssignedPartitions.addAll(partitions);
@@ -448,11 +451,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 for (PartitionInfo partitionInfo : partitionInfoList) {
                     TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                     if (!allAssignedPartitions.contains(partition)) {
-                        log.warn("stream-thread [{}] Partition {} is not assigned to any tasks: {}", streamThread.getName(), partition, partitionsForTask);
+                        log.warn("{} Partition {} is not assigned to any tasks: {}", logPrefix, partition, partitionsForTask);
                     }
                 }
             } else {
-                log.warn("stream-thread [{}] No partitions found for topic {}", streamThread.getName(), topic);
+                log.warn("{} No partitions found for topic {}", logPrefix, topic);
             }
         }
 
@@ -475,14 +478,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
                     changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
                 } else {
-                    log.debug("stream-thread [{}] No tasks found for topic group {}", streamThread.getName(), topicGroupId);
+                    log.debug("{} No tasks found for topic group {}", logPrefix, topicGroupId);
                 }
             }
         }
 
         prepareTopic(changelogTopicMetadata);
 
-        log.debug("stream-thread [{}] Created state changelog topics {} from the parsed topology.", streamThread.getName(), changelogTopicMetadata.values());
+        log.debug("{} Created state changelog topics {} from the parsed topology.", logPrefix, changelogTopicMetadata.values());
 
         // ---------------- Step Two ---------------- //
 
@@ -492,13 +495,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             states.put(entry.getKey(), entry.getValue().state);
         }
 
-        log.debug("stream-thread [{}] Assigning tasks {} to clients {} with number of replicas {}",
-                streamThread.getName(), partitionsForTask.keySet(), states, numStandbyReplicas);
+        log.debug("{} Assigning tasks {} to clients {} with number of replicas {}",
+                logPrefix, partitionsForTask.keySet(), states, numStandbyReplicas);
 
         final StickyTaskAssignor<UUID> taskAssignor = new StickyTaskAssignor<>(states, partitionsForTask.keySet());
         taskAssignor.assign(numStandbyReplicas);
 
-        log.info("stream-thread [{}] Assigned tasks to clients as {}.", streamThread.getName(), states);
+        log.info("{} Assigned tasks to clients as {}.", logPrefix, states);
 
         // ---------------- Step Three ---------------- //
 
@@ -589,8 +592,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         // could be duplicated if one task has more than one assigned partitions
         if (partitions.size() != info.activeTasks.size()) {
             throw new TaskAssignmentException(
-                    String.format("stream-thread [%s] Number of assigned partitions %d is not equal to the number of active taskIds %d" +
-                            ", assignmentInfo=%s", streamThread.getName(), partitions.size(), info.activeTasks.size(), info.toString())
+                    String.format("%s Number of assigned partitions %d is not equal to the number of active taskIds %d" +
+                            ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString())
             );
         }
 
@@ -643,7 +646,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
      * @param topicPartitions Map that contains the topic names to be created with the number of partitions
      */
     private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
-        log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
+        log.debug("{} Starting to validate internal topics in partition assignor.", logPrefix);
 
         // first construct the topics to make ready
         Map<InternalTopicConfig, Integer> topicsToMakeReady = new HashMap<>();
@@ -657,7 +660,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 continue;
             }
             if (numPartitions < 0) {
-                throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
+                throw new TopologyBuilderException(String.format("%s Topic [%s] number of partitions not defined", logPrefix, topic.name()));
             }
 
             topicsToMakeReady.put(topic, numPartitions);
@@ -677,7 +680,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
 
-        log.debug("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
+        log.debug("{} Completed validating internal topics in partition assignor", logPrefix);
     }
 
     private boolean allTopicsCreated(final Set<String> topicNamesToMakeReady, final Map<InternalTopicConfig, Integer> topicsToMakeReady) {
@@ -765,10 +768,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
     }
 
     static class CopartitionedTopicsValidator {
-        private final String threadName;
+        private final String logPrefix;
 
         CopartitionedTopicsValidator(final String threadName) {
-            this.threadName = threadName;
+            this.logPrefix = String.format("stream-thread [%s]", threadName);
         }
 
         void validate(final Set<String> copartitionGroup,
@@ -781,7 +784,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     final Integer partitions = metadata.partitionCountForTopic(topic);
 
                     if (partitions == null) {
-                        throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", threadName, topic));
+                        throw new TopologyBuilderException(String.format("%s Topic not found: %s", logPrefix, topic));
                     }
 
                     if (numPartitions == UNKNOWN) {
@@ -789,7 +792,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                     } else if (numPartitions != partitions) {
                         final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                         Arrays.sort(topics);
-                        throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", threadName, Utils.join(Arrays.asList(topics), ",")));
+                        throw new TopologyBuilderException(String.format("%s Topics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
                     }
                 } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
                     numPartitions = NOT_AVAILABLE;