You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/16 00:49:39 UTC
kafka git commit: KAFKA-4537: StreamPartitionAssignor incorrectly adds standby partitions to the partitionsByHostState map
Repository: kafka
Updated Branches:
refs/heads/trunk c2cfadf25 -> 233cd4b18
KAFKA-4537: StreamPartitionAssignor incorrectly adds standby partitions to the partitionsByHostState map
If a KafkaStreams app is using Standby Tasks, then `StreamPartitionAssignor` will add the standby partitions to the partitionsByHostState map for each host. This is incorrect, as the partitionsByHostState map is used to resolve which host is hosting a particular store for a key.
The result is that metadata lookups for interactive queries can return an incorrect host.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2254 from dguy/KAFKA-4537
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/233cd4b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/233cd4b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/233cd4b1
Branch: refs/heads/trunk
Commit: 233cd4b18af83e9f9eadfaf048cdaa1888faf546
Parents: c2cfadf
Author: Damian Guy <da...@gmail.com>
Authored: Thu Dec 15 16:49:35 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 15 16:49:35 2016 -0800
----------------------------------------------------------------------
.../internals/StreamPartitionAssignor.java | 2 +-
.../internals/StreamPartitionAssignorTest.java | 49 ++++++++++++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/233cd4b1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 8a94b7e..4ae2d33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -481,7 +481,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
final Set<TopicPartition> topicPartitions = new HashSet<>();
final ClientState<TaskId> state = entry.getValue().state;
- for (TaskId id : state.assignedTasks) {
+ for (TaskId id : state.activeTasks) {
topicPartitions.addAll(partitionsForTask.get(id));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/233cd4b1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index d40956e..0e0620d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -57,6 +57,7 @@ import java.util.Set;
import java.util.UUID;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
@@ -888,6 +889,54 @@ public class StreamPartitionAssignorTest {
assertThat(expectedAssignment, equalTo(assignment.get(client).partitions()));
}
+ @Test
+ public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
+ final Properties props = configProps();
+ props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
+ final StreamsConfig config = new StreamsConfig(props);
+ final KStreamBuilder builder = new KStreamBuilder();
+ final String applicationId = "appId";
+ builder.setApplicationId(applicationId);
+ builder.stream("topic1").groupByKey().count("count");
+
+ final UUID uuid = UUID.randomUUID();
+ final String client = "client1";
+
+ final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+
+ final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+ partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
+
+ final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+ final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+ subscriptions.put(
+ "consumer1",
+ new PartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
+ )
+ );
+
+ subscriptions.put(
+ "consumer2",
+ new PartitionAssignor.Subscription(
+ Collections.singletonList("topic1"),
+ new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode()
+ )
+ );
+ final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);
+ final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions);
+ final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
+ final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
+ final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 2171));
+ final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090));
+ final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1partitions);
+ allAssignedPartitions.addAll(consumer2Partitions);
+ assertThat(consumer1partitions, not(allPartitions));
+ assertThat(consumer2Partitions, not(allPartitions));
+ assertThat(allAssignedPartitions, equalTo(allPartitions));
+ }
+
private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {
// This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.