You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/26 16:44:33 UTC

[GitHub] mgodave closed pull request #1008: Allow user to specify listener on subscriber or creation of reader

mgodave closed pull request #1008: Allow user to specify listener on subscriber or creation of reader
URL: https://github.com/apache/incubator-pulsar/pull/1008
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index c861fdd00..045a5fb2a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -28,13 +28,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.RawReader;
-import org.apache.pulsar.client.api.RawMessage;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
@@ -63,7 +57,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, CompletableFuture<Co
         consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
 
         consumer = new RawConsumerImpl(client, topic, subscription, consumerConfiguration,
-                                       consumerFuture);
+                                       consumerFuture, null);
     }
 
     @Override
@@ -86,10 +80,10 @@ public RawReaderImpl(PulsarClientImpl client, String topic, CompletableFuture<Co
         final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
 
         RawConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
-                        CompletableFuture<Consumer> consumerFuture) {
+                        CompletableFuture<Consumer> consumerFuture, MessageListener listener) {
             super(client, topic, subscription, conf,
                   client.externalExecutorProvider().getExecutor(), -1, consumerFuture,
-                  SubscriptionMode.Durable, MessageId.earliest);
+                  SubscriptionMode.Durable, MessageId.earliest, listener);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index dbe454daa..979c8e890 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -320,7 +320,7 @@ public void testSillyUser() throws Exception {
         ConsumerConfiguration consumerConf = new ConsumerConfiguration();
 
         try {
-            consumer = pulsarClient.subscribe(dn.toString(), "my-subscriber-name", null);
+            consumer = pulsarClient.subscribe(dn.toString(), "my-subscriber-name", (ConsumerConfiguration) null);
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
             Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 4200d925d..a8b1858e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -437,7 +437,7 @@ public void testSillyUser() {
 
         try {
             Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic7",
-                    "my-subscriber-name", null);
+                    "my-subscriber-name", (ConsumerConfiguration) null);
             Assert.fail("Should fail");
         } catch (PulsarClientException e) {
             Assert.assertTrue(e instanceof PulsarClientException.InvalidConfigurationException);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index ee0b37846..b128abebe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -102,13 +102,20 @@ public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionTy
     }
 
     /**
+     * @deprecated Use a suitable {@link PulsarClient} subscribe method and provide a {@link MessageListener}, ex:
+     * {@link PulsarClient#subscribe(String, String, MessageListener)}
+     *
      * @return the configured {@link MessageListener} for the consumer
      */
+    @Deprecated
     public MessageListener getMessageListener() {
         return this.messageListener;
     }
 
     /**
+     * @deprecated Use a suitable {@link PulsarClient} subscribe method and provide a {@link MessageListener}, ex:
+     * {@link PulsarClient#subscribe(String, String, MessageListener)}
+     *
      * Sets a {@link MessageListener} for the consumer
      * <p>
      * When a {@link MessageListener} is set, application will receive messages through it. Calls to
@@ -117,6 +124,7 @@ public MessageListener getMessageListener() {
      * @param messageListener
      *            the listener object
      */
+    @Deprecated
     public ConsumerConfiguration setMessageListener(MessageListener messageListener) {
         checkNotNull(messageListener);
         this.messageListener = messageListener;
@@ -152,7 +160,7 @@ public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader)
     /**
      * Sets the ConsumerCryptoFailureAction to the value specified
      * 
-     * @param The consumer action
+     * @param action consumer action
      */
     public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
         cryptoFailureAction = action;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index e15db40a6..d31cdcd30 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -122,6 +122,21 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      */
     Consumer subscribe(String topic, String subscription) throws PulsarClientException;
 
+    /**
+     * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The name of the topic
+     * @param subscription
+     *            The name of the subscription
+     * @param listener
+     *            A listener that will be called in order for every message received
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException
+     * @throws InterruptedException
+     */
+    Consumer subscribe(String topic, String subscription, MessageListener listener) throws PulsarClientException;
+
     /**
      * Asynchronously subscribe to the given topic and subscription combination using default
      * {@code ConsumerConfiguration}
@@ -134,6 +149,21 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      */
     CompletableFuture<Consumer> subscribeAsync(String topic, String subscription);
 
+    /**
+     * Asynchronously subscribe to the given topic and subscription combination using default
+     * {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The topic name
+     * @param subscription
+     *            The subscription name
+     *
+     * @param listener
+     *            A listener that will be called in order for every message received
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, MessageListener listener);
+
     /**
      * Subscribe to the given topic and subscription combination with given {@code ConsumerConfiguration}
      *
@@ -148,6 +178,22 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      */
     Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException;
 
+    /**
+     * Subscribe to the given topic and subscription combination with given {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The name of the topic
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @param listener
+     *            A listener that will be called in order for every message received
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException
+     */
+    Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf, MessageListener listener) throws PulsarClientException;
+
     /**
      * Asynchronously subscribe to the given topic and subscription combination using given
      * {@code ConsumerConfiguration}
@@ -162,6 +208,22 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      */
     CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, ConsumerConfiguration conf);
 
