You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/08 00:20:18 UTC

kafka git commit: KAFKA-2413; New consumer's subscribe(Topic...) api fails if called more than once

Repository: kafka
Updated Branches:
  refs/heads/trunk f6373e4d9 -> 63b820c59


KAFKA-2413; New consumer's subscribe(Topic...) api fails if called more than once

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Ashish Singh, Ismael Juma, Jason Gustafson

Closes #122 from onurkaraman/KAFKA-2413 and squashes the following commits:

cc340fc [Onur Karaman] fix ConsumerCoordinator updateConsumer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63b820c5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63b820c5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63b820c5

Branch: refs/heads/trunk
Commit: 63b820c592aba3ec6f26cc99c0c470795029b10c
Parents: f6373e4
Author: Onur Karaman <ok...@linkedin.com>
Authored: Fri Aug 7 15:20:09 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Aug 7 15:20:09 2015 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/ConsumerCoordinator.scala |  2 +-
 .../integration/kafka/api/ConsumerTest.scala    | 60 ++++++++++++++++----
 2 files changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/63b820c5/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 6c2df4c..1bceb43 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -326,7 +326,7 @@ class ConsumerCoordinator(val brokerId: Int,
   private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
     val topicsToBind = topics -- group.topics
     group.remove(consumer.consumerId)
-    val topicsToUnbind = consumer.topics -- group.topics
+    val topicsToUnbind = consumer.topics -- (group.topics ++ topics)
     group.add(consumer.consumerId, consumer)
     consumer.topics = topics
     coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)

http://git-wip-us.apache.org/repos/asf/kafka/blob/63b820c5/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index f9e22ba..79f1640 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+import org.apache.kafka.common.TopicPartition
 
 import kafka.utils.{TestUtils, Logging}
 import kafka.server.KafkaConfig
@@ -26,7 +26,7 @@ import kafka.server.KafkaConfig
 import java.util.ArrayList
 import org.junit.Assert._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import kafka.coordinator.ConsumerCoordinator
 
 
@@ -95,7 +95,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).poll(50)
     val pos1 = this.consumers(0).position(tp)
     val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)), CommitType.SYNC)
+    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)).asJava, CommitType.SYNC)
     assertEquals(3, this.consumers(0).committed(tp))
     intercept[NoOffsetForPartitionException] {
       this.consumers(0).committed(tp2)
@@ -103,13 +103,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     // positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)), CommitType.SYNC)
+    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)).asJava, CommitType.SYNC)
     assertEquals(3, this.consumers(0).committed(tp))
     assertEquals(5, this.consumers(0).committed(tp2))
 
     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)), CommitType.ASYNC, commitCallback)
+    this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)).asJava, CommitType.ASYNC, commitCallback)
     awaitCommitCallback(this.consumers(0), commitCallback)
     assertEquals(7, this.consumers(0).committed(tp2))
   }
@@ -182,7 +182,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
     val parts = this.consumers(0).partitionsFor("part-test")
     assertNotNull(parts)
-    assertEquals(2, parts.length)
+    assertEquals(2, parts.size)
     assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
   }
 
@@ -199,9 +199,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertNotNull(topics)
     assertEquals(5, topics.size())
     assertEquals(5, topics.keySet().size())
-    assertEquals(2, topics.get(topic1).length)
-    assertEquals(2, topics.get(topic2).length)
-    assertEquals(2, topics.get(topic3).length)
+    assertEquals(2, topics.get(topic1).size)
+    assertEquals(2, topics.get(topic2).size)
+    assertEquals(2, topics.get(topic3).size)
   }
 
   def testPartitionReassignmentCallback() {
@@ -216,9 +216,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
       consumer0.poll(50)
     
     // get metadata for the topic
-    var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
+    var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
     while(parts == null)
-      parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
+      parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts(0).leader())
     
@@ -256,6 +256,42 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }
   }
 
+  def testExpandingTopicSubscriptions() {
+    val otherTopic = "other"
+    val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+    val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+    this.consumers(0).subscribe(topic)
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).subscriptions == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+
+    TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
+    this.consumers(0).subscribe(otherTopic)
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).subscriptions == expandedSubscriptions.asJava
+    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+  }
+
+  def testShrinkingTopicSubscriptions() {
+    val otherTopic = "other"
+    TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
+    val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+    val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+    this.consumers(0).subscribe(topic, otherTopic)
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).subscriptions == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+
+    this.consumers(0).unsubscribe(otherTopic)
+    TestUtils.waitUntilTrue(() => {
+      this.consumers(0).poll(50)
+      this.consumers(0).subscriptions == shrunkenSubscriptions.asJava
+    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+  }
+
   def testPartitionPauseAndResume() {
     sendRecords(5)
     this.consumers(0).subscribe(tp)
@@ -314,7 +350,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     val maxIters = numRecords * 300
     var iters = 0
     while (records.size < numRecords) {
-      for (record <- consumer.poll(50))
+      for (record <- consumer.poll(50).asScala)
         records.add(record)
       if(iters > maxIters)
         throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")