You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/22 19:39:02 UTC

[incubator-pulsar] branch master updated: Issue #1237: support builder for topicsConsumer (#1269)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 68cd115  Issue #1237: support builder for topicsConsumer (#1269)
68cd115 is described below

commit 68cd1154e9581bb8503eeee2da5a8e1d44336b25
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Thu Feb 22 11:38:59 2018 -0800

    Issue #1237: support builder for topicsConsumer (#1269)
    
    * support builder for topicsConsumer
    
    * address @Matteo's review comments: remove the subscribe methods from the client
    
    * change to topic(String ... names) for consumer builder
    
    * address @sijie's review comments
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 197 +++++++++++++++------
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  16 +-
 .../org/apache/pulsar/client/api/PulsarClient.java |  57 ------
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  41 ++++-
 .../pulsar/client/impl/PulsarClientImpl.java       |  32 ----
 5 files changed, 186 insertions(+), 157 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index a2cb6d5..2aa386e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -33,12 +33,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -82,12 +79,13 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
         // 2. Create consumer
-        ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setReceiverQueueSize(4);
-        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-        conf.setSubscriptionType(SubscriptionType.Shared);
         try {
-            Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+            Consumer consumer = pulsarClient.newConsumer()
+                .topics(topicNames)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .subscribe();
             fail("subscribe for topics from different namespace should fail.");
         } catch (IllegalArgumentException e) {
             // expected for have different namespace
@@ -103,18 +101,21 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
         final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
         final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
-        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2);
 
         admin.properties().createProperty("prop", new PropertyAdmin());
         admin.persistentTopics().createPartitionedTopic(topicName2, 2);
         admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
         // 2. Create consumer
-        ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setReceiverQueueSize(4);
-        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-        conf.setSubscriptionType(SubscriptionType.Shared);
-        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        Consumer consumer = pulsarClient.newConsumer()
+            .topics(topicNames)
+            .topic(topicName3)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .subscribe();
         assertTrue(consumer instanceof TopicsConsumerImpl);
 
         List<String> topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
@@ -148,20 +149,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         admin.persistentTopics().createPartitionedTopic(topicName2, 2);
         admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
-        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
-        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
-
         // 1. producer connect
-        Producer producer1 = pulsarClient.createProducer(topicName1);
-        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
-        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+        Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+            .create();
+        Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+        Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
-        ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setReceiverQueueSize(4);
-        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-        conf.setSubscriptionType(SubscriptionType.Shared);
-        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        Consumer consumer = pulsarClient.newConsumer()
+            .topics(topicNames)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .subscribe();
         assertTrue(consumer instanceof TopicsConsumerImpl);
 
         // 3. producer publish messages
@@ -205,20 +210,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         admin.persistentTopics().createPartitionedTopic(topicName2, 2);
         admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
-        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
-        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
-
         // 1. producer connect
-        Producer producer1 = pulsarClient.createProducer(topicName1);
-        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
-        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+        Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+            .create();
+        Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+        Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
-        ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setReceiverQueueSize(4);
-        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-        conf.setSubscriptionType(SubscriptionType.Shared);
-        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        Consumer consumer = pulsarClient.newConsumer()
+            .topics(topicNames)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .subscribe();
         assertTrue(consumer instanceof TopicsConsumerImpl);
 
         // Asynchronously produce messages
@@ -280,20 +289,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         admin.persistentTopics().createPartitionedTopic(topicName2, 2);
         admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
-        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
-        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
-
         // 1. producer connect
-        Producer producer1 = pulsarClient.createProducer(topicName1);
-        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
-        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+        Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+            .create();
+        Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+        Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
-        ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setReceiverQueueSize(4);
-        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-        conf.setSubscriptionType(SubscriptionType.Shared);
-        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        Consumer consumer = pulsarClient.newConsumer()
+            .topics(topicNames)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .subscribe();
         assertTrue(consumer instanceof TopicsConsumerImpl);
 
         // 3. producer publish messages
