You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/08 00:20:18 UTC
kafka git commit: KAFKA-2413; New consumer's subscribe(Topic...) api fails if called more than once
Repository: kafka
Updated Branches:
refs/heads/trunk f6373e4d9 -> 63b820c59
KAFKA-2413; New consumer's subscribe(Topic...) api fails if called more than once
Author: Onur Karaman <ok...@linkedin.com>
Reviewers: Ashish Singh, Ismael Juma, Jason Gustafson
Closes #122 from onurkaraman/KAFKA-2413 and squashes the following commits:
cc340fc [Onur Karaman] fix ConsumerCoordinator updateConsumer
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63b820c5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63b820c5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63b820c5
Branch: refs/heads/trunk
Commit: 63b820c592aba3ec6f26cc99c0c470795029b10c
Parents: f6373e4
Author: Onur Karaman <ok...@linkedin.com>
Authored: Fri Aug 7 15:20:09 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Aug 7 15:20:09 2015 -0700
----------------------------------------------------------------------
.../kafka/coordinator/ConsumerCoordinator.scala | 2 +-
.../integration/kafka/api/ConsumerTest.scala | 60 ++++++++++++++++----
2 files changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/63b820c5/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 6c2df4c..1bceb43 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -326,7 +326,7 @@ class ConsumerCoordinator(val brokerId: Int,
private def updateConsumer(group: ConsumerGroupMetadata, consumer: ConsumerMetadata, topics: Set[String]) {
val topicsToBind = topics -- group.topics
group.remove(consumer.consumerId)
- val topicsToUnbind = consumer.topics -- group.topics
+ val topicsToUnbind = consumer.topics -- (group.topics ++ topics)
group.add(consumer.consumerId, consumer)
consumer.topics = topics
coordinatorMetadata.bindAndUnbindGroupFromTopics(group.groupId, topicsToBind, topicsToUnbind)
http://git-wip-us.apache.org/repos/asf/kafka/blob/63b820c5/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index f9e22ba..79f1640 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+import org.apache.kafka.common.TopicPartition
import kafka.utils.{TestUtils, Logging}
import kafka.server.KafkaConfig
@@ -26,7 +26,7 @@ import kafka.server.KafkaConfig
import java.util.ArrayList
import org.junit.Assert._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import kafka.coordinator.ConsumerCoordinator
@@ -95,7 +95,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
this.consumers(0).poll(50)
val pos1 = this.consumers(0).position(tp)
val pos2 = this.consumers(0).position(tp2)
- this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)), CommitType.SYNC)
+ this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp, 3L)).asJava, CommitType.SYNC)
assertEquals(3, this.consumers(0).committed(tp))
intercept[NoOffsetForPartitionException] {
this.consumers(0).committed(tp2)
@@ -103,13 +103,13 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
// positions should not change
assertEquals(pos1, this.consumers(0).position(tp))
assertEquals(pos2, this.consumers(0).position(tp2))
- this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)), CommitType.SYNC)
+ this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 5L)).asJava, CommitType.SYNC)
assertEquals(3, this.consumers(0).committed(tp))
assertEquals(5, this.consumers(0).committed(tp2))
// Using async should pick up the committed changes after commit completes
val commitCallback = new CountConsumerCommitCallback()
- this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)), CommitType.ASYNC, commitCallback)
+ this.consumers(0).commit(Map[TopicPartition,java.lang.Long]((tp2, 7L)).asJava, CommitType.ASYNC, commitCallback)
awaitCommitCallback(this.consumers(0), commitCallback)
assertEquals(7, this.consumers(0).committed(tp2))
}
@@ -182,7 +182,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
TestUtils.createTopic(this.zkClient, "part-test", numParts, 1, this.servers)
val parts = this.consumers(0).partitionsFor("part-test")
assertNotNull(parts)
- assertEquals(2, parts.length)
+ assertEquals(2, parts.size)
assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
}
@@ -199,9 +199,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertNotNull(topics)
assertEquals(5, topics.size())
assertEquals(5, topics.keySet().size())
- assertEquals(2, topics.get(topic1).length)
- assertEquals(2, topics.get(topic2).length)
- assertEquals(2, topics.get(topic3).length)
+ assertEquals(2, topics.get(topic1).size)
+ assertEquals(2, topics.get(topic2).size)
+ assertEquals(2, topics.get(topic3).size)
}
def testPartitionReassignmentCallback() {
@@ -216,9 +216,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
consumer0.poll(50)
// get metadata for the topic
- var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
+ var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
while(parts == null)
- parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
+ parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName).asScala
assertEquals(1, parts.size)
assertNotNull(parts(0).leader())
@@ -256,6 +256,42 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
}
}
+ def testExpandingTopicSubscriptions() {
+ val otherTopic = "other"
+ val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+ val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+ this.consumers(0).subscribe(topic)
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).subscriptions == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+
+ TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
+ this.consumers(0).subscribe(otherTopic)
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).subscriptions == expandedSubscriptions.asJava
+ }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+ }
+
+ def testShrinkingTopicSubscriptions() {
+ val otherTopic = "other"
+ TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
+ val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+ val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
+ this.consumers(0).subscribe(topic, otherTopic)
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).subscriptions == subscriptions.asJava
+ }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+
+ this.consumers(0).unsubscribe(otherTopic)
+ TestUtils.waitUntilTrue(() => {
+ this.consumers(0).poll(50)
+ this.consumers(0).subscriptions == shrunkenSubscriptions.asJava
+ }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+ }
+
def testPartitionPauseAndResume() {
sendRecords(5)
this.consumers(0).subscribe(tp)
@@ -314,7 +350,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
val maxIters = numRecords * 300
var iters = 0
while (records.size < numRecords) {
- for (record <- consumer.poll(50))
+ for (record <- consumer.poll(50).asScala)
records.add(record)
if(iters > maxIters)
throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")