You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/05 18:56:49 UTC
[kafka] branch trunk updated: KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path (#4630)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2a4ba75 KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path (#4630)
2a4ba75 is described below
commit 2a4ba75e1338eaa97da87077330b43d7448a18bc
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Mar 5 10:56:42 2018 -0800
KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path (#4630)
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../org/apache/kafka/streams/StreamsConfig.java | 4 +-
.../internals/StreamsPartitionAssignor.java | 230 ++++++++++++--------
.../internals/assignment/AssignmentInfo.java | 236 +++++++++++++--------
.../internals/assignment/SubscriptionInfo.java | 224 +++++++++++++------
.../integration/QueryableStateIntegrationTest.java | 1 -
.../internals/StreamsPartitionAssignorTest.java | 95 ++++++---
.../internals/assignment/AssignmentInfoTest.java | 19 +-
.../internals/assignment/SubscriptionInfoTest.java | 10 +-
8 files changed, 530 insertions(+), 289 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6b36261..47becfc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -226,11 +226,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
- /** {@code default timestamp.extractor} */
+ /** {@code default.timestamp.extractor} */
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
- /** {@code default value.serde} */
+ /** {@code default.value.serde} */
public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 9aa0e94..71a84b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -66,7 +66,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
public final TaskId taskId;
public final TopicPartition partition;
- AssignedPartition(final TaskId taskId, final TopicPartition partition) {
+ AssignedPartition(final TaskId taskId,
+ final TopicPartition partition) {
this.taskId = taskId;
this.partition = partition;
}
@@ -77,11 +78,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (!(o instanceof AssignedPartition)) {
return false;
}
- AssignedPartition other = (AssignedPartition) o;
+ final AssignedPartition other = (AssignedPartition) o;
return compareTo(other) == 0;
}
@@ -104,8 +105,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final String host = getHost(endPoint);
final Integer port = getPort(endPoint);
- if (host == null || port == null)
+ if (host == null || port == null) {
throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
+ }
hostInfo = new HostInfo(host, port);
} else {
@@ -119,10 +121,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
state = new ClientState();
}
- void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
+ void addConsumer(final String consumerMemberId,
+ final SubscriptionInfo info) {
consumers.add(consumerMemberId);
- state.addPreviousActiveTasks(info.prevTasks);
- state.addPreviousStandbyTasks(info.standbyTasks);
+ state.addPreviousActiveTasks(info.prevTasks());
+ state.addPreviousStandbyTasks(info.standbyTasks());
state.incrementCapacity();
}
@@ -157,8 +160,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
@Override
- public int compare(TopicPartition p1, TopicPartition p2) {
- int result = p1.topic().compareTo(p2.topic());
+ public int compare(final TopicPartition p1,
+ final TopicPartition p2) {
+ final int result = p1.topic().compareTo(p2.topic());
if (result != 0) {
return result;
@@ -194,15 +198,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
if (o == null) {
- KafkaException ex = new KafkaException("TaskManager is not specified");
- log.error(ex.getMessage(), ex);
- throw ex;
+ final KafkaException fatalException = new KafkaException("TaskManager is not specified");
+ log.error(fatalException.getMessage(), fatalException);
+ throw fatalException;
}
if (!(o instanceof TaskManager)) {
- KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName()));
- log.error(ex.getMessage(), ex);
- throw ex;
+ final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName()));
+ log.error(fatalException.getMessage(), fatalException);
+ throw fatalException;
}
taskManager = (TaskManager) o;
@@ -214,14 +218,14 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final String userEndPoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (userEndPoint != null && !userEndPoint.isEmpty()) {
try {
- String host = getHost(userEndPoint);
- Integer port = getPort(userEndPoint);
+ final String host = getHost(userEndPoint);
+ final Integer port = getPort(userEndPoint);
if (host == null || port == null)
throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair" +
" but received %s",
logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, userEndPoint));
- } catch (NumberFormatException nfe) {
+ } catch (final NumberFormatException nfe) {
throw new ConfigException(String.format("%s Invalid port supplied in %s for config %s",
logPrefix, userEndPoint, StreamsConfig.APPLICATION_SERVER_CONFIG));
}
@@ -240,7 +244,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
@Override
- public Subscription subscription(Set<String> topics) {
+ public Subscription subscription(final Set<String> topics) {
// Adds the following information to subscription
// 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
// 2. Task ids of previously running tasks
@@ -249,7 +253,11 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final Set<TaskId> previousActiveTasks = taskManager.prevActiveTaskIds();
final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
standbyTasks.removeAll(previousActiveTasks);
- final SubscriptionInfo data = new SubscriptionInfo(taskManager.processId(), previousActiveTasks, standbyTasks, this.userEndPoint);
+ final SubscriptionInfo data = new SubscriptionInfo(
+ taskManager.processId(),
+ previousActiveTasks,
+ standbyTasks,
+ this.userEndPoint);
taskManager.updateSubscriptionsFromMetadata(topics);
@@ -277,22 +285,32 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
* 3. within each client, tasks are assigned to consumer clients in round-robin manner.
*/
@Override
- public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+ public Map<String, Assignment> assign(final Cluster metadata,
+ final Map<String, Subscription> subscriptions) {
// construct the client metadata from the decoded subscription info
- Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
-
- for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
- String consumerId = entry.getKey();
- Subscription subscription = entry.getValue();
-
- SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+ final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
+
+ int minUserMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+ for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
+ final String consumerId = entry.getKey();
+ final Subscription subscription = entry.getValue();
+
+ final SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+ final int usedVersion = info.version();
+ if (usedVersion > SubscriptionInfo.LATEST_SUPPORTED_VERSION) {
+ throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ + "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION);
+ }
+ if (usedVersion < minUserMetadataVersion) {
+ minUserMetadataVersion = usedVersion;
+ }
// create the new client metadata if necessary
- ClientMetadata clientMetadata = clientsMetadata.get(info.processId);
+ ClientMetadata clientMetadata = clientsMetadata.get(info.processId());
if (clientMetadata == null) {
- clientMetadata = new ClientMetadata(info.userEndPoint);
- clientsMetadata.put(info.processId, clientMetadata);
+ clientMetadata = new ClientMetadata(info.userEndPoint());
+ clientsMetadata.put(info.processId(), clientMetadata);
}
// add the consumer to the client
@@ -309,8 +327,8 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = taskManager.builder().topicGroups();
final Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>();
- for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
- for (InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
+ for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) {
repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
}
}
@@ -319,13 +337,13 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
do {
numPartitionsNeeded = false;
- for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
- for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
+ for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
+ for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
int numPartitions = repartitionTopicMetadata.get(topicName).numPartitions;
// try set the number of partitions for this repartition topic if it is not set yet
if (numPartitions == UNKNOWN) {
- for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
+ for (final InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
if (otherSinkTopics.contains(topicName)) {
@@ -375,7 +393,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// augment the metadata with the newly computed number of partitions for all the
// repartition source topics
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<>();
- for (Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
+ for (final Map.Entry<String, InternalTopicMetadata> entry : repartitionTopicMetadata.entrySet()) {
final String topic = entry.getKey();
final int numPartitions = entry.getValue().numPartitions;
@@ -395,7 +413,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// get the tasks as partition groups from the partition grouper
final Set<String> allSourceTopics = new HashSet<>();
final Map<Integer, Set<String>> sourceTopicsByGroup = new HashMap<>();
- for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
allSourceTopics.addAll(entry.getValue().sourceTopics);
sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
}
@@ -405,9 +423,9 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// check if all partitions are assigned, and there are no duplicates of partitions in multiple tasks
final Set<TopicPartition> allAssignedPartitions = new HashSet<>();
final Map<Integer, Set<TaskId>> tasksByTopicGroup = new HashMap<>();
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
+ for (final Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
final Set<TopicPartition> partitions = entry.getValue();
- for (TopicPartition partition : partitions) {
+ for (final TopicPartition partition : partitions) {
if (allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask);
}
@@ -422,10 +440,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
ids.add(id);
}
- for (String topic : allSourceTopics) {
+ for (final String topic : allSourceTopics) {
final List<PartitionInfo> partitionInfoList = fullMetadata.partitionsForTopic(topic);
if (!partitionInfoList.isEmpty()) {
- for (PartitionInfo partitionInfo : partitionInfoList) {
+ for (final PartitionInfo partitionInfo : partitionInfoList) {
final TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
if (!allAssignedPartitions.contains(partition)) {
log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask);
@@ -438,15 +456,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// add tasks to state change log topic subscribers
final Map<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<>();
- for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
+ for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
final int topicGroupId = entry.getKey();
final Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
- for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
+ for (final InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
// the expected number of partitions is the max value of TaskId.partition + 1
int numPartitions = UNKNOWN;
if (tasksByTopicGroup.get(topicGroupId) != null) {
- for (TaskId task : tasksByTopicGroup.get(topicGroupId)) {
+ for (final TaskId task : tasksByTopicGroup.get(topicGroupId)) {
if (numPartitions < task.partition + 1)
numPartitions = task.partition + 1;
}
@@ -468,7 +486,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// assign tasks to clients
final Map<UUID, ClientState> states = new HashMap<>();
- for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+ for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
states.put(entry.getKey(), entry.getValue().state);
}
@@ -484,25 +502,27 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// construct the global partition assignment per host map
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>();
- for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
- final HostInfo hostInfo = entry.getValue().hostInfo;
+ if (minUserMetadataVersion == 2) {
+ for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+ final HostInfo hostInfo = entry.getValue().hostInfo;
- if (hostInfo != null) {
- final Set<TopicPartition> topicPartitions = new HashSet<>();
- final ClientState state = entry.getValue().state;
+ if (hostInfo != null) {
+ final Set<TopicPartition> topicPartitions = new HashSet<>();
+ final ClientState state = entry.getValue().state;
- for (final TaskId id : state.activeTasks()) {
- topicPartitions.addAll(partitionsForTask.get(id));
- }
+ for (final TaskId id : state.activeTasks()) {
+ topicPartitions.addAll(partitionsForTask.get(id));
+ }
- partitionsByHostState.put(hostInfo, topicPartitions);
+ partitionsByHostState.put(hostInfo, topicPartitions);
+ }
}
}
taskManager.setPartitionsByHostState(partitionsByHostState);
// within the client, distribute tasks to its owned consumers
final Map<String, Assignment> assignment = new HashMap<>();
- for (Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
+ for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) {
final Set<String> consumers = entry.getValue().consumers;
final ClientState state = entry.getValue().state;
@@ -511,7 +531,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
int consumerTaskIndex = 0;
- for (String consumer : consumers) {
+ for (final String consumer : consumers) {
final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
final ArrayList<AssignedPartition> assignedPartitions = new ArrayList<>();
@@ -540,13 +560,15 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
Collections.sort(assignedPartitions);
final List<TaskId> active = new ArrayList<>();
final List<TopicPartition> activePartitions = new ArrayList<>();
- for (AssignedPartition partition : assignedPartitions) {
+ for (final AssignedPartition partition : assignedPartitions) {
active.add(partition.taskId);
activePartitions.add(partition.partition);
}
// finally, encode the assignment before sending back to coordinator
- assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
+ assignment.put(consumer, new Assignment(
+ activePartitions,
+ new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState).encode()));
}
}
@@ -577,26 +599,54 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
* @throws TaskAssignmentException if there is no task id for one of the partitions specified
*/
@Override
- public void onAssignment(Assignment assignment) {
- List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
+ public void onAssignment(final Assignment assignment) {
+ final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
Collections.sort(partitions, PARTITION_COMPARATOR);
- AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+ final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+ final int usedVersion = info.version();
- Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ // version 1 field
+ final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+ // version 2 fields
+ final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
+ final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
+
+ switch (usedVersion) {
+ case 1:
+ processVersionOneAssignment(info, partitions, activeTasks);
+ partitionsByHost = Collections.emptyMap();
+ break;
+ case 2:
+ processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo);
+ partitionsByHost = info.partitionsByHost();
+ break;
+ default:
+ throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ + "; latest supported version: " + AssignmentInfo.LATEST_SUPPORTED_VERSION);
+ }
+ taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
+ taskManager.setPartitionsByHostState(partitionsByHost);
+ taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
+ taskManager.updateSubscriptionsFromAssignment(partitions);
+ }
+
+ private void processVersionOneAssignment(final AssignmentInfo info,
+ final List<TopicPartition> partitions,
+ final Map<TaskId, Set<TopicPartition>> activeTasks) {
// the number of assigned partitions should be the same as number of active tasks, which
// could be duplicated if one task has more than one assigned partitions
- if (partitions.size() != info.activeTasks.size()) {
+ if (partitions.size() != info.activeTasks().size()) {
throw new TaskAssignmentException(
- String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" +
- ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString())
+ String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" +
+ ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks().size(), info.toString())
);
}
for (int i = 0; i < partitions.size(); i++) {
- TopicPartition partition = partitions.get(i);
- TaskId id = info.activeTasks.get(i);
+ final TopicPartition partition = partitions.get(i);
+ final TaskId id = info.activeTasks().get(i);
Set<TopicPartition> assignedPartitions = activeTasks.get(id);
if (assignedPartitions == null) {
@@ -605,23 +655,23 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
assignedPartitions.add(partition);
}
+ }
- final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
- for (Set<TopicPartition> value : info.partitionsByHost.values()) {
- for (TopicPartition topicPartition : value) {
- topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(),
- topicPartition.partition(),
- null,
- new Node[0],
- new Node[0]));
+ private void processVersionTwoAssignment(final AssignmentInfo info,
+ final List<TopicPartition> partitions,
+ final Map<TaskId, Set<TopicPartition>> activeTasks,
+ final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
+ processVersionOneAssignment(info, partitions, activeTasks);
+
+ // process partitions by host
+ final Map<HostInfo, Set<TopicPartition>> partitionsByHost = info.partitionsByHost();
+ for (final Set<TopicPartition> value : partitionsByHost.values()) {
+ for (final TopicPartition topicPartition : value) {
+ topicToPartitionInfo.put(
+ topicPartition,
+ new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0]));
}
}
-
- taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
- taskManager.setPartitionsByHostState(info.partitionsByHost);
- taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks);
-
- taskManager.updateSubscriptionsFromAssignment(partitions);
}
/**
@@ -658,10 +708,10 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
log.debug("Completed validating internal topics in partition assignor.");
}
- private void ensureCopartitioning(Collection<Set<String>> copartitionGroups,
- Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
- Cluster metadata) {
- for (Set<String> copartitionGroup : copartitionGroups) {
+ private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups,
+ final Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions,
+ final Cluster metadata) {
+ for (final Set<String> copartitionGroup : copartitionGroups) {
copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
}
}
@@ -677,7 +727,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
private final Set<String> updatedTopicSubscriptions = new HashSet<>();
- public void updateTopics(Collection<String> topicNames) {
+ public void updateTopics(final Collection<String> topicNames) {
updatedTopicSubscriptions.clear();
updatedTopicSubscriptions.addAll(topicNames);
}
@@ -735,7 +785,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
// if all topics for this co-partition group is repartition topics,
// then set the number of partitions to be the maximum of the number of partitions.
if (numPartitions == UNKNOWN) {
- for (Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
+ for (final Map.Entry<String, InternalTopicMetadata> entry: allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) {
final int partitions = entry.getValue().numPartitions;
if (partitions > numPartitions) {
@@ -745,7 +795,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
}
// enforce co-partitioning restrictions to repartition topics by updating their number of partitions
- for (Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
+ for (final Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) {
entry.getValue().numPartitions = numPartitions;
}
@@ -755,7 +805,7 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable
}
// following functions are for test only
- void setInternalTopicManager(InternalTopicManager internalTopicManager) {
+ void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 8607472..c8df749 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -39,76 +39,123 @@ import java.util.Set;
public class AssignmentInfo {
private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
- /**
- * A new field was added, partitionsByHost. CURRENT_VERSION
- * is required so we can decode the previous version. For example, this may occur
- * during a rolling upgrade
- */
- private static final int CURRENT_VERSION = 2;
- public final int version;
- public final List<TaskId> activeTasks; // each element corresponds to a partition
- public final Map<TaskId, Set<TopicPartition>> standbyTasks;
- public final Map<HostInfo, Set<TopicPartition>> partitionsByHost;
- public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
- Map<HostInfo, Set<TopicPartition>> hostState) {
- this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
+ public static final int LATEST_SUPPORTED_VERSION = 2;
+
+ private final int usedVersion;
+ private List<TaskId> activeTasks;
+ private Map<TaskId, Set<TopicPartition>> standbyTasks;
+ private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
+
+ private AssignmentInfo(final int version) {
+ this.usedVersion = version;
+ }
+
+ public AssignmentInfo(final List<TaskId> activeTasks,
+ final Map<TaskId, Set<TopicPartition>> standbyTasks,
+ final Map<HostInfo, Set<TopicPartition>> hostState) {
+ this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState);
}
- protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
- Map<HostInfo, Set<TopicPartition>> hostState) {
- this.version = version;
+ public AssignmentInfo(final int version,
+ final List<TaskId> activeTasks,
+ final Map<TaskId, Set<TopicPartition>> standbyTasks,
+ final Map<HostInfo, Set<TopicPartition>> hostState) {
+ this.usedVersion = version;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
this.partitionsByHost = hostState;
}
+ public int version() {
+ return usedVersion;
+ }
+
+ public List<TaskId> activeTasks() {
+ return activeTasks;
+ }
+
+ public Map<TaskId, Set<TopicPartition>> standbyTasks() {
+ return standbyTasks;
+ }
+
+ public Map<HostInfo, Set<TopicPartition>> partitionsByHost() {
+ return partitionsByHost;
+ }
+
/**
* @throws TaskAssignmentException if method fails to encode the data, e.g., if there is an
* IO exception during encoding
*/
public ByteBuffer encode() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos);
-
- try {
- // Encode version
- out.writeInt(version);
- // Encode active tasks
- out.writeInt(activeTasks.size());
- for (TaskId id : activeTasks) {
- id.writeTo(out);
- }
- // Encode standby tasks
- out.writeInt(standbyTasks.size());
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
- TaskId id = entry.getKey();
- id.writeTo(out);
-
- Set<TopicPartition> partitions = entry.getValue();
- writeTopicPartitions(out, partitions);
- }
- out.writeInt(partitionsByHost.size());
- for (Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost
- .entrySet()) {
- final HostInfo hostInfo = entry.getKey();
- out.writeUTF(hostInfo.host());
- out.writeInt(hostInfo.port());
- writeTopicPartitions(out, entry.getValue());
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (final DataOutputStream out = new DataOutputStream(baos)) {
+ switch (usedVersion) {
+ case 1:
+ encodeVersionOne(out);
+ break;
+ case 2:
+ encodeVersionTwo(out);
+ break;
+ default:
+ throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
}
out.flush();
out.close();
return ByteBuffer.wrap(baos.toByteArray());
- } catch (IOException ex) {
+ } catch (final IOException ex) {
throw new TaskAssignmentException("Failed to encode AssignmentInfo", ex);
}
}
- private void writeTopicPartitions(DataOutputStream out, Set<TopicPartition> partitions) throws IOException {
+ private void encodeVersionOne(final DataOutputStream out) throws IOException {
+ out.writeInt(1); // version
+ encodeActiveAndStandbyTaskAssignment(out);
+ }
+
+ private void encodeActiveAndStandbyTaskAssignment(final DataOutputStream out) throws IOException {
+ // encode active tasks
+ out.writeInt(activeTasks.size());
+ for (final TaskId id : activeTasks) {
+ id.writeTo(out);
+ }
+
+ // encode standby tasks
+ out.writeInt(standbyTasks.size());
+ for (final Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
+ final TaskId id = entry.getKey();
+ id.writeTo(out);
+
+ final Set<TopicPartition> partitions = entry.getValue();
+ writeTopicPartitions(out, partitions);
+ }
+ }
+
+ private void encodeVersionTwo(final DataOutputStream out) throws IOException {
+ out.writeInt(2); // version
+ encodeActiveAndStandbyTaskAssignment(out);
+ encodePartitionsByHost(out);
+ }
+
+ private void encodePartitionsByHost(final DataOutputStream out) throws IOException {
+ // encode partitions by host
+ out.writeInt(partitionsByHost.size());
+ for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) {
+ final HostInfo hostInfo = entry.getKey();
+ out.writeUTF(hostInfo.host());
+ out.writeInt(hostInfo.port());
+ writeTopicPartitions(out, entry.getValue());
+ }
+ }
+
+ private void writeTopicPartitions(final DataOutputStream out,
+ final Set<TopicPartition> partitions) throws IOException {
out.writeInt(partitions.size());
- for (TopicPartition partition : partitions) {
+ for (final TopicPartition partition : partitions) {
out.writeUTF(partition.topic());
out.writeInt(partition.partition());
}
@@ -117,52 +164,69 @@ public class AssignmentInfo {
/**
* @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
*/
- public static AssignmentInfo decode(ByteBuffer data) {
+ public static AssignmentInfo decode(final ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();
- try (DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
- // Decode version
- int version = in.readInt();
- if (version < 0 || version > CURRENT_VERSION) {
- TaskAssignmentException ex = new TaskAssignmentException("Unknown assignment data version: " + version);
- log.error(ex.getMessage(), ex);
- throw ex;
- }
+ try (final DataInputStream in = new DataInputStream(new ByteBufferInputStream(data))) {
+ // decode used version
+ final int usedVersion = in.readInt();
+ final AssignmentInfo assignmentInfo = new AssignmentInfo(usedVersion);
- // Decode active tasks
- int count = in.readInt();
- List<TaskId> activeTasks = new ArrayList<>(count);
- for (int i = 0; i < count; i++) {
- activeTasks.add(TaskId.readFrom(in));
- }
- // Decode standby tasks
- count = in.readInt();
- Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(count);
- for (int i = 0; i < count; i++) {
- TaskId id = TaskId.readFrom(in);
- standbyTasks.put(id, readTopicPartitions(in));
+ switch (usedVersion) {
+ case 1:
+ decodeVersionOneData(assignmentInfo, in);
+ break;
+ case 2:
+ decodeVersionTwoData(assignmentInfo, in);
+ break;
+ default:
+ TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
+ "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
+ log.error(fatalException.getMessage(), fatalException);
+ throw fatalException;
}
- Map<HostInfo, Set<TopicPartition>> hostStateToTopicPartitions = new HashMap<>();
- if (version == CURRENT_VERSION) {
- int numEntries = in.readInt();
- for (int i = 0; i < numEntries; i++) {
- HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
- hostStateToTopicPartitions.put(hostInfo, readTopicPartitions(in));
- }
- }
+ return assignmentInfo;
+ } catch (final IOException ex) {
+ throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
+ }
+ }
+
+ private static void decodeVersionOneData(final AssignmentInfo assignmentInfo,
+ final DataInputStream in) throws IOException {
+ // decode active tasks
+ int count = in.readInt();
+ assignmentInfo.activeTasks = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ assignmentInfo.activeTasks.add(TaskId.readFrom(in));
+ }
- return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
+ // decode standby tasks
+ count = in.readInt();
+ assignmentInfo.standbyTasks = new HashMap<>(count);
+ for (int i = 0; i < count; i++) {
+ TaskId id = TaskId.readFrom(in);
+ assignmentInfo.standbyTasks.put(id, readTopicPartitions(in));
+ }
+ }
- } catch (IOException ex) {
- throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
+ private static void decodeVersionTwoData(final AssignmentInfo assignmentInfo,
+ final DataInputStream in) throws IOException {
+ decodeVersionOneData(assignmentInfo, in);
+
+ // decode partitions by host
+ assignmentInfo.partitionsByHost = new HashMap<>();
+ final int numEntries = in.readInt();
+ for (int i = 0; i < numEntries; i++) {
+ final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
+ assignmentInfo.partitionsByHost.put(hostInfo, readTopicPartitions(in));
}
}
- private static Set<TopicPartition> readTopicPartitions(DataInputStream in) throws IOException {
- int numPartitions = in.readInt();
- Set<TopicPartition> partitions = new HashSet<>(numPartitions);
+ private static Set<TopicPartition> readTopicPartitions(final DataInputStream in) throws IOException {
+ final int numPartitions = in.readInt();
+ final Set<TopicPartition> partitions = new HashSet<>(numPartitions);
for (int j = 0; j < numPartitions; j++) {
partitions.add(new TopicPartition(in.readUTF(), in.readInt()));
}
@@ -171,14 +235,14 @@ public class AssignmentInfo {
@Override
public int hashCode() {
- return version ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
+ return usedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode();
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (o instanceof AssignmentInfo) {
- AssignmentInfo other = (AssignmentInfo) o;
- return this.version == other.version &&
+ final AssignmentInfo other = (AssignmentInfo) o;
+ return this.usedVersion == other.usedVersion &&
this.activeTasks.equals(other.activeTasks) &&
this.standbyTasks.equals(other.standbyTasks) &&
this.partitionsByHost.equals(other.partitionsByHost);
@@ -189,7 +253,7 @@ public class AssignmentInfo {
@Override
public String toString() {
- return "[version=" + version + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
+ return "[version=" + usedVersion + ", active tasks=" + activeTasks.size() + ", standby tasks=" + standbyTasks.size() + "]";
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index f583dba..7fee90b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -31,42 +31,96 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
- private static final int CURRENT_VERSION = 2;
+ public static final int LATEST_SUPPORTED_VERSION = 2;
- public final int version;
- public final UUID processId;
- public final Set<TaskId> prevTasks;
- public final Set<TaskId> standbyTasks;
- public final String userEndPoint;
+ private final int usedVersion;
+ private UUID processId;
+ private Set<TaskId> prevTasks;
+ private Set<TaskId> standbyTasks;
+ private String userEndPoint;
- public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
- this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
+ private SubscriptionInfo(final int version) {
+ this.usedVersion = version;
}
- private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
- this.version = version;
+ public SubscriptionInfo(final UUID processId,
+ final Set<TaskId> prevTasks,
+ final Set<TaskId> standbyTasks,
+ final String userEndPoint) {
+ this(LATEST_SUPPORTED_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
+ }
+
+ public SubscriptionInfo(final int version,
+ final UUID processId,
+ final Set<TaskId> prevTasks,
+ final Set<TaskId> standbyTasks,
+ final String userEndPoint) {
+ this.usedVersion = version;
this.processId = processId;
this.prevTasks = prevTasks;
this.standbyTasks = standbyTasks;
this.userEndPoint = userEndPoint;
}
+ public int version() {
+ return usedVersion;
+ }
+
+ public UUID processId() {
+ return processId;
+ }
+
+ public Set<TaskId> prevTasks() {
+ return prevTasks;
+ }
+
+ public Set<TaskId> standbyTasks() {
+ return standbyTasks;
+ }
+
+ public String userEndPoint() {
+ return userEndPoint;
+ }
+
/**
* @throws TaskAssignmentException if method fails to encode the data
*/
public ByteBuffer encode() {
- byte[] endPointBytes;
- if (userEndPoint == null) {
- endPointBytes = new byte[0];
- } else {
- endPointBytes = userEndPoint.getBytes(Charset.forName("UTF-8"));
+ final ByteBuffer buf;
+
+ switch (usedVersion) {
+ case 1:
+ buf = encodeVersionOne();
+ break;
+ case 2:
+ buf = encodeVersionTwo(prepareUserEndPoint());
+ break;
+ default:
+ throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
}
- ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 +
- prevTasks.size() * 8 + 4 + standbyTasks.size() * 8
- + 4 /* length of bytes */ + endPointBytes.length
- );
- // version
- buf.putInt(version);
+
+ buf.rewind();
+ return buf;
+ }
+
+ private ByteBuffer encodeVersionOne() {
+ final ByteBuffer buf = ByteBuffer.allocate(getVersionOneByteLength());
+
+ buf.putInt(1); // version
+ encodeVersionOneData(buf);
+
+ return buf;
+ }
+
+ private int getVersionOneByteLength() {
+ return 4 + // version
+ 16 + // client ID
+ 4 + prevTasks.size() * 8 + // length + prev tasks
+ 4 + standbyTasks.size() * 8; // length + standby tasks
+ }
+
+ private void encodeVersionOneData(final ByteBuffer buf) {
// encode client UUID
buf.putLong(processId.getMostSignificantBits());
buf.putLong(processId.getLeastSignificantBits());
@@ -80,60 +134,104 @@ public class SubscriptionInfo {
for (TaskId id : standbyTasks) {
id.writeTo(buf);
}
- buf.putInt(endPointBytes.length);
- buf.put(endPointBytes);
- buf.rewind();
+ }
+
+ private byte[] prepareUserEndPoint() {
+ if (userEndPoint == null) {
+ return new byte[0];
+ } else {
+ return userEndPoint.getBytes(Charset.forName("UTF-8"));
+ }
+ }
+
+ private ByteBuffer encodeVersionTwo(final byte[] endPointBytes) {
+ final ByteBuffer buf = ByteBuffer.allocate(getVersionTwoByteLength(endPointBytes));
+
+ buf.putInt(2); // version
+ encodeVersionTwoData(buf, endPointBytes);
+
return buf;
}
+ private int getVersionTwoByteLength(final byte[] endPointBytes) {
+ return getVersionOneByteLength() +
+ 4 + endPointBytes.length; // length + userEndPoint
+ }
+
+ private void encodeVersionTwoData(final ByteBuffer buf,
+ final byte[] endPointBytes) {
+ encodeVersionOneData(buf);
+ if (endPointBytes != null) {
+ buf.putInt(endPointBytes.length);
+ buf.put(endPointBytes);
+ }
+ }
+
/**
* @throws TaskAssignmentException if method fails to decode the data
*/
- public static SubscriptionInfo decode(ByteBuffer data) {
+ public static SubscriptionInfo decode(final ByteBuffer data) {
// ensure we are at the beginning of the ByteBuffer
data.rewind();
- // Decode version
- int version = data.getInt();
- if (version == CURRENT_VERSION || version == 1) {
- // Decode client UUID
- UUID processId = new UUID(data.getLong(), data.getLong());
- // Decode previously active tasks
- Set<TaskId> prevTasks = new HashSet<>();
- int numPrevs = data.getInt();
- for (int i = 0; i < numPrevs; i++) {
- TaskId id = TaskId.readFrom(data);
- prevTasks.add(id);
- }
- // Decode previously cached tasks
- Set<TaskId> standbyTasks = new HashSet<>();
- int numCached = data.getInt();
- for (int i = 0; i < numCached; i++) {
- standbyTasks.add(TaskId.readFrom(data));
- }
-
- String userEndPoint = null;
- if (version == CURRENT_VERSION) {
- int bytesLength = data.getInt();
- if (bytesLength != 0) {
- byte[] bytes = new byte[bytesLength];
- data.get(bytes);
- userEndPoint = new String(bytes, Charset.forName("UTF-8"));
- }
-
- }
- return new SubscriptionInfo(version, processId, prevTasks, standbyTasks, userEndPoint);
+ // decode used version
+ final int usedVersion = data.getInt();
+ final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(usedVersion);
+
+ switch (usedVersion) {
+ case 1:
+ decodeVersionOneData(subscriptionInfo, data);
+ break;
+ case 2:
+ decodeVersionTwoData(subscriptionInfo, data);
+ break;
+ default:
+ TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode subscription data: " +
+ "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
+ log.error(fatalException.getMessage(), fatalException);
+ throw fatalException;
+ }
+
+ return subscriptionInfo;
+ }
- } else {
- TaskAssignmentException ex = new TaskAssignmentException("unable to decode subscription data: version=" + version);
- log.error(ex.getMessage(), ex);
- throw ex;
+ private static void decodeVersionOneData(final SubscriptionInfo subscriptionInfo,
+ final ByteBuffer data) {
+ // decode client UUID
+ subscriptionInfo.processId = new UUID(data.getLong(), data.getLong());
+
+ // decode previously active tasks
+ final int numPrevs = data.getInt();
+ subscriptionInfo.prevTasks = new HashSet<>();
+ for (int i = 0; i < numPrevs; i++) {
+ TaskId id = TaskId.readFrom(data);
+ subscriptionInfo.prevTasks.add(id);
+ }
+
+ // decode previously cached tasks
+ final int numCached = data.getInt();
+ subscriptionInfo.standbyTasks = new HashSet<>();
+ for (int i = 0; i < numCached; i++) {
+ subscriptionInfo.standbyTasks.add(TaskId.readFrom(data));
+ }
+ }
+
+ private static void decodeVersionTwoData(final SubscriptionInfo subscriptionInfo,
+ final ByteBuffer data) {
+ decodeVersionOneData(subscriptionInfo, data);
+
+ // decode user end point (can be null)
+ int bytesLength = data.getInt();
+ if (bytesLength != 0) {
+ final byte[] bytes = new byte[bytesLength];
+ data.get(bytes);
+ subscriptionInfo.userEndPoint = new String(bytes, Charset.forName("UTF-8"));
}
}
@Override
public int hashCode() {
- int hashCode = version ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
+ final int hashCode = usedVersion ^ processId.hashCode() ^ prevTasks.hashCode() ^ standbyTasks.hashCode();
if (userEndPoint == null) {
return hashCode;
}
@@ -141,10 +239,10 @@ public class SubscriptionInfo {
}
@Override
- public boolean equals(Object o) {
+ public boolean equals(final Object o) {
if (o instanceof SubscriptionInfo) {
- SubscriptionInfo other = (SubscriptionInfo) o;
- return this.version == other.version &&
+ final SubscriptionInfo other = (SubscriptionInfo) o;
+ return this.usedVersion == other.usedVersion &&
this.processId.equals(other.processId) &&
this.prevTasks.equals(other.prevTasks) &&
this.standbyTasks.equals(other.standbyTasks) &&
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 8b4e895..bb06c72 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -376,7 +376,6 @@ public class QueryableStateIntegrationTest {
}
}
-
@Test
public void queryOnRebalance() throws InterruptedException {
final int numThreads = STREAM_TWO_PARTITIONS;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index bf3f1d1..b0c0d68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -239,17 +239,17 @@ public class StreamsPartitionAssignorTest {
// the first consumer
AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
+ allActiveTasks.addAll(info10.activeTasks());
// the second consumer
AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
- allActiveTasks.addAll(info11.activeTasks);
+ allActiveTasks.addAll(info11.activeTasks());
assertEquals(Utils.mkSet(task0, task1), allActiveTasks);
// the third consumer
AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
- allActiveTasks.addAll(info20.activeTasks);
+ allActiveTasks.addAll(info20.activeTasks());
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -317,13 +317,13 @@ public class StreamsPartitionAssignorTest {
final AssignmentInfo info10 = AssignmentInfo.decode(assignments.get("consumer10").userData());
final List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3);
- assertEquals(expectedInfo10TaskIds, info10.activeTasks);
+ assertEquals(expectedInfo10TaskIds, info10.activeTasks());
// the second consumer
final AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
final List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2);
- assertEquals(expectedInfo11TaskIds, info11.activeTasks);
+ assertEquals(expectedInfo11TaskIds, info11.activeTasks());
}
@Test
@@ -354,7 +354,7 @@ public class StreamsPartitionAssignorTest {
// check assignment info
Set<TaskId> allActiveTasks = new HashSet<>();
AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
+ allActiveTasks.addAll(info10.activeTasks());
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -394,7 +394,7 @@ public class StreamsPartitionAssignorTest {
// check assignment info
Set<TaskId> allActiveTasks = new HashSet<>();
AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
+ allActiveTasks.addAll(info10.activeTasks());
assertEquals(0, allActiveTasks.size());
assertEquals(Collections.<TaskId>emptySet(), new HashSet<>(allActiveTasks));
@@ -407,7 +407,7 @@ public class StreamsPartitionAssignorTest {
// the first consumer
info10 = checkAssignment(allTopics, assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
+ allActiveTasks.addAll(info10.activeTasks());
assertEquals(3, allActiveTasks.size());
assertEquals(allTasks, new HashSet<>(allActiveTasks));
@@ -455,15 +455,15 @@ public class StreamsPartitionAssignorTest {
AssignmentInfo info;
info = AssignmentInfo.decode(assignments.get("consumer10").userData());
- allActiveTasks.addAll(info.activeTasks);
+ allActiveTasks.addAll(info.activeTasks());
allPartitions.addAll(assignments.get("consumer10").partitions());
info = AssignmentInfo.decode(assignments.get("consumer11").userData());
- allActiveTasks.addAll(info.activeTasks);
+ allActiveTasks.addAll(info.activeTasks());
allPartitions.addAll(assignments.get("consumer11").partitions());
info = AssignmentInfo.decode(assignments.get("consumer20").userData());
- allActiveTasks.addAll(info.activeTasks);
+ allActiveTasks.addAll(info.activeTasks());
allPartitions.addAll(assignments.get("consumer20").partitions());
assertEquals(allTasks, allActiveTasks);
@@ -524,14 +524,14 @@ public class StreamsPartitionAssignorTest {
AssignmentInfo info11 = AssignmentInfo.decode(assignments.get("consumer11").userData());
AssignmentInfo info20 = AssignmentInfo.decode(assignments.get("consumer20").userData());
- assertEquals(2, info10.activeTasks.size());
- assertEquals(2, info11.activeTasks.size());
- assertEquals(2, info20.activeTasks.size());
+ assertEquals(2, info10.activeTasks().size());
+ assertEquals(2, info11.activeTasks().size());
+ assertEquals(2, info20.activeTasks().size());
Set<TaskId> allTasks = new HashSet<>();
- allTasks.addAll(info10.activeTasks);
- allTasks.addAll(info11.activeTasks);
- allTasks.addAll(info20.activeTasks);
+ allTasks.addAll(info10.activeTasks());
+ allTasks.addAll(info11.activeTasks());
+ allTasks.addAll(info20.activeTasks());
assertEquals(new HashSet<>(tasks), allTasks);
// check tasks for state topics
@@ -603,15 +603,15 @@ public class StreamsPartitionAssignorTest {
// the first consumer
AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
- allActiveTasks.addAll(info10.activeTasks);
- allStandbyTasks.addAll(info10.standbyTasks.keySet());
+ allActiveTasks.addAll(info10.activeTasks());
+ allStandbyTasks.addAll(info10.standbyTasks().keySet());
// the second consumer
AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
- allActiveTasks.addAll(info11.activeTasks);
- allStandbyTasks.addAll(info11.standbyTasks.keySet());
+ allActiveTasks.addAll(info11.activeTasks());
+ allStandbyTasks.addAll(info11.standbyTasks().keySet());
- assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet());
+ assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet());
// check active tasks assigned to the first client
assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks));
@@ -619,8 +619,8 @@ public class StreamsPartitionAssignorTest {
// the third consumer
AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
- allActiveTasks.addAll(info20.activeTasks);
- allStandbyTasks.addAll(info20.standbyTasks.keySet());
+ allActiveTasks.addAll(info20.activeTasks());
+ allStandbyTasks.addAll(info20.standbyTasks().keySet());
// all task ids are in the active tasks and also in the standby tasks
@@ -847,7 +847,7 @@ public class StreamsPartitionAssignorTest {
configurePartitionAssignor(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG, (Object) userEndPoint));
final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
- assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
+ assertEquals("localhost:8080", subscriptionInfo.userEndPoint());
}
@Test
@@ -874,7 +874,7 @@ public class StreamsPartitionAssignorTest {
final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
- final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
+ final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
new TopicPartition("topic1", 1),
new TopicPartition("topic1", 2)), topicPartitions);
@@ -1072,8 +1072,8 @@ public class StreamsPartitionAssignorTest {
final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions);
final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
- final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
- final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090));
+ final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost().get(new HostInfo("localhost", 8080));
+ final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost().get(new HostInfo("other", 9090));
final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1partitions);
allAssignedPartitions.addAll(consumer2Partitions);
assertThat(consumer1partitions, not(allPartitions));
@@ -1095,6 +1095,37 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.configure(config);
}
+ @Test
+ public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() throws Exception {
+ final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ final Set<TaskId> emptyTasks = Collections.emptySet();
+ subscriptions.put(
+ "consumer1",
+ new PartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+ )
+ );
+ subscriptions.put(
+ "consumer2",
+ new PartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+ )
+ );
+
+ mockTaskManager(Collections.<TaskId>emptySet(),
+ Collections.<TaskId>emptySet(),
+ UUID.randomUUID(),
+ new InternalTopologyBuilder());
+ partitionAssignor.configure(configProps());
+ final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+ assertThat(assignment.size(), equalTo(2));
+ assertThat(AssignmentInfo.decode(assignment.get("consumer1").userData()).version(), equalTo(1));
+ assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(1));
+ }
+
private PartitionAssignor.Assignment createAssignment(final Map<HostInfo, Set<TopicPartition>> firstHostState) {
final AssignmentInfo info = new AssignmentInfo(Collections.<TaskId>emptyList(),
Collections.<TaskId, Set<TopicPartition>>emptyMap(),
@@ -1111,7 +1142,7 @@ public class StreamsPartitionAssignorTest {
AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
// check if the number of assigned partitions == the size of active task id list
- assertEquals(assignment.partitions().size(), info.activeTasks.size());
+ assertEquals(assignment.partitions().size(), info.activeTasks().size());
// check if active tasks are consistent
List<TaskId> activeTasks = new ArrayList<>();
@@ -1121,14 +1152,14 @@ public class StreamsPartitionAssignorTest {
activeTasks.add(new TaskId(0, partition.partition()));
activeTopics.add(partition.topic());
}
- assertEquals(activeTasks, info.activeTasks);
+ assertEquals(activeTasks, info.activeTasks());
// check if active partitions cover all topics
assertEquals(expectedTopics, activeTopics);
// check if standby tasks are consistent
Set<String> standbyTopics = new HashSet<>();
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks().entrySet()) {
TaskId id = entry.getKey();
Set<TopicPartition> partitions = entry.getValue();
for (TopicPartition partition : partitions) {
@@ -1139,7 +1170,7 @@ public class StreamsPartitionAssignorTest {
}
}
- if (info.standbyTasks.size() > 0) {
+ if (info.standbyTasks().size() > 0) {
// check if standby partitions cover all topics
assertEquals(expectedTopics, standbyTopics);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ec94ad8..726a562 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
public class AssignmentInfoTest {
@@ -61,10 +62,10 @@ public class AssignmentInfoTest {
standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null);
final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
- assertEquals(oldVersion.activeTasks, decoded.activeTasks);
- assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
- assertEquals(0, decoded.partitionsByHost.size()); // should be empty as wasn't in V1
- assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+ assertEquals(oldVersion.activeTasks(), decoded.activeTasks());
+ assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks());
+ assertNull(decoded.partitionsByHost()); // should be null as wasn't in V1
+ assertEquals(1, decoded.version());
}
@@ -76,15 +77,15 @@ public class AssignmentInfoTest {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
// Encode version
- out.writeInt(oldVersion.version);
+ out.writeInt(oldVersion.version());
// Encode active tasks
- out.writeInt(oldVersion.activeTasks.size());
- for (TaskId id : oldVersion.activeTasks) {
+ out.writeInt(oldVersion.activeTasks().size());
+ for (TaskId id : oldVersion.activeTasks()) {
id.writeTo(out);
}
// Encode standby tasks
- out.writeInt(oldVersion.standbyTasks.size());
- for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks.entrySet()) {
+ out.writeInt(oldVersion.standbyTasks().size());
+ for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks().entrySet()) {
TaskId id = entry.getKey();
id.writeTo(out);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 9c011bb..633285a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -65,14 +65,12 @@ public class SubscriptionInfoTest {
final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks);
final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
- assertEquals(activeTasks, decode.prevTasks);
- assertEquals(standbyTasks, decode.standbyTasks);
- assertEquals(processId, decode.processId);
- assertNull(decode.userEndPoint);
-
+ assertEquals(activeTasks, decode.prevTasks());
+ assertEquals(standbyTasks, decode.standbyTasks());
+ assertEquals(processId, decode.processId());
+ assertNull(decode.userEndPoint());
}
-
/**
* This is a clone of what the V1 encoding did. The encode method has changed for V2
* so it is impossible to test compatibility without having this
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.