You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/16 00:49:39 UTC

kafka git commit: KAFKA-4537: StreamPartitionAssignor incorrectly adds standby partitions to the partitionsByHostState map

Repository: kafka
Updated Branches:
  refs/heads/trunk c2cfadf25 -> 233cd4b18


KAFKA-4537: StreamPartitionAssignor incorrectly adds standby partitions to the partitionsByHostState map

If a KafkaStreams app is using Standby Tasks, then `StreamPartitionAssignor` will add the standby partitions to the partitionsByHostState map for each host. This is incorrect because the partitionsByHostState map is used to resolve which host is hosting a particular store for a key.
As a result, metadata lookups for interactive queries can return an incorrect host.

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2254 from dguy/KAFKA-4537


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/233cd4b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/233cd4b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/233cd4b1

Branch: refs/heads/trunk
Commit: 233cd4b18af83e9f9eadfaf048cdaa1888faf546
Parents: c2cfadf
Author: Damian Guy <da...@gmail.com>
Authored: Thu Dec 15 16:49:35 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 15 16:49:35 2016 -0800

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      |  2 +-
 .../internals/StreamPartitionAssignorTest.java  | 49 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/233cd4b1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 8a94b7e..4ae2d33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -481,7 +481,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 final Set<TopicPartition> topicPartitions = new HashSet<>();
                 final ClientState<TaskId> state = entry.getValue().state;
 
-                for (TaskId id : state.assignedTasks) {
+                for (TaskId id : state.activeTasks) {
                     topicPartitions.addAll(partitionsForTask.get(id));
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/233cd4b1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index d40956e..0e0620d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -57,6 +57,7 @@ import java.util.Set;
 import java.util.UUID;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
@@ -888,6 +889,54 @@ public class StreamPartitionAssignorTest {
         assertThat(expectedAssignment, equalTo(assignment.get(client).partitions()));
     }
 
+    @Test
+    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
+        final Properties props = configProps();
+        props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
+        final StreamsConfig config = new StreamsConfig(props);
+        final KStreamBuilder builder = new KStreamBuilder();
+        final String applicationId = "appId";
+        builder.setApplicationId(applicationId);
+        builder.stream("topic1").groupByKey().count("count");
+
+        final UUID uuid = UUID.randomUUID();
+        final String client = "client1";
+
+        final StreamThread streamThread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client));
+
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        subscriptions.put(
+                "consumer1",
+                new PartitionAssignor.Subscription(
+                        Collections.singletonList("topic1"),
+                        new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
+                )
+        );
+
+        subscriptions.put(
+                "consumer2",
+                new PartitionAssignor.Subscription(
+                        Collections.singletonList("topic1"),
+                        new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode()
+                )
+        );
+        final Set<TopicPartition> allPartitions = Utils.mkSet(t1p0, t1p1, t1p2);
+        final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata, subscriptions);
+        final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
+        final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
+        final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 2171));
+        final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090));
+        final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1partitions);
+        allAssignedPartitions.addAll(consumer2Partitions);
+        assertThat(consumer1partitions, not(allPartitions));
+        assertThat(consumer2Partitions, not(allPartitions));
+        assertThat(allAssignedPartitions, equalTo(allPartitions));
+    }
+
     private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {
 
         // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group.