You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 11:55:21 UTC

[pulsar] branch branch-2.6 updated (a4ab408 -> d789bfa)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from a4ab408  [pulsar-broker] capture stats with precise backlog (#8928)
     new d346552  Intercept beforeSendMessage calls (#8932)
     new fa01b59  [Issue 8787][C++] Add reader internal subscription name setter. (#8823)
     new d789bfa  [C++] Add consumer's configs for reader (#8905)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../pulsar/broker/intercept/BrokerInterceptor.java |  26 +++-
 .../BrokerInterceptorWithClassLoader.java          |  12 ++
 .../broker/intercept/BrokerInterceptors.java       |  17 +++
 .../broker/service/AbstractBaseDispatcher.java     |  19 ++-
 .../apache/pulsar/broker/service/Subscription.java |   3 +
 .../nonpersistent/NonPersistentSubscription.java   |   6 +
 .../service/persistent/PersistentSubscription.java |   6 +
 .../broker/intercept/BrokerInterceptorTest.java    |  30 ++++-
 .../broker/intercept/CounterBrokerInterceptor.java |  18 +++
 .../include/pulsar/ReaderConfiguration.h           | 100 +++++++++++++++
 pulsar-client-cpp/lib/ReaderConfiguration.cc       |  85 +++++++++++++
 pulsar-client-cpp/lib/ReaderConfigurationImpl.h    |  21 +--
 pulsar-client-cpp/lib/ReaderImpl.cc                |  29 ++++-
 pulsar-client-cpp/lib/ReaderImpl.h                 |  10 +-
 pulsar-client-cpp/tests/ReaderConfigurationTest.cc | 141 +++++++++++++++++++++
 pulsar-client-cpp/tests/ReaderTest.cc              |  57 +++++++++
 16 files changed, 555 insertions(+), 25 deletions(-)
 create mode 100644 pulsar-client-cpp/tests/ReaderConfigurationTest.cc


[pulsar] 01/03: Intercept beforeSendMessage calls (#8932)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d3465529d33f0bf35fe837444e2216085eecbc5f
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Dec 14 16:26:16 2020 -0800

    Intercept beforeSendMessage calls (#8932)
    
    *Motivation*
    
    Currently the message metadata headers are already deserialized when
    filtering out the entries to dispatch. Add a `beforeSendMessage` method
    to intercept entries before they are sent to consumers.
    
    (cherry picked from commit f76655a9d23d1130cb89e4bf5431722f470efbf4)
---
 .../pulsar/broker/intercept/BrokerInterceptor.java | 26 +++++++++++++++----
 .../BrokerInterceptorWithClassLoader.java          | 12 +++++++++
 .../broker/intercept/BrokerInterceptors.java       | 17 ++++++++++++
 .../broker/service/AbstractBaseDispatcher.java     | 19 ++++++++++----
 .../apache/pulsar/broker/service/Subscription.java |  3 +++
 .../nonpersistent/NonPersistentSubscription.java   |  6 +++++
 .../service/persistent/PersistentSubscription.java |  6 +++++
 .../broker/intercept/BrokerInterceptorTest.java    | 30 +++++++++++++++++++++-
 .../broker/intercept/CounterBrokerInterceptor.java | 18 +++++++++++++
 9 files changed, 126 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index 164f757..f1bbb2e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -19,16 +19,18 @@
 package org.apache.pulsar.broker.intercept;
 
 import com.google.common.annotations.Beta;
+import java.io.IOException;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.intercept.InterceptException;
 
-import javax.servlet.ServletException;
-import javax.servlet.ServletRequest;
-import javax.servlet.ServletResponse;
-import java.io.IOException;
-
 /**
  * A plugin interface that allows you to intercept the
  * client requests to the Pulsar brokers.
@@ -40,6 +42,20 @@ import java.io.IOException;
 public interface BrokerInterceptor extends AutoCloseable {
 
     /**
+     * Intercept messages before sending them to the consumers.
+     *
+     * @param subscription pulsar subscription
+     * @param entry entry
+     * @param ackSet entry ack bitset. It is either <tt>null</tt> or an array of long-based bitsets.
+     * @param msgMetadata message metadata. It is recycled after this call, so do not retain a reference to it.
+     */
+    default void beforeSendMessage(Subscription subscription,
+                                   Entry entry,
+                                   long[] ackSet,
+                                   MessageMetadata msgMetadata) {
+    }
+
+    /**
      * Called by the broker while new command incoming.
      */
     void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index 00c960c..68710cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -21,9 +21,12 @@ package org.apache.pulsar.broker.intercept;
 import lombok.Data;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.apache.pulsar.common.nar.NarClassLoader;
 
@@ -44,6 +47,15 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
     private final NarClassLoader classLoader;
 
     @Override
+    public void beforeSendMessage(Subscription subscription,
+                                  Entry entry,
+                                  long[] ackSet,
+                                  MessageMetadata msgMetadata) {
+        this.interceptor.beforeSendMessage(
+            subscription, entry, ackSet, msgMetadata);
+    }
+
+    @Override
     public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
         this.interceptor.onPulsarCommand(command, cnx);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index 6f0b7b8..77cca6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -20,10 +20,13 @@ package org.apache.pulsar.broker.intercept;
 
 import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.intercept.InterceptException;
 
 import javax.servlet.ServletException;
@@ -86,6 +89,20 @@ public class BrokerInterceptors implements BrokerInterceptor {
     }
 
     @Override
+    public void beforeSendMessage(Subscription subscription,
+                                  Entry entry,
+                                  long[] ackSet,
+                                  MessageMetadata msgMetadata) {
+        for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+            value.beforeSendMessage(
+                subscription,
+                entry,
+                ackSet,
+                msgMetadata);
+        }
+    }
+
+    @Override
     public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
         for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
             value.onPulsarCommand(command, cnx);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 7cf9793..29ac54b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service;
 
 import io.netty.buffer.ByteBuf;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
@@ -30,11 +29,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.common.api.proto.PulsarApi;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -113,14 +110,26 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 totalBytes += metadataAndPayload.readableBytes();
                 totalChunkedMessages += msgMetadata.hasChunkId() ? 1: 0;
                 batchSizes.setBatchSize(i, batchSize);
+                long[] ackSet = null;
                 if (indexesAcks != null && cursor != null) {
-                    long[] ackSet = cursor.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
+                    ackSet = cursor.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
                     if (ackSet != null) {
                         indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
                     } else {
                         indexesAcks.setIndexesAcks(i,null);
                     }
                 }
+
+                BrokerInterceptor interceptor = subscription.interceptor();
+                if (null != interceptor) {
+                    interceptor.beforeSendMessage(
+                        subscription,
+                        entry,
+                        ackSet,
+                        msgMetadata
+                    );
+                }
+
             } finally {
                 msgMetadata.recycle();
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 85b5415..88ac442 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -25,12 +25,15 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
 
 public interface Subscription {
 
+    BrokerInterceptor interceptor();
+
     Topic getTopic();
 
     String getName();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 6421bde..90be218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -77,6 +78,11 @@ public class NonPersistentSubscription implements Subscription {
     }
 
     @Override
+    public BrokerInterceptor interceptor() {
+        return this.topic.getBrokerService().getInterceptor();
+    }
+
+    @Override
     public String getName() {
         return this.subName;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 51b1e99..9958e7d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -50,6 +50,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -153,6 +154,11 @@ public class PersistentSubscription implements Subscription {
     }
 
     @Override
+    public BrokerInterceptor interceptor() {
+        return topic.getBrokerService().getInterceptor();
+    }
+
+    @Override
     public String getName() {
         return this.subName;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index 5ff73fc..1cd10a5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.broker.intercept;
 
-import org.apache.pulsar.broker.ServiceConfiguration;
+import lombok.Cleanup;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -36,6 +39,7 @@ import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
 
 public class BrokerInterceptorTest extends ProducerConsumerBase {
 
@@ -109,4 +113,28 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         // CONNECT and PRODUCER
         Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 2);
     }
+
+    @Test
+    public void testBeforeSendMessage() throws PulsarClientException {
+        BrokerInterceptor listener = pulsar.getBrokerInterceptor();
+        Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+            .topic("test-before-send-message")
+            .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+            .topic("test-before-send-message")
+            .subscriptionName("test")
+            .subscribe();
+
+        producer.send("hello world");
+
+        Message<String> msg = consumer.receive();
+
+        assertEquals(msg.getValue(), "hello world");
+
+        Assert.assertTrue(((CounterBrokerInterceptor) listener).getBeforeSendCount() == 1);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 4436850..6915101 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -19,9 +19,12 @@
 package org.apache.pulsar.broker.intercept;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.intercept.InterceptException;
 
 import javax.servlet.FilterChain;
@@ -34,9 +37,20 @@ import java.io.IOException;
 @Slf4j
 public class CounterBrokerInterceptor implements BrokerInterceptor {
 
+    int beforeSendCount = 0;
     int count = 0;
 
     @Override
+    public void beforeSendMessage(Subscription subscription,
+                                  Entry entry,
+                                  long[] ackSet,
+                                  MessageMetadata msgMetadata) {
+        log.info("Send message to topic {}, subscription {}",
+            subscription.getTopic(), subscription.getName());
+        beforeSendCount++;
+    }
+
+    @Override
     public void onPulsarCommand(PulsarApi.BaseCommand command, ServerCnx cnx) throws InterceptException {
         log.info("[{}] On [{}] Pulsar command", count, command.getType().name());
         count ++;
@@ -72,4 +86,8 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     public int getCount() {
         return count;
     }
+
+    public int getBeforeSendCount() {
+        return beforeSendCount;
+    }
 }


[pulsar] 03/03: [C++] Add consumer's configs for reader (#8905)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d789bfae206ce34b095fb9119118e58175848a65
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Dec 14 18:27:09 2020 -0600

    [C++] Add consumer's configs for reader (#8905)
    
    ### Motivation
    
    The C++ reader cannot set some of the consumer's configs. For example, because `CryptoKeyReader` is not supported for the reader, the reader cannot decrypt encrypted messages. The reader also cannot set other configs such as `AckGroupingTimeMs`, so it cannot control the behavior of `AckGroupingTracker`.
    
    ### Modifications
    
    - Add other consumer's configs for reader, except:
        - NegativeAckRedeliveryDelayMs: reader has no chance to call `negativeAcknowledge`
        - BrokerConsumerStatsCacheTimeInMs: reader has no chance to call `getBrokerConsumerStatsAsync`
        - KeySharedPolicy and ConsumerType: reader only supports Exclusive subscription now
        - MaxTotalReceiverQueueSizeAcrossPartitions: reader doesn't support partitioned topics now
        - PatternAutoDiscoveryPeriod: reader doesn't support regex subscription now
    - Add some fields to get the consumer config before reader's internal consumer is created, then add some unit tests for the consumer config.
    
    (cherry picked from commit a3ac12e8cd255bbacd308f6e2c3186e2d3f98850)
---
 .../include/pulsar/ReaderConfiguration.h           |  93 ++++++++++++++
 pulsar-client-cpp/lib/ReaderConfiguration.cc       |  76 +++++++++++
 pulsar-client-cpp/lib/ReaderConfigurationImpl.h    |  20 +--
 pulsar-client-cpp/lib/ReaderImpl.cc                |  18 +++
 pulsar-client-cpp/lib/ReaderImpl.h                 |  10 +-
 pulsar-client-cpp/tests/ReaderConfigurationTest.cc | 141 +++++++++++++++++++++
 6 files changed, 347 insertions(+), 11 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index 25b0ba3..96b36b6 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -25,6 +25,8 @@
 #include <pulsar/Result.h>
 #include <pulsar/Message.h>
 #include <pulsar/Schema.h>
+#include <pulsar/CryptoKeyReader.h>
+#include <pulsar/ConsumerCryptoFailureAction.h>
 
 namespace pulsar {
 
@@ -112,6 +114,97 @@ class PULSAR_PUBLIC ReaderConfiguration {
     void setInternalSubscriptionName(std::string internalSubscriptionName);
     const std::string& getInternalSubscriptionName() const;
 
+    /**
+     * Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than
+     * 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
+     * If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
+     * redelivered.
+     * @param milliSeconds the timeout in milliseconds
+     */
+    void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds);
+
+    /**
+     * @return the configured timeout in milliseconds for unacked messages.
+     */
+    long getUnAckedMessagesTimeoutMs() const;
+
+    void setTickDurationInMs(const uint64_t milliSeconds);
+
+    long getTickDurationInMs() const;
+
+    /**
+     * Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent
+     * to broker until the time window reaches its end, or the number of grouped messages reaches
+     * limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be
+     * directly sent to broker without grouping.
+     *
+     * @param ackGroupingMillis time of ACK grouping window in milliseconds.
+     */
+    void setAckGroupingTimeMs(long ackGroupingMillis);
+
+    /**
+     * Get grouping time window in milliseconds.
+     *
+     * @return grouping time window in milliseconds.
+     */
+    long getAckGroupingTimeMs() const;
+
+    /**
+     * Set max number of grouped messages within one grouping time window. If it's set to a
+     * non-positive value, number of grouped messages is not limited. Default is 1000.
+     *
+     * @param maxGroupingSize max number of grouped messages within one grouping time window.
+     */
+    void setAckGroupingMaxSize(long maxGroupingSize);
+
+    /**
+     * Get max number of grouped messages within one grouping time window.
+     *
+     * @return max number of grouped messages within one grouping time window.
+     */
+    long getAckGroupingMaxSize() const;
+
+    bool isEncryptionEnabled() const;
+    const CryptoKeyReaderPtr getCryptoKeyReader() const;
+    ReaderConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
+
+    ConsumerCryptoFailureAction getCryptoFailureAction() const;
+    ReaderConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction action);
+
+    /**
+     * Check whether the reader configuration has a specific property attached.
+     *
+     * @param name the name of the property to check
+     * @return true if the reader configuration has the specified property
+     * @return false if the property is not defined
+     */
+    bool hasProperty(const std::string& name) const;
+
+    /**
+     * Get the value of a specific property
+     *
+     * @param name the name of the property
+     * @return the value of the property or an empty string if the property was not defined
+     */
+    const std::string& getProperty(const std::string& name) const;
+
+    /**
+     * Get all the properties attached to this reader.
+     */
+    std::map<std::string, std::string>& getProperties() const;
+
+    /**
+     * Sets a new property on the reader configuration.
+     * @param name   the name of the property
+     * @param value  the associated value
+     */
+    ReaderConfiguration& setProperty(const std::string& name, const std::string& value);
+
+    /**
+     * Add all the properties in the provided map
+     */
+    ReaderConfiguration& setProperties(const std::map<std::string, std::string>& properties);
+
    private:
     std::shared_ptr<ReaderConfigurationImpl> impl_;
 };
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index eb18d11..0dfdbed 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -20,6 +20,8 @@
 
 namespace pulsar {
 
+const static std::string emptyString;
+
 ReaderConfiguration::ReaderConfiguration() : impl_(std::make_shared<ReaderConfigurationImpl>()) {}
 
 ReaderConfiguration::~ReaderConfiguration() {}
@@ -76,4 +78,78 @@ const std::string& ReaderConfiguration::getInternalSubscriptionName() const {
     return impl_->internalSubscriptionName;
 }
 
+void ReaderConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds) {
+    impl_->unAckedMessagesTimeoutMs = milliSeconds;
+}
+
+long ReaderConfiguration::getUnAckedMessagesTimeoutMs() const { return impl_->unAckedMessagesTimeoutMs; }
+
+void ReaderConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
+    impl_->tickDurationInMs = milliSeconds;
+}
+
+long ReaderConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; }
+
+void ReaderConfiguration::setAckGroupingTimeMs(long ackGroupingMillis) {
+    impl_->ackGroupingTimeMs = ackGroupingMillis;
+}
+
+long ReaderConfiguration::getAckGroupingTimeMs() const { return impl_->ackGroupingTimeMs; }
+
+void ReaderConfiguration::setAckGroupingMaxSize(long maxGroupingSize) {
+    impl_->ackGroupingMaxSize = maxGroupingSize;
+}
+
+long ReaderConfiguration::getAckGroupingMaxSize() const { return impl_->ackGroupingMaxSize; }
+
+bool ReaderConfiguration::isEncryptionEnabled() const { return impl_->cryptoKeyReader != nullptr; }
+
+const CryptoKeyReaderPtr ReaderConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
+
+ReaderConfiguration& ReaderConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
+    impl_->cryptoKeyReader = cryptoKeyReader;
+    return *this;
+}
+
+ConsumerCryptoFailureAction ReaderConfiguration::getCryptoFailureAction() const {
+    return impl_->cryptoFailureAction;
+}
+
+ReaderConfiguration& ReaderConfiguration::setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+    impl_->cryptoFailureAction = action;
+    return *this;
+}
+
+bool ReaderConfiguration::hasProperty(const std::string& name) const {
+    const auto& properties = impl_->properties;
+    return properties.find(name) != properties.cend();
+}
+
+const std::string& ReaderConfiguration::getProperty(const std::string& name) const {
+    const auto& properties = impl_->properties;
+    const auto it = properties.find(name);
+    return (it != properties.cend()) ? (it->second) : emptyString;
+}
+
+std::map<std::string, std::string>& ReaderConfiguration::getProperties() const { return impl_->properties; }
+
+ReaderConfiguration& ReaderConfiguration::setProperty(const std::string& name, const std::string& value) {
+    auto& properties = impl_->properties;
+    auto it = properties.find(name);
+    if (it != properties.end()) {
+        it->second = value;
+    } else {
+        properties.emplace(name, value);
+    }
+    return *this;
+}
+
+ReaderConfiguration& ReaderConfiguration::setProperties(
+    const std::map<std::string, std::string>& properties) {
+    for (const auto& kv : properties) {
+        setProperty(kv.first, kv.second);
+    }
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index 3b5cc8a..6f38c29 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -25,19 +25,19 @@ namespace pulsar {
 struct ReaderConfigurationImpl {
     SchemaInfo schemaInfo;
     ReaderListener readerListener;
-    bool hasReaderListener;
-    int receiverQueueSize;
+    bool hasReaderListener{false};
+    int receiverQueueSize{1000};
     std::string readerName;
     std::string subscriptionRolePrefix;
-    bool readCompacted;
+    bool readCompacted{false};
     std::string internalSubscriptionName;
-    ReaderConfigurationImpl()
-        : schemaInfo(),
-          hasReaderListener(false),
-          receiverQueueSize(1000),
-          readerName(),
-          subscriptionRolePrefix(),
-          readCompacted(false) {}
+    long unAckedMessagesTimeoutMs{0};
+    long tickDurationInMs{1000};
+    long ackGroupingTimeMs{100};
+    long ackGroupingMaxSize{1000};
+    CryptoKeyReaderPtr cryptoKeyReader;
+    ConsumerCryptoFailureAction cryptoFailureAction;
+    std::map<std::string, std::string> properties;
 };
 }  // namespace pulsar
 #endif /* LIB_READERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 2ea3430..bcf707d 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -23,6 +23,12 @@
 
 namespace pulsar {
 
+namespace test {
+std::mutex readerConfigTestMutex;
+std::atomic_bool readerConfigTestEnabled{false};
+ConsumerConfiguration consumerConfigOfReader;
+}  // namespace test
+
 static ResultCallback emptyCallback;
 
 ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
@@ -35,6 +41,13 @@ void ReaderImpl::start(const MessageId& startMessageId) {
     consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
     consumerConf.setReadCompacted(readerConf_.isReadCompacted());
     consumerConf.setSchema(readerConf_.getSchema());
+    consumerConf.setUnAckedMessagesTimeoutMs(readerConf_.getUnAckedMessagesTimeoutMs());
+    consumerConf.setTickDurationInMs(readerConf_.getTickDurationInMs());
+    consumerConf.setAckGroupingTimeMs(readerConf_.getAckGroupingTimeMs());
+    consumerConf.setAckGroupingMaxSize(readerConf_.getAckGroupingMaxSize());
+    consumerConf.setCryptoKeyReader(readerConf_.getCryptoKeyReader());
+    consumerConf.setCryptoFailureAction(readerConf_.getCryptoFailureAction());
+    consumerConf.setProperties(readerConf_.getProperties());
 
     if (readerConf_.getReaderName().length() > 0) {
         consumerConf.setConsumerName(readerConf_.getReaderName());
@@ -57,6 +70,11 @@ void ReaderImpl::start(const MessageId& startMessageId) {
         }
     }
 
+    // Test hook: capture the consumer's configuration before the consumer is created.
+    if (test::readerConfigTestEnabled) {
+        test::consumerConfigOfReader = consumerConf.clone();
+    }
+
     consumer_ = std::make_shared<ConsumerImpl>(
         client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned,
         Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index 4069247..f1ad387 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -29,7 +29,15 @@ class ReaderImpl;
 typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
 typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;
 
-class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
+namespace test {
+// Test-only hooks: tests lock the mutex, set the flag, create a reader, then read the captured config.
+extern std::mutex readerConfigTestMutex PULSAR_PUBLIC;
+extern std::atomic_bool readerConfigTestEnabled PULSAR_PUBLIC;
+extern ConsumerConfiguration consumerConfigOfReader PULSAR_PUBLIC;
+
+}  // namespace test
+
+class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
    public:
     ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
                const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
diff --git a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
new file mode 100644
index 0000000..6af4b4a
--- /dev/null
+++ b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * These tests only check the ConsumerConfiguration that is built for the Reader's internal consumer,
+ * because the ReaderConfiguration itself is only meaningful through the consumer it configures.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <lib/LogUtils.h>
+#include <lib/ReaderImpl.h>
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+class NoOpsCryptoKeyReader : public CryptoKeyReader {  // stub: both lookups return ResultOk and ignore their arguments
+   public:
+    Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                        EncryptionKeyInfo& encKeyInfo) const override {
+        return ResultOk;
+    }
+
+    Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                         EncryptionKeyInfo& encKeyInfo) const override {
+        return ResultOk;
+    }
+};
+
+TEST(ReaderConfigurationTest, testDefaultConfig) {  // untouched ReaderConfiguration -> consumer defaults asserted below
+    const std::string topic = "ReaderConfigurationTest-default-config";
+    Client client(lookupUrl);
+    ReaderConfiguration readerConf;
+    Reader reader;
+    // Enable the capture hook under its mutex, create the reader, then copy what was captured.
+    std::unique_lock<std::mutex> lock(test::readerConfigTestMutex);
+    test::readerConfigTestEnabled = true;  // NOTE(review): stays true if the ASSERT on the next line fails and returns
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+    const auto consumerConf = test::consumerConfigOfReader.clone();
+    test::readerConfigTestEnabled = false;
+    lock.unlock();
+    // Expected consumer settings when no ReaderConfiguration setter has been called.
+    ASSERT_EQ(consumerConf.getConsumerType(), ConsumerExclusive);
+    ASSERT_EQ(consumerConf.getReceiverQueueSize(), 1000);
+    ASSERT_EQ(consumerConf.isReadCompacted(), false);
+    ASSERT_EQ(consumerConf.getSchema().getName(), "BYTES");
+    ASSERT_EQ(consumerConf.getUnAckedMessagesTimeoutMs(), 0);
+    ASSERT_EQ(consumerConf.getTickDurationInMs(), 1000);
+    ASSERT_EQ(consumerConf.getAckGroupingTimeMs(), 100);
+    ASSERT_EQ(consumerConf.getAckGroupingMaxSize(), 1000);
+    ASSERT_EQ(consumerConf.getCryptoKeyReader().get(), nullptr);
+    ASSERT_EQ(consumerConf.getCryptoFailureAction(), ConsumerCryptoFailureAction::FAIL);
+    ASSERT_TRUE(consumerConf.getProperties().empty());
+    ASSERT_TRUE(consumerConf.getConsumerName().empty());
+    ASSERT_FALSE(consumerConf.hasMessageListener());
+
+    client.close();
+}
+
+TEST(ReaderConfigurationTest, testCustomConfig) {  // customized ReaderConfiguration values must reach the consumer config
+    const std::string exampleSchema =
+        "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+        "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+
+    const std::string topic = "ReaderConfigurationTest-custom-config";
+    Client client(lookupUrl);
+
+    const SchemaInfo schema(AVRO, "Avro", exampleSchema, StringMap{{"schema-key", "schema-value"}});
+
+    ProducerConfiguration producerConf;
+    producerConf.setSchema(schema);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+    ASSERT_FALSE(producer.getSchemaVersion().empty());
+    // Configure the reader with values that differ from the defaults asserted in testDefaultConfig.
+    ReaderConfiguration readerConf;
+    readerConf.setSchema(schema);
+    readerConf.setReaderListener([](Reader, const Message&) {});
+    readerConf.setReceiverQueueSize(2000);
+    readerConf.setReaderName("my-reader");
+    readerConf.setReadCompacted(true);
+    readerConf.setUnAckedMessagesTimeoutMs(11000);
+    readerConf.setTickDurationInMs(2000);
+    readerConf.setAckGroupingTimeMs(0);
+    readerConf.setAckGroupingMaxSize(4096);
+    const auto cryptoReader = std::make_shared<NoOpsCryptoKeyReader>();
+    readerConf.setCryptoKeyReader(cryptoReader);
+    readerConf.setCryptoFailureAction(ConsumerCryptoFailureAction::DISCARD);
+    const std::map<std::string, std::string> properties{{"key-1", "value-1"}, {"key-2", "value-2"}};
+    readerConf.setProperties(properties);
+    // Enable the capture hook under its mutex, create the reader, then copy what was captured.
+    Reader reader;
+    std::unique_lock<std::mutex> lock(test::readerConfigTestMutex);
+    test::readerConfigTestEnabled = true;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+    const auto consumerConf = test::consumerConfigOfReader.clone();
+    test::readerConfigTestEnabled = false;
+    lock.unlock();
+    // The captured schema must match the configured one field by field.
+    ASSERT_EQ(consumerConf.getSchema().getName(), schema.getName());
+    ASSERT_EQ(consumerConf.getSchema().getSchemaType(), schema.getSchemaType());
+    ASSERT_EQ(consumerConf.getSchema().getSchema(), schema.getSchema());
+    ASSERT_EQ(consumerConf.getSchema().getProperties(), schema.getProperties());
+    // Every other customized value must show up on the captured consumer configuration.
+    ASSERT_EQ(consumerConf.getConsumerType(), ConsumerExclusive);
+    ASSERT_TRUE(consumerConf.hasMessageListener());
+    ASSERT_EQ(consumerConf.getReceiverQueueSize(), 2000);
+    ASSERT_EQ(consumerConf.getConsumerName(), "my-reader");
+    ASSERT_EQ(consumerConf.isReadCompacted(), true);
+    ASSERT_EQ(consumerConf.getUnAckedMessagesTimeoutMs(), 11000);
+    ASSERT_EQ(consumerConf.getTickDurationInMs(), 2000);
+    ASSERT_EQ(consumerConf.getAckGroupingTimeMs(), 0);
+    ASSERT_EQ(consumerConf.getAckGroupingMaxSize(), 4096);
+    ASSERT_EQ(consumerConf.getCryptoKeyReader(), cryptoReader);
+    ASSERT_EQ(consumerConf.getCryptoFailureAction(), ConsumerCryptoFailureAction::DISCARD);
+    ASSERT_EQ(consumerConf.getProperties(), properties);
+    ASSERT_TRUE(consumerConf.hasProperty("key-1"));
+    ASSERT_EQ(consumerConf.getProperty("key-1"), "value-1");
+    ASSERT_TRUE(consumerConf.hasProperty("key-2"));
+    ASSERT_EQ(consumerConf.getProperty("key-2"), "value-2");
+    ASSERT_FALSE(consumerConf.hasProperty("key-3"));
+
+    client.close();
+}


[pulsar] 02/03: [Issue 8787][C++] Add reader internal subscription name setter. (#8823)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fa01b59b7ade2f29ad2b40535094bf340377fe16
Author: Zike Yang <Ro...@outlook.com>
AuthorDate: Wed Dec 9 01:38:47 2020 +0800

    [Issue 8787][C++] Add reader internal subscription name setter. (#8823)
    
    Master Issue: #8787
    
    ### Motivation
    
    Currently, the C++ client can only generate the reader's subscription name internally, at random.
    The Java client counterpart of this change is #8801.
    
    ### Modifications
    
    Add a setter for the reader's internal subscription name.
    
    ### Verifying this change
    
    This change is covered by the tests added in this commit: *testSubscriptionNameSetting*, *testSetSubscriptionNameAndPrefix* and *testMultiSameSubscriptionNameReaderShouldFail*.
    
    (cherry picked from commit 408f9e61c07b436b425f9ef65ccbada022a4e7fd)
---
 .../include/pulsar/ReaderConfiguration.h           |  7 +++
 pulsar-client-cpp/lib/ReaderConfiguration.cc       |  9 ++++
 pulsar-client-cpp/lib/ReaderConfigurationImpl.h    |  1 +
 pulsar-client-cpp/lib/ReaderImpl.cc                | 11 +++--
 pulsar-client-cpp/tests/ReaderTest.cc              | 57 ++++++++++++++++++++++
 5 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index bf52c4f..25b0ba3 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -105,6 +105,13 @@ class PULSAR_PUBLIC ReaderConfiguration {
     void setReadCompacted(bool compacted);
     bool isReadCompacted() const;
 
+    /**
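+     * Set the internal subscription name, used instead of the generated "reader-<random>" name when non-empty.
+     * @param internalSubscriptionName the subscription name for the reader's internal consumer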
+     */
+    void setInternalSubscriptionName(std::string internalSubscriptionName);
+    const std::string& getInternalSubscriptionName() const;
+
    private:
     std::shared_ptr<ReaderConfigurationImpl> impl_;
 };
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index fec8eed..eb18d11 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -67,4 +67,13 @@ void ReaderConfiguration::setSubscriptionRolePrefix(const std::string& subscript
 bool ReaderConfiguration::isReadCompacted() const { return impl_->readCompacted; }
 
 void ReaderConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; }
+
+void ReaderConfiguration::setInternalSubscriptionName(std::string internalSubscriptionName) {
+    impl_->internalSubscriptionName = internalSubscriptionName;  // store a copy of the name in the impl object
+}
+
+const std::string& ReaderConfiguration::getInternalSubscriptionName() const {
+    return impl_->internalSubscriptionName;  // reference to the string held by impl_; empty unless the setter was called
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index c74b217..3b5cc8a 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -30,6 +30,7 @@ struct ReaderConfigurationImpl {
     std::string readerName;
     std::string subscriptionRolePrefix;
     bool readCompacted;
+    std::string internalSubscriptionName;
     ReaderConfigurationImpl()
         : schemaInfo(),
           hasReaderListener(false),
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 1e08977..2ea3430 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -47,9 +47,14 @@ void ReaderImpl::start(const MessageId& startMessageId) {
                                                   std::placeholders::_1, std::placeholders::_2));
     }
 
-    std::string subscription = "reader-" + generateRandomName();
-    if (!readerConf_.getSubscriptionRolePrefix().empty()) {
-        subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
+    std::string subscription;
+    if (!readerConf_.getInternalSubscriptionName().empty()) {
+        subscription = readerConf_.getInternalSubscriptionName();
+    } else {
+        subscription = "reader-" + generateRandomName();
+        if (!readerConf_.getSubscriptionRolePrefix().empty()) {
+            subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
+        }
     }
 
     consumer_ = std::make_shared<ConsumerImpl>(
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index db4475b..a485652 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -506,3 +506,60 @@ TEST(ReaderTest, testPartitionIndex) {
 
     client.close();
 }
+
+TEST(ReaderTest, testSubscriptionNameSetting) {  // an explicit internal subscription name is used verbatim
+    Client client(serviceUrl);
+
+    std::string topicName = "persistent://public/default/test-subscription-name-setting";
+    std::string subName = "test-sub";
+
+    ReaderConfiguration readerConf;
+    readerConf.setInternalSubscriptionName(subName);
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+    // The reader's internal consumer must report exactly the configured name.
+    ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+
+    reader.close();
+    client.close();
+}
+
+TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {  // explicit name wins over the subscription role prefix
+    Client client(serviceUrl);
+
+    std::string topicName = "persistent://public/default/testSetSubscriptionNameAndPrefix";
+    std::string subName = "test-sub";
+    // Both a name and a role prefix are configured; the assertion below expects the name alone.
+    ReaderConfiguration readerConf;
+    readerConf.setInternalSubscriptionName(subName);
+    readerConf.setSubscriptionRolePrefix("my-prefix");
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+    ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+
+    reader.close();
+    client.close();
+}
+
+TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {  // two readers may not share one subscription name
+    Client client(serviceUrl);
+
+    std::string topicName = "persistent://public/default/testMultiSameSubscriptionNameReaderShouldFail";
+    std::string subscriptionName = "test-sub";
+
+    ReaderConfiguration readerConf1;
+    readerConf1.setInternalSubscriptionName(subscriptionName);
+    Reader reader1;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf1, reader1));
+    // A second reader on the same topic with the same subscription name is expected to get ResultConsumerBusy.
+    ReaderConfiguration readerConf2;
+    readerConf2.setInternalSubscriptionName(subscriptionName);
+    Reader reader2;
+    ASSERT_EQ(ResultConsumerBusy,
+              client.createReader(topicName, MessageId::earliest(), readerConf2, reader2));
+
+    reader1.close();
+    reader2.close();  // NOTE(review): reader2 was never created successfully; confirm close() is safe on it
+    client.close();
+}