You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/07/06 05:20:28 UTC
kafka git commit: KAFKA-5557: Using a logPrefix inside the
StreamPartitionAssignor
Repository: kafka
Updated Branches:
refs/heads/trunk 70e949d52 -> e68a94056
KAFKA-5557: Using a logPrefix inside the StreamPartitionAssignor
Added logPrefix for avoiding stream thread name formatting replicated more times
Author: ppatierno <pp...@live.com>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3488 from ppatierno/kafka-5557
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e68a9405
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e68a9405
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e68a9405
Branch: refs/heads/trunk
Commit: e68a94056fa7eb5b8d69815bcfa522247568d3e9
Parents: 70e949d
Author: Paolo Patierno <pp...@live.com>
Authored: Wed Jul 5 22:20:24 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 5 22:20:24 2017 -0700
----------------------------------------------------------------------
.../internals/StreamPartitionAssignor.java | 53 +++++++++++---------
1 file changed, 28 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e68a9405/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 375f350..4eadb99 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -62,6 +62,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private final static int UNKNOWN = -1;
public final static int NOT_AVAILABLE = -2;
+ private String logPrefix;
+
private static class AssignedPartition implements Comparable<AssignedPartition> {
public final TaskId taskId;
public final TopicPartition partition;
@@ -216,6 +218,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
streamThread = (StreamThread) o;
streamThread.setPartitionAssignor(this);
+ logPrefix = String.format("stream-thread [%s]", streamThread.getName());
+
String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (userEndPoint != null && !userEndPoint.isEmpty()) {
try {
@@ -223,12 +227,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
Integer port = getPort(userEndPoint);
if (host == null || port == null)
- throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair" +
+ throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" +
" but received %s",
- streamThread.getName(), StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
+ logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
} catch (NumberFormatException nfe) {
- throw new ConfigException(String.format("stream-thread [%s] Invalid port supplied in %s for config %s",
- streamThread.getName(), userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
+ throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s",
+ logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
}
this.userEndPoint = userEndPoint;
@@ -271,8 +275,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
private void updateSubscribedTopics(Set<String> topics) {
SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
- log.debug("stream-thread [{}] found {} topics possibly matching regex",
- streamThread.getName(), topics);
+ log.debug("{} found {} topics possibly matching regex", logPrefix, topics);
// update the topic groups with the returned subscription set for regex pattern subscriptions
subscriptionUpdates.updateTopics(topics);
streamThread.builder.updateSubscriptions(subscriptionUpdates, streamThread.getName());
@@ -321,7 +324,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
clientMetadata.addConsumer(consumerId, info);
}
- log.debug("stream-thread [{}] Constructed client metadata {} from the member subscriptions.", streamThread.getName(), clientsMetadata);
+ log.debug("{} Constructed client metadata {} from the member subscriptions.", logPrefix, clientsMetadata);
// ---------------- Step Zero ---------------- //
@@ -407,7 +410,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
- log.debug("stream-thread [{}] Created repartition topics {} from the parsed topology.", streamThread.getName(), allRepartitionTopicPartitions.values());
+ log.debug("{} Created repartition topics {} from the parsed topology.", logPrefix, allRepartitionTopicPartitions.values());
// ---------------- Step One ---------------- //
@@ -429,7 +432,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
Set<TopicPartition> partitions = entry.getValue();
for (TopicPartition partition : partitions) {
if (allAssignedPartitions.contains(partition)) {
- log.warn("stream-thread [{}] Partition {} is assigned to more than one tasks: {}", streamThread.getName(), partition, partitionsForTask);
+ log.warn("{} Partition {} is assigned to more than one tasks: {}", logPrefix, partition, partitionsForTask);
}
}
allAssignedPartitions.addAll(partitions);
@@ -448,11 +451,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
for (PartitionInfo partitionInfo : partitionInfoList) {
TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
if (!allAssignedPartitions.contains(partition)) {
- log.warn("stream-thread [{}] Partition {} is not assigned to any tasks: {}", streamThread.getName(), partition, partitionsForTask);
+ log.warn("{} Partition {} is not assigned to any tasks: {}", logPrefix, partition, partitionsForTask);
}
}
} else {
- log.warn("stream-thread [{}] No partitions found for topic {}", streamThread.getName(), topic);
+ log.warn("{} No partitions found for topic {}", logPrefix, topic);
}
}
@@ -475,14 +478,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
} else {
- log.debug("stream-thread [{}] No tasks found for topic group {}", streamThread.getName(), topicGroupId);
+ log.debug("{} No tasks found for topic group {}", logPrefix, topicGroupId);
}
}
}
prepareTopic(changelogTopicMetadata);
- log.debug("stream-thread [{}] Created state changelog topics {} from the parsed topology.", streamThread.getName(), changelogTopicMetadata.values());
+ log.debug("{} Created state changelog topics {} from the parsed topology.", logPrefix, changelogTopicMetadata.values());
// ---------------- Step Two ---------------- //
@@ -492,13 +495,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
states.put(entry.getKey(), entry.getValue().state);
}
- log.debug("stream-thread [{}] Assigning tasks {} to clients {} with number of replicas {}",
- streamThread.getName(), partitionsForTask.keySet(), states, numStandbyReplicas);
+ log.debug("{} Assigning tasks {} to clients {} with number of replicas {}",
+ logPrefix, partitionsForTask.keySet(), states, numStandbyReplicas);
final StickyTaskAssignor<UUID> taskAssignor = new StickyTaskAssignor<>(states, partitionsForTask.keySet());
taskAssignor.assign(numStandbyReplicas);
- log.info("stream-thread [{}] Assigned tasks to clients as {}.", streamThread.getName(), states);
+ log.info("{} Assigned tasks to clients as {}.", logPrefix, states);
// ---------------- Step Three ---------------- //
@@ -589,8 +592,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
// could be duplicated if one task has more than one assigned partitions
if (partitions.size() != info.activeTasks.size()) {
throw new TaskAssignmentException(
- String.format("stream-thread [%s] Number of assigned partitions %d is not equal to the number of active taskIds %d" +
- ", assignmentInfo=%s", streamThread.getName(), partitions.size(), info.activeTasks.size(), info.toString())
+ String.format("%s Number of assigned partitions %d is not equal to the number of active taskIds %d" +
+ ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString())
);
}
@@ -643,7 +646,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
* @param topicPartitions Map that contains the topic names to be created with the number of partitions
*/
private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) {
- log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", streamThread.getName());
+ log.debug("{} Starting to validate internal topics in partition assignor.", logPrefix);
// first construct the topics to make ready
Map<InternalTopicConfig, Integer> topicsToMakeReady = new HashMap<>();
@@ -657,7 +660,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
continue;
}
if (numPartitions < 0) {
- throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", streamThread.getName(), topic.name()));
+ throw new TopologyBuilderException(String.format("%s Topic [%s] number of partitions not defined", logPrefix, topic.name()));
}
topicsToMakeReady.put(topic, numPartitions);
@@ -677,7 +680,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
}
- log.debug("stream-thread [{}] Completed validating internal topics in partition assignor", streamThread.getName());
+ log.debug("{} Completed validating internal topics in partition assignor", logPrefix);
}
private boolean allTopicsCreated(final Set<String> topicNamesToMakeReady, final Map<InternalTopicConfig, Integer> topicsToMakeReady) {
@@ -765,10 +768,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
static class CopartitionedTopicsValidator {
- private final String threadName;
+ private final String logPrefix;
CopartitionedTopicsValidator(final String threadName) {
- this.threadName = threadName;
+ this.logPrefix = String.format("stream-thread [%s]", threadName);
}
void validate(final Set<String> copartitionGroup,
@@ -781,7 +784,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
final Integer partitions = metadata.partitionCountForTopic(topic);
if (partitions == null) {
- throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", threadName, topic));
+ throw new TopologyBuilderException(String.format("%s Topic not found: %s", logPrefix, topic));
}
if (numPartitions == UNKNOWN) {
@@ -789,7 +792,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
} else if (numPartitions != partitions) {
final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
Arrays.sort(topics);
- throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", threadName, Utils.join(Arrays.asList(topics), ",")));
+ throw new TopologyBuilderException(String.format("%s Topics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ",")));
}
} else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
numPartitions = NOT_AVAILABLE;