@@ -416,20 +429,24 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         admin.persistentTopics().createPartitionedTopic(topicName2, 2);
         admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
-        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
-        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
-
         // 1. producer connect
-        Producer producer1 = pulsarClient.createProducer(topicName1);
-        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
-        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+        Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+            .create();
+        Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+        Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
 
         // 2. Create consumer
-        ConsumerConfiguration conf = new ConsumerConfiguration();
-        conf.setReceiverQueueSize(4);
-        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
-        conf.setSubscriptionType(SubscriptionType.Shared);
-        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        Consumer consumer = pulsarClient.newConsumer()
+            .topics(topicNames)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .subscribe();
         assertTrue(consumer instanceof TopicsConsumerImpl);
 
         // 3. producer publish messages
@@ -519,4 +536,68 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         producer3.close();
     }
 
+
+    @Test(timeOut = testTimeout)
+    public void testTopicsNameSubscribeWithBuilderFail() throws Exception {
+        String key = "TopicsNameSubscribeWithBuilder";
+        final String subscriptionName = "my-ex-subscription-" + key;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // test failing builder with empty topics
+        try {
+            Consumer consumer1 = pulsarClient.newConsumer()
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .subscribe();
+            fail("subscribe1 with no topicName should fail.");
+        } catch (PulsarClientException e) {
+            // expected
+        }
+
+        try {
+            Consumer consumer2 = pulsarClient.newConsumer()
+                .topic()
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .subscribe();
+            fail("subscribe2 with no topicName should fail.");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+
+        try {
+            Consumer consumer3 = pulsarClient.newConsumer()
+                .topics(null)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .subscribe();
+            fail("subscribe3 with no topicName should fail.");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+
+        try {
+            Consumer consumer4 = pulsarClient.newConsumer()
+                .topics(Lists.newArrayList())
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .subscribe();
+            fail("subscribe4 with no topicName should fail.");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index a2c3c81..be97ab6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -77,13 +78,22 @@ public interface ConsumerBuilder extends Serializable, Cloneable {
     CompletableFuture<Consumer> subscribeAsync();
 
     /**
-     * Specify the topic this consumer will subscribe on.
+     * Specify the topics this consumer will subscribe on.
      * <p>
      * This argument is required when constructing the consumer.
      *
-     * @param topicName
+     * @param topicNames
      */
-    ConsumerBuilder topic(String topicName);
+    ConsumerBuilder topic(String... topicNames);
+
+    /**
+     * Specify a list of topics that this consumer will subscribe on.
+     * <p>
+     * This argument is required when constructing the consumer.
+     *
+     * @param topicNames
+     */
+    ConsumerBuilder topics(List<String> topicNames);
 
     /**
      * Specify the subscription name for this consumer.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 8de76f4..6ba2518 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -321,60 +320,4 @@ public interface PulsarClient extends Closeable {
      *             if the forceful shutdown fails
      */
     void shutdown() throws PulsarClientException;
-
-
-    /**
-     * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration}
-     *
-     * @param topics
-     *            The collection of topic names, they should be under same namespace
-     * @param subscription
-     *            The name of the subscription
-     * @return The {@code Consumer} object
-     * @throws PulsarClientException
-     */
-    Consumer subscribe(Collection<String> topics, String subscription) throws PulsarClientException;
-
-    /**
-     * Asynchronously subscribe to the given topics and subscription combination with
-     * default {@code ConsumerConfiguration}
-     *
-     * @param topics
-     *            The collection of topic names, they should be under same namespace
-     * @param subscription
-     *            The name of the subscription
-     * @return Future of the {@code Consumer} object
-     */
-    CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription);
-
-    /**
-     * Subscribe to the given topics and subscription combination using given {@code ConsumerConfiguration}
-     *
-     * @param topics
-     *            The collection of topic names, they should be under same namespace
-     * @param subscription
-     *            The name of the subscription
-     * @param conf
-     *            The {@code ConsumerConfiguration} object
-     * @return Future of the {@code Consumer} object
-     */
-    Consumer subscribe(Collection<String> topics, String subscription, ConsumerConfiguration conf)
-        throws PulsarClientException;
-
-    /**
-     * Asynchronously subscribe to the given topics and subscription combination using given
-     * {@code ConsumerConfiguration}
-     *
-     * @param topics
-     *            The collection of topic names, they should be under same namespace
-     * @param subscription
-     *            The name of the subscription
-     * @param conf
-     *            The {@code ConsumerConfiguration} object
-     * @return Future of the {@code Consumer} object
-     */
-    CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
-                                               String subscription,
-                                               ConsumerConfiguration conf);
-
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index ab91326..66cd291 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -18,7 +18,13 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -40,9 +46,9 @@ public class ConsumerBuilderImpl implements ConsumerBuilder {
     private static final long serialVersionUID = 1L;
 
     private final PulsarClientImpl client;
-    private String topicName;
     private String subscriptionName;
     private final ConsumerConfiguration conf;
+    private Set<String> topicNames;
 
     ConsumerBuilderImpl(PulsarClientImpl client) {
         this.client = client;
@@ -77,22 +83,43 @@ public class ConsumerBuilderImpl implements ConsumerBuilder {
 
     @Override
     public CompletableFuture<Consumer> subscribeAsync() {
-        if (topicName == null) {
+        if (topicNames == null || topicNames.isEmpty()) {
             return FutureUtil
-                    .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
+                    .failedFuture(new IllegalArgumentException("Topic name must be set on the consumer builder"));
         }
 
         if (subscriptionName == null) {
             return FutureUtil.failedFuture(
-                    new IllegalArgumentException("Subscription name must be set on the producer builder"));
+                    new IllegalArgumentException("Subscription name must be set on the consumer builder"));
         }
 
-        return client.subscribeAsync(topicName, subscriptionName, conf);
+        if (topicNames.size() == 1) {
+            return client.subscribeAsync(topicNames.stream().findFirst().orElse(""), subscriptionName, conf);
+        } else {
+            return client.subscribeAsync(topicNames, subscriptionName, conf);
+        }
     }
 
     @Override
-    public ConsumerBuilder topic(String topicName) {
-        this.topicName = topicName;
+    public ConsumerBuilder topic(String... topicNames) {
+        checkArgument(topicNames.length > 0, "Passed in topicNames should not be empty.");
+        if (this.topicNames == null) {
+            this.topicNames = Sets.newHashSet(topicNames);
+        } else {
+            this.topicNames.addAll(Lists.newArrayList(topicNames));
+        }
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder topics(List<String> topicNames) {
+        checkArgument(topicNames != null && !topicNames.isEmpty(),
+            "Passed in topicNames list should not be empty.");
+        if (this.topicNames == null) {
+            this.topicNames = Sets.newHashSet();
+        }
+        this.topicNames.addAll(topicNames);
+
         return this;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 5d7aab8..c065532 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -308,38 +308,6 @@ public class PulsarClientImpl implements PulsarClient {
         return consumerSubscribedFuture;
     }
 
-
-    @Override
-    public Consumer subscribe(Collection<String> topics, final String subscription) throws PulsarClientException {
-        return subscribe(topics, subscription, new ConsumerConfiguration());
-    }
-
-    @Override
-    public Consumer subscribe(Collection<String> topics,
-                              String subscription,
-                              ConsumerConfiguration conf)
-        throws PulsarClientException {
-        try {
-            return subscribeAsync(topics, subscription, conf).get();
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof PulsarClientException) {
-                throw (PulsarClientException) t;
-            } else {
-                throw new PulsarClientException(t);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
-        }
-    }
-
-    @Override
-    public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription) {
-        return subscribeAsync(topics, subscription, new ConsumerConfiguration());
-    }
-
-    @Override
     public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
                                                       String subscription,
                                                       ConsumerConfiguration conf) {

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.