+    /**
+     * Asynchronously subscribe to the given topic and subscription combination using given
+     * {@code ConsumerConfiguration}
+     *
+     * @param topic
+     *            The name of the topic
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @param listener
+     *            A listener that will be called in order for every message received
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, ConsumerConfiguration conf, MessageListener listener);
+
     /**
      * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified topic.
      * <p>
@@ -188,6 +250,34 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      */
     Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException;
 
+    /**
+     * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified topic.
+     * <p>
+     * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+     * subscription. Reader can only work on non-partitioned topics.
+     * <p>
+     * The initial reader positioning is done by specifying a message id. The options are:
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Start reading from the end topic, only getting messages published after the
+     * reader was created
+     * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that
+     * specific position. The first message to be read will be the message next to the specified messageId.
+     * </ul>
+     *
+     * @param topic
+     *            The name of the topic where to read
+     * @param startMessageId
+     *            The message id where the reader will position itself. The first message returned will be the one after
+     *            the specified startMessageId
+     * @param conf
+     *            The {@code ReaderConfiguration} object
+     * @param listener
+     *            A listener that will be called in order for every message received
+     * @return The {@code Reader} object
+     */
+    Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf, ReaderListener listener) throws PulsarClientException;
+
     /**
      * Asynchronously create a topic reader with given {@code ReaderConfiguration} for reading messages from the
      * specified topic.
@@ -215,6 +305,35 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      */
     CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf);
 
+    /**
+     * Asynchronously create a topic reader with given {@code ReaderConfiguration} for reading messages from the
+     * specified topic.
+     * <p>
+     * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a
+     * subscription. Reader can only work on non-partitioned topics.
+     * <p>
+     * The initial reader positioning is done by specifying a message id. The options are:
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic
+     * <li><code>MessageId.latest</code> : Start reading from the end topic, only getting messages published after the
+     * reader was created
+     * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on that
+     * specific position. The first message to be read will be the message next to the specified messageId.
+     * </ul>
+     *
+     * @param topic
+     *            The name of the topic where to read
+     * @param startMessageId
+     *            The message id where the reader will position itself. The first message returned will be the one after
+     *            the specified startMessageId
+     * @param conf
+     *            The {@code ReaderConfiguration} object
+     * @param listener
+     *            A listener that will be called in order for every message received
+     * @return Future of the asynchronously created producer object
+     */
+    CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf, ReaderListener listener);
+
     /**
      * Close the PulsarClient and release all the resources.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
index 999e2e60a..888a49b0c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
@@ -35,13 +35,20 @@
     private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
 
     /**
+     * @deprecated Use {@link PulsarClient#createReader(String, MessageId, ReaderConfiguration, ReaderListener)} or
+     * {@link PulsarClient#createReaderAsync(String, MessageId, ReaderConfiguration, ReaderListener)} to add a listener.
+     *
      * @return the configured {@link ReaderListener} for the reader
      */
+    @Deprecated
     public ReaderListener getReaderListener() {
         return this.readerListener;
     }
 
     /**
+     * @deprecated Use {@link PulsarClient#createReader(String, MessageId, ReaderConfiguration, ReaderListener)} or
+     * {@link PulsarClient#createReaderAsync(String, MessageId, ReaderConfiguration, ReaderListener)} to add a listener.
+     *
      * Sets a {@link ReaderListener} for the reader
      * <p>
      * When a {@link ReaderListener} is set, application will receive messages through it. Calls to
@@ -50,6 +57,7 @@ public ReaderListener getReaderListener() {
      * @param readerListener
      *            the listener object
      */
