You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/09/06 18:45:19 UTC
kafka git commit: KAFKA-5726;
KafkaConsumer.subscribe() overload that takes just Pattern
Repository: kafka
Updated Branches:
refs/heads/trunk 510257646 -> 23d01c805
KAFKA-5726; KafkaConsumer.subscribe() overload that takes just Pattern
- changed the interface & implementations
- updated tests to use the new method where applicable
Author: Attila Kreiner <at...@kreiner.hu>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #3669 from attilakreiner/KAFKA-5726
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23d01c80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23d01c80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23d01c80
Branch: refs/heads/trunk
Commit: 23d01c805bef7504abfa83ecac7e384d121a583a
Parents: 5102576
Author: Attila Kreiner <at...@kreiner.hu>
Authored: Wed Sep 6 11:41:33 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Sep 6 11:41:33 2017 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/Consumer.java | 5 +++++
.../kafka/clients/consumer/KafkaConsumer.java | 19 +++++++++++++++++++
.../kafka/clients/consumer/MockConsumer.java | 5 +++++
.../clients/consumer/KafkaConsumerTest.java | 6 +++---
.../main/scala/kafka/consumer/BaseConsumer.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 14 +++++++-------
6 files changed, 40 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index b1badef..0e27e1f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -66,6 +66,11 @@ public interface Consumer<K, V> extends Closeable {
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
/**
+ * @see KafkaConsumer#subscribe(Pattern)
+ */
+ public void subscribe(Pattern pattern);
+
+ /**
* @see KafkaConsumer#unsubscribe()
*/
public void unsubscribe();
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 073b2df..1cdb132 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -953,6 +953,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
+ * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+ * The pattern matching will be done periodically against topics existing at the time of check.
+ *
+ * <p>
+ * This is a short-hand for {@link #subscribe(Pattern, ConsumerRebalanceListener)}, which
+ * uses a noop listener. If you need the ability to seek to particular offsets, you should prefer
+ * {@link #subscribe(Pattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
+ * to be reset. You should also provide your own listener if you are doing your own offset
+ * management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
+ *
+ * @param pattern Pattern to subscribe to
+ * @throws IllegalArgumentException If pattern is null
+ */
+ @Override
+ public void subscribe(Pattern pattern) {
+ subscribe(pattern, new NoOpConsumerRebalanceListener());
+ }
+
+ /**
* Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. This
* also clears any partitions directly assigned through {@link #assign(Collection)}.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 91cb6f1..9b0c058 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -111,6 +111,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
+ public synchronized void subscribe(Pattern pattern) {
+ subscribe(pattern, new NoOpConsumerRebalanceListener());
+ }
+
+ @Override
public synchronized void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
ensureNotClosed();
this.subscriptions.subscribe(new HashSet<>(topics), listener);
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index eed012e..c5e2213 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
@@ -191,9 +190,10 @@ public class KafkaConsumerTest {
@Test(expected = IllegalArgumentException.class)
public void testSubscriptionOnNullTopicCollection() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+ List<String> nullList = null;
try {
- consumer.subscribe(null);
+ consumer.subscribe(nullList);
} finally {
consumer.close();
}
@@ -229,7 +229,7 @@ public class KafkaConsumerTest {
Pattern pattern = null;
try {
- consumer.subscribe(pattern, new NoOpConsumerRebalanceListener());
+ consumer.subscribe(pattern);
} finally {
consumer.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index cec74d0..2c53258 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -73,7 +73,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
case (Some(topic), None, None, None) =>
consumer.subscribe(Collections.singletonList(topic))
case (None, None, None, Some(whitelist)) =>
- consumer.subscribe(Pattern.compile(whitelist), new NoOpConsumerRebalanceListener())
+ consumer.subscribe(Pattern.compile(whitelist))
case _ =>
throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
"Exactly one of 'topic' or 'whitelist' must be provided. " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 67d15b3..ccb2719 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -623,7 +623,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
val consumer = consumers.head
- consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+ consumer.subscribe(Pattern.compile(topicPattern))
try {
consumeRecords(consumer)
Assert.fail("Expected TopicAuthorizationException")
@@ -647,14 +647,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
val consumer = consumers.head
- consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+ consumer.subscribe(Pattern.compile(topicPattern))
consumeRecords(consumer)
// set the subscription pattern to an internal topic that the consumer has read permission to. Since
// internal topics are not included, we should not be assigned any partitions from this topic
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), new Resource(Topic,
GROUP_METADATA_TOPIC_NAME))
- consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener)
+ consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
consumer.poll(0)
assertTrue(consumer.subscription().isEmpty)
assertTrue(consumer.assignment().isEmpty)
@@ -675,14 +675,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
try {
// ensure that internal topics are not included if no permission
- consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+ consumer.subscribe(Pattern.compile(".*"))
consumeRecords(consumer)
assertEquals(Set(topic).asJava, consumer.subscription)
// now authorize the user for the internal topic and verify that we can subscribe
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic,
GROUP_METADATA_TOPIC_NAME))
- consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener)
+ consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
consumer.poll(0)
assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala)
} finally consumer.close()
@@ -704,7 +704,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
try {
- consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+ consumer.subscribe(Pattern.compile(".*"))
// It is possible that the first call returns records of "topic" and the second call throws TopicAuthorizationException
consumeRecords(consumer)
consumeRecords(consumer)
@@ -728,7 +728,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
try {
- consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+ consumer.subscribe(Pattern.compile(topicPattern))
consumeRecords(consumer)
} finally consumer.close()
}