You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/09/06 18:45:19 UTC

kafka git commit: KAFKA-5726; KafkaConsumer.subscribe() overload that takes just Pattern

Repository: kafka
Updated Branches:
  refs/heads/trunk 510257646 -> 23d01c805


KAFKA-5726; KafkaConsumer.subscribe() overload that takes just Pattern

- changed the interface & implementations
- updated tests to use the new method where applicable

Author: Attila Kreiner <at...@kreiner.hu>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3669 from attilakreiner/KAFKA-5726


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23d01c80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23d01c80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23d01c80

Branch: refs/heads/trunk
Commit: 23d01c805bef7504abfa83ecac7e384d121a583a
Parents: 5102576
Author: Attila Kreiner <at...@kreiner.hu>
Authored: Wed Sep 6 11:41:33 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Sep 6 11:41:33 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java  |  5 +++++
 .../kafka/clients/consumer/KafkaConsumer.java    | 19 +++++++++++++++++++
 .../kafka/clients/consumer/MockConsumer.java     |  5 +++++
 .../clients/consumer/KafkaConsumerTest.java      |  6 +++---
 .../main/scala/kafka/consumer/BaseConsumer.scala |  2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala    | 14 +++++++-------
 6 files changed, 40 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index b1badef..0e27e1f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -66,6 +66,11 @@ public interface Consumer<K, V> extends Closeable {
     public void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
 
     /**
+    * @see KafkaConsumer#subscribe(Pattern)
+    */
+    public void subscribe(Pattern pattern);
+
+    /**
      * @see KafkaConsumer#unsubscribe()
      */
     public void unsubscribe();

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 073b2df..1cdb132 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -953,6 +953,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
+     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
+     * The pattern matching will be done periodically against topics existing at the time of check.
+     *
+     * <p>
+     * This is a short-hand for {@link #subscribe(Pattern, ConsumerRebalanceListener)}, which
+     * uses a noop listener. If you need the ability to seek to particular offsets, you should prefer
+     * {@link #subscribe(Pattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
+     * to be reset. You should also provide your own listener if you are doing your own offset
+     * management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
+     *
+     * @param pattern Pattern to subscribe to
+     * @throws IllegalArgumentException If pattern is null
+     */
+    @Override
+    public void subscribe(Pattern pattern) {
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
+    }
+
+    /**
      * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. This
      * also clears any partitions directly assigned through {@link #assign(Collection)}.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 91cb6f1..9b0c058 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -111,6 +111,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    public synchronized void subscribe(Pattern pattern) {
+        subscribe(pattern, new NoOpConsumerRebalanceListener());
+    }
+
+    @Override
     public synchronized void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
         ensureNotClosed();
         this.subscriptions.subscribe(new HashSet<>(topics), listener);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index eed012e..c5e2213 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
@@ -191,9 +190,10 @@ public class KafkaConsumerTest {
     @Test(expected = IllegalArgumentException.class)
     public void testSubscriptionOnNullTopicCollection() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
+        List<String> nullList = null;
 
         try {
-            consumer.subscribe(null);
+            consumer.subscribe(nullList);
         } finally {
             consumer.close();
         }
@@ -229,7 +229,7 @@ public class KafkaConsumerTest {
         Pattern pattern = null;
 
         try {
-            consumer.subscribe(pattern, new NoOpConsumerRebalanceListener());
+            consumer.subscribe(pattern);
         } finally {
             consumer.close();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index cec74d0..2c53258 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -73,7 +73,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
       case (Some(topic), None, None, None) =>
         consumer.subscribe(Collections.singletonList(topic))
       case (None, None, None, Some(whitelist)) =>
-        consumer.subscribe(Pattern.compile(whitelist), new NoOpConsumerRebalanceListener())
+        consumer.subscribe(Pattern.compile(whitelist))
       case _ =>
         throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
             "Exactly one of 'topic' or 'whitelist' must be provided. " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d01c80/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 67d15b3..ccb2719 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -623,7 +623,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
     val consumer = consumers.head
-    consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+    consumer.subscribe(Pattern.compile(topicPattern))
     try {
       consumeRecords(consumer)
       Assert.fail("Expected TopicAuthorizationException")
@@ -647,14 +647,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource)
     val consumer = consumers.head
-    consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+    consumer.subscribe(Pattern.compile(topicPattern))
     consumeRecords(consumer)
 
     // set the subscription pattern to an internal topic that the consumer has read permission to. Since
     // internal topics are not included, we should not be assigned any partitions from this topic
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)),  new Resource(Topic,
       GROUP_METADATA_TOPIC_NAME))
-    consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener)
+    consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
     consumer.poll(0)
     assertTrue(consumer.subscription().isEmpty)
     assertTrue(consumer.assignment().isEmpty)
@@ -675,14 +675,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
     try {
       // ensure that internal topics are not included if no permission
-      consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+      consumer.subscribe(Pattern.compile(".*"))
       consumeRecords(consumer)
       assertEquals(Set(topic).asJava, consumer.subscription)
 
       // now authorize the user for the internal topic and verify that we can subscribe
       addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic,
         GROUP_METADATA_TOPIC_NAME))
-      consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener)
+      consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME))
       consumer.poll(0)
       assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala)
     } finally consumer.close()
@@ -704,7 +704,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
       securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
     try {
-      consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+      consumer.subscribe(Pattern.compile(".*"))
       // It is possible that the first call returns records of "topic" and the second call throws TopicAuthorizationException
       consumeRecords(consumer)
       consumeRecords(consumer)
@@ -728,7 +728,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group,
       securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
     try {
-      consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener)
+      consumer.subscribe(Pattern.compile(topicPattern))
       consumeRecords(consumer)
     } finally consumer.close()
 }