+    @Deprecated
     public ReaderConfiguration setReaderListener(ReaderListener readerListener) {
         checkNotNull(readerListener);
         this.readerListener = readerListener;
@@ -85,7 +93,7 @@ public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
     /**
      * Sets the ConsumerCryptoFailureAction to the value specified
      * 
-     * @param The consumer action
+     * @param action consumer action
      */
     public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
         cryptoFailureAction = action;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 96e2a84a7..cbba3026a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -58,14 +58,15 @@
     protected final int maxReceiverQueueSize;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
-            int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
+            int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture,
+            MessageListener listener) {
         super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0 , TimeUnit.MILLISECONDS));
         this.maxReceiverQueueSize = receiverQueueSize;
         this.subscription = subscription;
         this.conf = conf;
         this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
         this.subscribeFuture = subscribeFuture;
-        this.listener = conf.getMessageListener();
+        this.listener = listener;
         if (receiverQueueSize <= 1) {
             this.incomingMessages = Queues.newArrayBlockingQueue(1);
         } else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c47c17fb4..984960ee7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -43,13 +43,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -119,15 +113,16 @@
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
-            ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer> subscribeFuture) {
+            ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer> subscribeFuture,
+                 MessageListener listener) {
         this(client, topic, subscription, conf, listenerExecutor, partitionIndex, subscribeFuture,
-                SubscriptionMode.Durable, null);
+                SubscriptionMode.Durable, null, listener);
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
             ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer> subscribeFuture,
-            SubscriptionMode subscriptionMode, MessageId startMessageId) {
-        super(client, topic, subscription, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture);
+            SubscriptionMode subscriptionMode, MessageId startMessageId, MessageListener listener) {
+        super(client, topic, subscription, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, listener);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
         this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index bc80b9089..9a7335767 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -33,12 +33,7 @@
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.naming.DestinationName;
@@ -65,9 +60,10 @@
     private final UnAckedMessageTracker unAckedMessageTracker;
 
     PartitionedConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
-            int numPartitions, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
+        int numPartitions, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture,
+        MessageListener listener) {
         super(client, topic, subscription, conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()), listenerExecutor,
-                subscribeFuture);
+                subscribeFuture, listener);
         this.consumers = Lists.newArrayListWithCapacity(numPartitions);
         this.pausedConsumers = new ConcurrentLinkedQueue<>();
         this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
@@ -92,7 +88,8 @@ private void start() {
         for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) {
             String partitionName = DestinationName.get(topic).getPartition(partitionIndex).toString();
             ConsumerImpl consumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
-                    client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture<Consumer>());
+                    client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture<>(),
+                    internalConfig.getMessageListener());
             consumers.add(consumer);
             consumer.subscribeFuture().handle((cons, subscribeException) -> {
                 if (subscribeException != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 9f108211e..9016f3e99 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
-
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -30,16 +29,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderConfiguration;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
@@ -196,15 +186,25 @@ public Producer createProducer(final String destination, final ProducerConfigura
     }
 
     @Override
-    public Consumer subscribe(final String topic, final String subscription) throws PulsarClientException {
-        return subscribe(topic, subscription, new ConsumerConfiguration());
+    public Consumer subscribe(String topic, String subscription) throws PulsarClientException {
+        return subscribe(topic, subscription, (MessageListener) null);
+    }
+
+    @Override
+    public Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf) throws PulsarClientException {
+        return subscribe(topic, subscription, conf, conf != null ? conf.getMessageListener() : null);
+    }
+
+    @Override
+    public Consumer subscribe(final String topic, final String subscription, MessageListener listener) throws PulsarClientException {
+        return subscribe(topic, subscription, new ConsumerConfiguration(), listener);
     }
 
     @Override
-    public Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf)
+    public Consumer subscribe(String topic, String subscription, ConsumerConfiguration conf, MessageListener listener)
             throws PulsarClientException {
         try {
-            return subscribeAsync(topic, subscription, conf).get();
+            return subscribeAsync(topic, subscription, conf, listener).get();
         } catch (ExecutionException e) {
             Throwable t = e.getCause();
             if (t instanceof PulsarClientException) {
@@ -220,12 +220,22 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
 
     @Override
     public CompletableFuture<Consumer> subscribeAsync(String topic, String subscription) {
-        return subscribeAsync(topic, subscription, new ConsumerConfiguration());
+        return subscribeAsync(topic, subscription, (MessageListener) null);
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, ConsumerConfiguration conf) {
+        return subscribeAsync(topic, subscription, conf, conf != null ? conf.getMessageListener() : null);
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(String topic, String subscription, MessageListener listener) {
+        return subscribeAsync(topic, subscription, new ConsumerConfiguration(), listener);
     }
 
     @Override
     public CompletableFuture<Consumer> subscribeAsync(final String topic, final String subscription,
-            final ConsumerConfiguration conf) {
+            final ConsumerConfiguration conf, MessageListener listener) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
         }
@@ -253,10 +263,10 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
             ExecutorService listenerThread = externalExecutorProvider.getExecutor();
             if (metadata.partitions > 1) {
                 consumer = new PartitionedConsumerImpl(PulsarClientImpl.this, topic, subscription, conf,
-                        metadata.partitions, listenerThread, consumerSubscribedFuture);
+                        metadata.partitions, listenerThread, consumerSubscribedFuture, listener);
             } else {
                 consumer = new ConsumerImpl(PulsarClientImpl.this, topic, subscription, conf, listenerThread, -1,
-                        consumerSubscribedFuture);
+                        consumerSubscribedFuture, listener);
             }
 
             synchronized (consumers) {
@@ -272,10 +282,15 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
     }
 
     @Override
-    public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
+    public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException {
+        return createReader(topic, startMessageId, conf, conf != null ? conf.getReaderListener() : null);
+    }
+
+    @Override
+    public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf, ReaderListener listener)
             throws PulsarClientException {
         try {
-            return createReaderAsync(topic, startMessageId, conf).get();
+            return createReaderAsync(topic, startMessageId, conf, listener).get();
         } catch (ExecutionException e) {
             Throwable t = e.getCause();
             if (t instanceof PulsarClientException) {
@@ -289,9 +304,14 @@ public Reader createReader(String topic, MessageId startMessageId, ReaderConfigu
         }
     }
 
+    @Override
+    public CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf) {
+        return createReaderAsync(topic, startMessageId, conf, conf != null ? conf.getReaderListener() : null);
+    }
+
     @Override
     public CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId,
-            ReaderConfiguration conf) {
+            ReaderConfiguration conf, ReaderListener listener) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
         }
@@ -324,7 +344,7 @@ public Reader createReader(String topic, MessageId startMessageId, ReaderConfigu
             // gets the next single threaded executor from the list of executors
             ExecutorService listenerThread = externalExecutorProvider.getExecutor();
             ReaderImpl reader = new ReaderImpl(PulsarClientImpl.this, topic, startMessageId, conf, listenerThread,
-                    consumerSubscribedFuture);
+                    consumerSubscribedFuture, listener);
 
             synchronized (consumers) {
                 consumers.put(reader.getConsumer(), Boolean.TRUE);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 5b50ef523..85f6f0e2c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -43,7 +43,7 @@
 
     public ReaderImpl(PulsarClientImpl client, String topic, MessageId startMessageId,
             ReaderConfiguration readerConfiguration, ExecutorService listenerExecutor,
-            CompletableFuture<Consumer> consumerFuture) {
+            CompletableFuture<Consumer> consumerFuture, ReaderListener readerListener) {
 
         String subscription = "reader-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
 
@@ -55,7 +55,6 @@ public ReaderImpl(PulsarClientImpl client, String topic, MessageId startMessageI
         }
 
         if (readerConfiguration.getReaderListener() != null) {
-            ReaderListener readerListener = readerConfiguration.getReaderListener();
             consumerConfiguration.setMessageListener(new MessageListener() {
                 private static final long serialVersionUID = 1L;
 
@@ -78,7 +77,7 @@ public void reachedEndOfTopic(Consumer consumer) {
         }
 
         consumer = new ConsumerImpl(client, topic, subscription, consumerConfiguration, listenerExecutor, -1,
-                consumerFuture, SubscriptionMode.NonDurable, startMessageId);
+                consumerFuture, SubscriptionMode.NonDurable, startMessageId, consumerConfiguration.getMessageListener());
     }
 
     @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services