Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/03 11:40:21 UTC

[pulsar] branch branch-2.7 updated (edace66 -> c00aac7)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from edace66  [docs] Update Websocket content (#8762) (#8777)
     new 508dfb3  Clear delayed messages when clear backlog. (#8691)
     new 4b59b38  [Transaction] Fix transaction log replay not handle right. (#8723)
     new 11a96e8  [C++] Fixed flaky test: AuthPluginTest.testTlsDetectHttpsWithHostNameValidation (#8771)
     new c7efdcc  [C++] Implement batch aware producer router (#8395)
     new b27902a  Fix the problem that batchMessageId is converted to messageIdImpl (#8779)
     new 7150504  Issue 8704: improve env config handling (#8709)
     new c00aac7  [Doc]--Update doc for multiple advertised listeners (#8789)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf                                   |   2 +-
 docker/pulsar/scripts/apply-config-from-env.py     |  39 ++++-
 .../apache/pulsar/broker/ServiceConfiguration.java |   2 +-
 .../broker/TransactionMetadataStoreService.java    |  27 ++--
 .../broker/delayed/DelayedDeliveryTracker.java     |   5 +
 .../delayed/InMemoryDelayedDeliveryTracker.java    |   5 +
 .../broker/service/BrokerServiceException.java     |   2 +
 .../apache/pulsar/broker/service/Dispatcher.java   |   4 +
 .../apache/pulsar/broker/service/ServerCnx.java    |  77 +++++-----
 .../PersistentDispatcherMultipleConsumers.java     |   5 +
 .../service/persistent/PersistentSubscription.java |   1 +
 .../TransactionMetadataStoreServiceTest.java       |  17 ++-
 .../service/persistent/DelayedDeliveryTest.java    |  31 ++++
 .../TransactionCoordinatorClientTest.java          |   2 +-
 .../apache/pulsar/client/api/TopicReaderTest.java  |  72 ++++++++-
 .../client/impl/TransactionEndToEndTest.java       |   8 +-
 .../TransactionCoordinatorClientException.java     |   9 ++
 pulsar-client-cpp/include/pulsar/Result.h          |   1 +
 pulsar-client-cpp/lib/ClientConnection.cc          |   3 +
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |   5 +-
 pulsar-client-cpp/lib/Result.cc                    |   3 +
 pulsar-client-cpp/lib/RoundRobinMessageRouter.cc   |  63 +++++++-
 pulsar-client-cpp/lib/RoundRobinMessageRouter.h    |  26 +++-
 pulsar-client-cpp/tests/AuthPluginTest.cc          |  34 ++---
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |   4 +-
 pulsar-client-cpp/tests/CustomRoutingPolicy.h      |  13 ++
 pulsar-client-cpp/tests/PartitionsUpdateTest.cc    |   3 +-
 .../tests/RoundRobinMessageRouterTest.cc           | 164 ++++++++++++++++-----
 .../client/impl/TransactionMetaStoreHandler.java   |   2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  |   3 +
 .../util/collections/TripleLongPriorityQueue.java  |   8 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   1 +
 .../TransactionMetadataStoreProvider.java          |   9 +-
 .../InMemTransactionMetadataStoreProvider.java     |   5 +-
 .../coordinator/impl/MLTransactionLogImpl.java     |  27 ++--
 .../impl/MLTransactionMetadataStoreProvider.java   |  10 +-
 .../MLTransactionMetadataStoreTest.java            |  14 +-
 .../TransactionMetadataStoreProviderTest.java      |   2 +-
 .../docs/concepts-multiple-advertised-listeners.md |  38 +++++
 site2/website/sidebars.json                        |   3 +-
 .../version-2.6.1/reference-configuration.md       |   2 +
 .../version-2.6.2/reference-configuration.md       |   2 +
 42 files changed, 587 insertions(+), 166 deletions(-)
 create mode 100644 site2/docs/concepts-multiple-advertised-listeners.md


[pulsar] 02/07: [Transaction] Fix transaction log replay not handle right. (#8723)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4b59b38b06d2285f894340116cfe4356d9d49d8b
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Dec 1 12:56:27 2020 +0800

    [Transaction] Fix transaction log replay not handle right. (#8723)
    
    1. Transaction log replay is not handled correctly: comparing against lastConfirm.entryId is not the right check for deciding whether to replay.
    2. Transaction commands are handled asynchronously, but the command is recycled before the response is sent, so reading requestId or other fields from it at that point returns wrong values.
    3. The managedLedgerConfig should be passed in when MLTransactionLog is used, so that the managed ledger of the transaction log can be configured.
    4. Add the TransactionNotFound exception to the proto; it is returned when a commit, abort, or other transaction operation is repeated or handled in error.
    
    Fixes for items 1-3 above:
    1. Compare the markerDeletePosition and lastConfirmPosition to decide whether to replay.
    2. Read the command fields into local variables instead of calling the command getters later.
    3. Pass in the managedLedgerConfig.
    
    (cherry picked from commit 411bf71006039af661be13c54dda4ce074f97679)
---
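
Item 2 of the commit message (fields read from a command after it has been recycled) can be illustrated with a minimal, self-contained sketch. It does not use Pulsar classes: `Command`, `recycle()`, `unsafeHandle` and `safeHandle` are hypothetical stand-ins, and the sketch assumes only that a recycled command has its fields reset.

```java
import java.util.concurrent.CompletableFuture;

public class RecycledCommandSketch {
    // Hypothetical stand-in for a pooled protocol command object.
    static final class Command {
        private long requestId;
        long getRequestId() { return requestId; }
        void set(long id) { requestId = id; }
        void recycle() { requestId = -1; } // reset so the instance can be reused
    }

    // Unsafe: the callback reads the command when the async work completes,
    // which may be after the command has been recycled.
    static CompletableFuture<Long> unsafeHandle(Command command, CompletableFuture<Void> work) {
        return work.thenApply(ignored -> command.getRequestId());
    }

    // Safe: copy the needed fields into final locals before going async.
    static CompletableFuture<Long> safeHandle(Command command, CompletableFuture<Void> work) {
        final long requestId = command.getRequestId();
        return work.thenApply(ignored -> requestId);
    }

    public static void main(String[] args) {
        Command command = new Command();
        command.set(42L);
        CompletableFuture<Void> work = new CompletableFuture<>();
        CompletableFuture<Long> unsafe = unsafeHandle(command, work);
        CompletableFuture<Long> safe = safeHandle(command, work);
        command.recycle();   // the caller recycles before the async work finishes
        work.complete(null); // the callbacks run only now
        System.out.println("unsafe=" + unsafe.join() + " safe=" + safe.join());
        // prints "unsafe=-1 safe=42"
    }
}
```

The `ServerCnx` changes in this patch follow the second shape: `requestId`, `consumerId`, `topic` and similar values are read into `final` locals at the top of each handler and the callbacks use those locals.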
 conf/broker.conf                                   |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  2 +-
 .../broker/TransactionMetadataStoreService.java    | 27 +++++---
 .../broker/service/BrokerServiceException.java     |  2 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 73 ++++++++++++----------
 .../TransactionMetadataStoreServiceTest.java       | 17 +++--
 .../TransactionCoordinatorClientTest.java          |  2 +-
 .../client/impl/TransactionEndToEndTest.java       |  8 +--
 .../TransactionCoordinatorClientException.java     |  9 +++
 pulsar-client-cpp/include/pulsar/Result.h          |  1 +
 pulsar-client-cpp/lib/ClientConnection.cc          |  3 +
 pulsar-client-cpp/lib/Result.cc                    |  3 +
 .../client/impl/TransactionMetaStoreHandler.java   |  2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  3 +
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 .../TransactionMetadataStoreProvider.java          |  9 ++-
 .../InMemTransactionMetadataStoreProvider.java     |  5 +-
 .../coordinator/impl/MLTransactionLogImpl.java     | 27 ++++----
 .../impl/MLTransactionMetadataStoreProvider.java   | 10 +--
 .../MLTransactionMetadataStoreTest.java            | 14 +++--
 .../TransactionMetadataStoreProviderTest.java      |  2 +-
 21 files changed, 142 insertions(+), 80 deletions(-)
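
Item 3 means the store is now opened in two asynchronous stages: fetch the managed ledger config, then open the store with it. The patch nests two `whenComplete` callbacks in a `void` method, which is why the tests switch from immediate assertions to polling with Awaitility. The sketch below shows one alternative shape, not what the commit does: the stages are chained with `thenCompose` and the future is returned. `getConfig`, `openStore`, `addStore` and the string values are hypothetical stand-ins, not Pulsar APIs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class OpenStoreSketch {
    // Opened stores keyed by a hypothetical coordinator id.
    static final Map<Integer, String> stores = new ConcurrentHashMap<>();

    // Hypothetical: asynchronously look up the ledger config for a coordinator id.
    static CompletableFuture<String> getConfig(int tcId) {
        return CompletableFuture.supplyAsync(() -> "config-for-" + tcId);
    }

    // Hypothetical: asynchronously open a store using that config.
    static CompletableFuture<String> openStore(int tcId, String config) {
        return CompletableFuture.supplyAsync(() -> "store-" + tcId + "[" + config + "]");
    }

    // Chain the two stages; a failure in either one reaches the same callback.
    static CompletableFuture<String> addStore(int tcId) {
        return getConfig(tcId)
                .thenCompose(config -> openStore(tcId, config))
                .whenComplete((store, ex) -> {
                    if (ex != null) {
                        System.err.println("Add store " + tcId + " failed: " + ex);
                    } else {
                        stores.put(tcId, store);
                    }
                });
    }

    public static void main(String[] args) {
        addStore(0).join(); // wait for both stages instead of polling
        System.out.println(stores.get(0)); // prints "store-0[config-for-0]"
    }
}
```

Returning the future lets a caller wait on it directly. With a `void` method, as in the patch, the only way to observe completion from outside is to poll the `stores` map.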

diff --git a/conf/broker.conf b/conf/broker.conf
index f22fb3c..97d4ff0 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1175,4 +1175,4 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
 
 # Enable transaction coordinator in broker
 transactionCoordinatorEnabled=false
-transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
+transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f6e3d35..f4a8b75 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1860,7 +1860,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "Class name for transaction metadata store provider"
     )
     private String transactionMetadataStoreProviderClassName =
-            "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider";
+            "org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider";
 
     @FieldContext(
             category = CATEGORY_TRANSACTION,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 2a65b14..301b393 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvide
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,15 +117,23 @@ public class TransactionMetadataStoreService {
     }
 
     public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
-        transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory())
-            .whenComplete((store, ex) -> {
-                if (ex != null) {
-                    LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
-                } else {
-                    stores.put(tcId, store);
-                    LOG.info("Added new transaction meta store {}", tcId);
-                }
-            });
+        pulsarService.getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + tcId))
+                .whenComplete((v ,e) -> {
+                    if (e != null) {
+                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
+                    } else {
+                        transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v)
+                                .whenComplete((store, ex) -> {
+                                    if (ex != null) {
+                                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
+                                    } else {
+                                        stores.put(tcId, store);
+                                        LOG.info("Added new transaction meta store {}", tcId);
+                                    }
+                                });
+                    }
+        });
     }
 
     public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 4564617..9c6b2d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -219,6 +219,8 @@ public class BrokerServiceException extends Exception {
             return ServerError.NotAllowedError;
         } else if (t instanceof TransactionConflictException) {
             return ServerError.TransactionConflict;
+        } else if (t instanceof CoordinatorException.TransactionNotFoundException) {
+            return ServerError.TransactionNotFound;
         } else {
             if (checkCauseIfUnknown) {
                 return getClientErrorCode(t.getCause(), false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a99ee3e..06d5b75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -39,6 +39,7 @@ import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
 
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -1268,18 +1269,21 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     protected void handleAck(CommandAck ack) {
         checkArgument(state == State.Connected);
         CompletableFuture<Consumer> consumerFuture = consumers.get(ack.getConsumerId());
+        final long requestId = ack.getRequestId();
+        final boolean hasRequestId = ack.hasRequestId();
+        final long consumerId = ack.getConsumerId();
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             consumerFuture.getNow(null).messageAcked(ack).thenRun(() -> {
-                        if (ack.hasRequestId()) {
+                        if (hasRequestId) {
                             ctx.writeAndFlush(Commands.newAckResponse(
-                                    ack.getRequestId(), null, null, ack.getConsumerId()));
+                                    requestId, null, null, consumerId));
                         }
                     }).exceptionally(e -> {
-                        if (ack.hasRequestId()) {
-                            ctx.writeAndFlush(Commands.newAckResponse(ack.getRequestId(),
+                        if (hasRequestId) {
+                            ctx.writeAndFlush(Commands.newAckResponse(requestId,
                                     BrokerServiceException.getClientErrorCode(e),
-                                    e.getMessage(), ack.getConsumerId()));
+                                    e.getMessage(), consumerId));
                         }
                         return null;
                     });
@@ -1666,45 +1670,47 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleNewTxn(CommandNewTxn command) {
+        final long requestId = command.getRequestId();
+        final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
         if (log.isDebugEnabled()) {
-            log.debug("Receive new txn request {} to transaction meta store {} from {}.", command.getRequestId(), command.getTcId(), remoteAddress);
+            log.debug("Receive new txn request {} to transaction meta store {} from {}.", requestId, tcId, remoteAddress);
         }
-        TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
         service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId, command.getTxnTtlSeconds())
             .whenComplete(((txnID, ex) -> {
                 if (ex == null) {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response {} for new txn request {}", tcId.getId(),  command.getRequestId());
+                        log.debug("Send response {} for new txn request {}", tcId.getId(), requestId);
                     }
-                    ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                    ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                 } else {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response error for new txn request {}", command.getRequestId(), ex);
+                        log.debug("Send response error for new txn request {}", requestId, ex);
                     }
-                    ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
+                    ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
                 }
             }));
     }
 
     @Override
     protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn command) {
-            TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final long requestId = command.getRequestId();
         if (log.isDebugEnabled()) {
-            log.debug("Receive add published partition to txn request {} from {} with txnId {}", command.getRequestId(), remoteAddress, txnID);
+            log.debug("Receive add published partition to txn request {} from {} with txnId {}", requestId, remoteAddress, txnID);
         }
         service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID, command.getPartitionsList())
             .whenComplete(((v, ex) -> {
                 if (ex == null) {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response success for add published partition to txn request {}",  command.getRequestId());
+                        log.debug("Send response success for add published partition to txn request {}",  requestId);
                     }
-                    ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(),
+                    ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
                             txnID.getLeastSigBits(), txnID.getMostSigBits()));
                 } else {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response error for add published partition to txn request {}",  command.getRequestId(), ex);
+                        log.debug("Send response error for add published partition to txn request {}",  requestId, ex);
                     }
-                    ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
+                    ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
                             BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
                 }
             }));
@@ -1723,7 +1729,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             txnID.getLeastSigBits(), txnID.getMostSigBits()));
                 }).exceptionally(throwable -> {
                     log.error("Send response error for end txn request.", throwable);
-                    ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
+                    ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
                             BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
                     return null; });
     }
@@ -1731,20 +1737,22 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     @Override
     protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition command) {
         final long requestId = command.getRequestId();
+        final String topic = command.getTopic();
+        final List<MessageIdData> messageIdDataList = command.getMessageIdList();
         final int txnAction = command.getTxnAction().getNumber();
         TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
 
-        service.getTopics().get(TopicName.get(command.getTopic()).toString()).whenComplete((topic, t) -> {
-            if (!topic.isPresent()) {
+        service.getTopics().get(TopicName.get(topic).toString()).whenComplete((optionalTopic, t) -> {
+            if (!optionalTopic.isPresent()) {
                 ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
-                        command.getRequestId(), ServerError.TopicNotFound,
-                        "Topic " + command.getTopic() + " is not found."));
+                        requestId, ServerError.TopicNotFound,
+                        "Topic " + topic + " is not found."));
                 return;
             }
-            topic.get().endTxn(txnID, txnAction, command.getMessageIdList())
+            optionalTopic.get().endTxn(txnID, txnAction, messageIdDataList)
                 .whenComplete((ignored, throwable) -> {
                     if (throwable != null) {
-                        log.error("Handle endTxnOnPartition {} failed.", command.getTopic(), throwable);
+                        log.error("Handle endTxnOnPartition {} failed.", topic, throwable);
                         ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
                                 requestId, ServerError.UnknownError, throwable.getMessage()));
                         return;
@@ -1764,10 +1772,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final String subName = command.getSubscription().getSubscription();
         final int txnAction = command.getTxnAction().getNumber();
 
-        service.getTopics().get(TopicName.get(command.getSubscription().getTopic()).toString())
+        service.getTopics().get(TopicName.get(topic).toString())
             .thenAccept(optionalTopic -> {
                 if (!optionalTopic.isPresent()) {
-                    log.error("The topic {} is not exist in broker.", command.getSubscription().getTopic());
+                    log.error("The topic {} is not exist in broker.", topic);
                     ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                             requestId, txnidLeastBits, txnidMostBits,
                             ServerError.UnknownError,
@@ -1823,10 +1831,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleAddSubscriptionToTxn(PulsarApi.CommandAddSubscriptionToTxn command) {
-        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final long requestId = command.getRequestId();
         if (log.isDebugEnabled()) {
             log.debug("Receive add published partition to txn request {} from {} with txnId {}",
-                    command.getRequestId(), remoteAddress, txnID);
+                    requestId, remoteAddress, txnID);
         }
 
         service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
@@ -1835,17 +1844,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     if (ex == null) {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response success for add published partition to txn request {}",
-                                    command.getRequestId());
+                                    requestId);
                         }
-                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
                                 txnID.getLeastSigBits(), txnID.getMostSigBits()));
                         log.info("handle add partition to txn finish.");
                     } else {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response error for add published partition to txn request {}",
-                                    command.getRequestId(), ex);
+                                    requestId, ex);
                         }
-                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
                                 txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
                                 ex.getMessage()));
                     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 8b91aea..99fb073 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -33,6 +34,7 @@ import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import static org.testng.Assert.assertEquals;
 
@@ -58,10 +60,12 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         Assert.assertNotNull(transactionMetadataStoreService);
 
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 1);
 
         transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 0);
     }
 
     @Test
@@ -70,7 +74,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(1));
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(2));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 3);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 3);
         TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
         TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 0).get();
         TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 0).get();
@@ -87,7 +92,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
     public void testAddProducedPartitionToTxn() throws ExecutionException, InterruptedException {
         TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 1);
         TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
         List<String> partitions = new ArrayList<>();
         partitions.add("ptn-0");
@@ -104,7 +110,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
     public void testAddAckedPartitionToTxn() throws ExecutionException, InterruptedException {
         TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 1);
         TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
         List<TransactionSubscription> partitions = new ArrayList<>();
         partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index c63db84..d37a5dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -94,7 +94,7 @@ public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBa
         try {
             transactionCoordinatorClient.abort(txnID, Collections.emptyList());
             Assert.fail("Should be fail, because the txn is in committing state, can't abort now.");
-        } catch (TransactionCoordinatorClientException.InvalidTxnStatusException ignore) {
+        } catch (TransactionCoordinatorClientException ignore) {
            // Ok here
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index fc8532d..db3d9fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -54,7 +54,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
@@ -328,8 +328,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 // recommit one transaction should be failed
                 log.info("expected exception for recommit one transaction.");
                 Assert.assertNotNull(reCommitError);
-                Assert.assertTrue(reCommitError.getCause() instanceof
-                        TransactionCoordinatorClientException.InvalidTxnStatusException);
+                Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
             }
         }
     }
@@ -533,8 +532,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 // recommit one transaction should be failed
                 log.info("expected exception for recommit one transaction.");
                 Assert.assertNotNull(reCommitError);
-                Assert.assertTrue(reCommitError.getCause() instanceof
-                        TransactionCoordinatorClientException.InvalidTxnStatusException);
+                Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
             }
 
             message = consumer.receive(1, TimeUnit.SECONDS);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
index d1ad443..0e1f6c7 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
@@ -71,6 +71,15 @@ public class TransactionCoordinatorClientException extends IOException {
     }
 
     /**
+     * Thrown when transaction not found in transaction coordinator.
+     */
+    public static class TransactionNotFoundException extends TransactionCoordinatorClientException {
+        public TransactionNotFoundException(String message) {
+            super(message);
+        }
+    }
+
+    /**
      * Thrown when transaction meta store handler not exists.
      */
     public static class MetaStoreHandlerNotExistsException extends TransactionCoordinatorClientException {
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index 01a2474..1106aae 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -84,6 +84,7 @@ enum Result
     ResultInvalidTxnStatusError,                     /// Invalid txn status error
     ResultNotAllowedError,                           /// Not allowed
     ResultTransactionConflict,                       /// Transaction ack conflict
+    ResultTransactionNotFound,                       /// Transaction not found
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 824d54a..d17a9b6 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -123,6 +123,9 @@ static Result getResult(ServerError serverError) {
 
         case TransactionConflict:
             return ResultTransactionConflict;
+
+        case TransactionNotFound:
+            return ResultTransactionNotFound;
     }
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 5f074a4..89dfe8e 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -150,6 +150,9 @@ const char* strResult(Result result) {
 
         case ResultTransactionConflict:
             return "ResultTransactionConflict";
+
+        case ResultTransactionNotFound:
+            return "ResultTransactionNotFound";
     };
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 2b4fa48..64688f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -379,6 +379,8 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                 return new TransactionCoordinatorClientException.CoordinatorNotFoundException(msg);
             case InvalidTxnStatus:
                 return new TransactionCoordinatorClientException.InvalidTxnStatusException(msg);
+            case TransactionNotFound:
+                return new TransactionCoordinatorClientException.TransactionNotFoundException(msg);
             default:
                 return new TransactionCoordinatorClientException(msg);
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index b30c405..9e1e9d3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -84,6 +84,7 @@ public final class PulsarApi {
     InvalidTxnStatus(21, 21),
     NotAllowedError(22, 22),
     TransactionConflict(23, 23),
+    TransactionNotFound(24, 24),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -110,6 +111,7 @@ public final class PulsarApi {
     public static final int InvalidTxnStatus_VALUE = 21;
     public static final int NotAllowedError_VALUE = 22;
     public static final int TransactionConflict_VALUE = 23;
+    public static final int TransactionNotFound_VALUE = 24;
     
     
     public final int getNumber() { return value; }
@@ -140,6 +142,7 @@ public final class PulsarApi {
         case 21: return InvalidTxnStatus;
         case 22: return NotAllowedError;
         case 23: return TransactionConflict;
+        case 24: return TransactionNotFound;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index e722708..5c12ca3 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -205,6 +205,7 @@ enum ServerError {
     NotAllowedError = 22; // Not allowed error
 
     TransactionConflict = 23; // Ack with transaction conflict
+    TransactionNotFound = 24; // Transaction not found
 }
 
 enum AuthMethod {
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
index 4030558..bc55f77 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -24,7 +24,7 @@ import com.google.common.annotations.Beta;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 
 /**
@@ -58,10 +58,15 @@ public interface TransactionMetadataStoreProvider {
      * Open the transaction metadata store for transaction coordinator
      * identified by <tt>transactionCoordinatorId</tt>.
      *
+     * @param transactionCoordinatorId {@link TransactionCoordinatorID} the coordinator id.
+     * @param managedLedgerFactory {@link ManagedLedgerFactory} the managedLedgerFactory to create managedLedger.
+     * @param managedLedgerConfig {@link ManagedLedgerConfig} the managedLedgerConfig to create managedLedger.
+     *
      * @return a future represents the result of the operation.
      *         an instance of {@link TransactionMetadataStore} is returned
      *         if the operation succeeds.
      */
     CompletableFuture<TransactionMetadataStore> openStore(
-        TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory);
+            TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
+            ManagedLedgerConfig managedLedgerConfig);
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
index 614ef1a..8174b3a 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import java.util.concurrent.CompletableFuture;
-
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -32,7 +32,8 @@ public class InMemTransactionMetadataStoreProvider implements TransactionMetadat
 
     @Override
     public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId,
-         ManagedLedgerFactory managedLedgerFactory) {
+                                                                 ManagedLedgerFactory managedLedgerFactory,
+                                                                 ManagedLedgerConfig managedLedgerConfig) {
         return CompletableFuture.completedFuture(
             new InMemTransactionMetadataStore(transactionCoordinatorId));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 761169c..89c84c1 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
@@ -53,7 +54,7 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     private final ManagedLedger managedLedger;
 
-    private final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
+    public final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
 
     private final ManagedCursor cursor;
 
@@ -64,17 +65,21 @@ public class MLTransactionLogImpl implements TransactionLog {
     //this is for replay
     private final PositionImpl lastConfirmedEntry;
 
+    private PositionImpl currentLoadPosition;
+
     private final long tcId;
 
     private final String topicName;
 
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
-                                ManagedLedgerFactory managedLedgerFactory) throws Exception {
+                                ManagedLedgerFactory managedLedgerFactory,
+                                ManagedLedgerConfig managedLedgerConfig) throws Exception {
         this.topicName = TRANSACTION_LOG_PREFIX + tcID;
         this.tcId = tcID.getId();
-        this.managedLedger = managedLedgerFactory.open(topicName);
+        this.managedLedger = managedLedgerFactory.open(topicName, managedLedgerConfig);
         this.cursor =  managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME,
                 CommandSubscribe.InitialPosition.Earliest);
+        this.currentLoadPosition = (PositionImpl) this.cursor.getMarkDeletedPosition();
         this.entryQueue = new SpscArrayQueue<>(2000);
         this.lastConfirmedEntry = (PositionImpl) managedLedger.getLastConfirmedEntry();
     }
@@ -165,9 +170,8 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     class TransactionLogReplayer {
 
-        private FillEntryQueueCallback fillEntryQueueCallback;
-        private long currentLoadEntryId;
-        private TransactionLogReplayCallback transactionLogReplayCallback;
+        private final FillEntryQueueCallback fillEntryQueueCallback;
+        private final TransactionLogReplayCallback transactionLogReplayCallback;
 
         TransactionLogReplayer(TransactionLogReplayCallback transactionLogReplayCallback) {
             this.fillEntryQueueCallback = new FillEntryQueueCallback();
@@ -175,16 +179,13 @@ public class MLTransactionLogImpl implements TransactionLog {
         }
 
         public void start() {
-            if (((PositionImpl) cursor.getMarkDeletedPosition()).compareTo(lastConfirmedEntry) == 0) {
-                this.transactionLogReplayCallback.replayComplete();
-                return;
-            }
-            while (currentLoadEntryId < lastConfirmedEntry.getEntryId()) {
+
+            while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
                 fillEntryQueueCallback.fillQueue();
                 Entry entry = entryQueue.poll();
                 if (entry != null) {
                     ByteBuf buffer = entry.getDataBuffer();
-                    currentLoadEntryId = entry.getEntryId();
+                    currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                     ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(buffer);
                     TransactionMetadataEntry.Builder transactionMetadataEntryBuilder =
                             TransactionMetadataEntry.newBuilder();
@@ -215,7 +216,7 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
-        private AtomicLong outstandingReadsRequests = new AtomicLong(0);
+        private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
 
         void fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 5e1061d..ef63089 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import java.util.concurrent.CompletableFuture;
-
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -36,13 +36,15 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
     private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStoreProvider.class);
 
     @Override
-    public CompletableFuture<TransactionMetadataStore>
-    openStore(TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory) {
+    public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId,
+                                                                 ManagedLedgerFactory managedLedgerFactory,
+                                                                 ManagedLedgerConfig managedLedgerConfig) {
         TransactionMetadataStore transactionMetadataStore;
         try {
             transactionMetadataStore =
                     new MLTransactionMetadataStore(transactionCoordinatorId,
-                            new MLTransactionLogImpl(transactionCoordinatorId, managedLedgerFactory));
+                            new MLTransactionLogImpl(transactionCoordinatorId,
+                                    managedLedgerFactory, managedLedgerConfig));
         } catch (Exception e) {
             log.error("MLTransactionMetadataStore init fail", e);
             return FutureUtil.failedFuture(e);
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 84e68c7..aff0d5a 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.transaction.coordinator;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -48,7 +49,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         factoryConf.setMaxCacheSize(0);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
-        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
         int checkReplayRetryCount = 0;
@@ -108,7 +110,10 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         factoryConf.setMaxCacheSize(0);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
-        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                managedLedgerConfig);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
         int checkReplayRetryCount = 0;
@@ -150,7 +155,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
 
-                                new MLTransactionLogImpl(transactionCoordinatorID, factory));
+                                new MLTransactionLogImpl(transactionCoordinatorID, factory, new ManagedLedgerConfig()));
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -208,7 +213,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         factoryConf.setMaxCacheSize(0);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
-        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
         int checkReplayRetryCount = 0;
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index 9a79b37..84f9a47 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -62,7 +62,7 @@ public class TransactionMetadataStoreProviderTest {
     @BeforeMethod
     public void setup() throws Exception {
         this.tcId = new TransactionCoordinatorID(1L);
-        this.store = this.provider.openStore(tcId, null).get();
+        this.store = this.provider.openStore(tcId, null, null).get();
     }
 
     @Test

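The replay change to `MLTransactionLogImpl` above replaces an entry-ID comparison with a comparison of whole positions. The following is a minimal, self-contained sketch of why that can matter, assuming (as in BookKeeper) that entry IDs restart at 0 in each new ledger. The `Pos` class, the method names, and the sample positions are hypothetical stand-ins for illustration, not Pulsar's `PositionImpl` or the actual replayer code.

```java
public class ReplayPositionSketch {
    // Hypothetical stand-in for a (ledgerId, entryId) position; not Pulsar's PositionImpl.
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            // Order by ledger first, then by entry within the ledger.
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // Shape of the old loop condition: looks at entry IDs only.
    static boolean moreToReplayByEntryId(Pos current, Pos lastConfirmed) {
        return current.entryId < lastConfirmed.entryId;
    }

    // Shape of the new loop condition: compares whole positions.
    static boolean moreToReplayByPosition(Pos current, Pos lastConfirmed) {
        return lastConfirmed.compareTo(current) > 0;
    }

    public static void main(String[] args) {
        // Small ledgers, similar in spirit to setMaxEntriesPerLedger(2) in the updated test.
        Pos current = new Pos(3, 1);        // last entry replayed so far
        Pos lastConfirmed = new Pos(5, 0);  // newest entry sits in a later ledger

        System.out.println(moreToReplayByEntryId(current, lastConfirmed));  // false
        System.out.println(moreToReplayByPosition(current, lastConfirmed)); // true
    }
}
```

With the entry-ID check alone, replay would appear finished even though a later ledger still holds an entry; comparing the ledger ID first avoids that.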

[pulsar] 03/07: [C++] Fixed flaky test: AuthPluginTest.testTlsDetectHttpsWithHostNameValidation (#8771)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 11a96e8ea22e0afcf4e862b4fbe3d26300a0874d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Dec 2 17:14:51 2020 -0800

    [C++] Fixed flaky test: AuthPluginTest.testTlsDetectHttpsWithHostNameValidation (#8771)
    
    * [C++] Fixed flaky test: AuthPluginTest.testTlsDetectHttpsWithHostNameValidation
    
    * Fixed test assertion
    
    (cherry picked from commit b78b01c6718a45865a9146e415664c48d374b869)
---
 pulsar-client-cpp/tests/AuthPluginTest.cc | 34 +++++++++++++------------------
 1 file changed, 14 insertions(+), 20 deletions(-)

diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc b/pulsar-client-cpp/tests/AuthPluginTest.cc
index db7c8ee..2d7aa1d 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -171,26 +171,20 @@ TEST(AuthPluginTest, testTlsDetectHttps) {
 }
 
 TEST(AuthPluginTest, testTlsDetectHttpsWithHostNameValidation) {
-    try {
-        ClientConfiguration config = ClientConfiguration();
-        config.setUseTls(true);  // shouldn't be needed soon
-        config.setTlsTrustCertsFilePath(caPath);
-        config.setTlsAllowInsecureConnection(false);
-        config.setAuth(pulsar::AuthTls::create(clientPublicKeyPath, clientPrivateKeyPath));
-        config.setValidateHostName(true);
-
-        Client client(serviceUrlHttps, config);
-
-        std::string topicName = "persistent://private/auth/test-tls-detect-https";
-
-        Producer producer;
-        Promise<Result, Producer> producerPromise;
-        client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
-    } catch (const std::exception& ex) {
-        EXPECT_EQ(ex.what(), std::string("handshake: certificate verify failed"));
-    } catch (...) {
-        FAIL() << "Expected handshake: certificate verify failed";
-    }
+    ClientConfiguration config = ClientConfiguration();
+    config.setUseTls(true);  // shouldn't be needed soon
+    config.setTlsTrustCertsFilePath(caPath);
+    config.setTlsAllowInsecureConnection(false);
+    config.setAuth(pulsar::AuthTls::create(clientPublicKeyPath, clientPrivateKeyPath));
+    config.setValidateHostName(true);
+
+    Client client(serviceUrlHttps, config);
+
+    std::string topicName = "persistent://private/auth/test-tls-detect-https-with-hostname-validation";
+
+    Producer producer;
+    Result res = client.createProducer(topicName, producer);
+    ASSERT_NE(ResultOk, res);
 }
 
 namespace testAthenz {


[pulsar] 05/07: Fix the problem that batchMessageId is converted to messageIdImpl (#8779)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b27902a110dfa88bed78016d2079225ea0e126e7
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Dec 3 14:31:59 2020 +0800

    Fix the problem that batchMessageId is converted to messageIdImpl (#8779)
    
    Fixes #8712
    
    ### Motivation
    `TopicReaderTest.testHasMessageAvailable` is flaky.
    Cause analysis:
    When a batch contains only one message, the broker's `GetLastMessageId` handler decides that it is not a batch message and returns a `MessageIdImpl`. (See ServerCnx line 1553)
    In the same scenario, the client returns a `BatchMessageId` after sending the message. (See ProducerImpl line 1151)
    
    This results in:
    MessageIdImpl: `3:31:-1`
    BatchMessageId: `3:31:-1:0`
    
    When `reader.hasMessageAvailable()` is called, the two IDs are compared like this: `lastMessageIdInBroker.compareTo(messageId)`
    Although both IDs refer to the same message, they do not compare as equal.
    
    ### Modifications
    When we call `admin.topics().getLastMessageId`, a batch that contains only one message is still reported as a `BatchMessageId`, which matches what the client does.
    This keeps the behavior consistent everywhere: even a single-message batch yields a `BatchMessageId`.
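
The two changed lines in `ServerCnx` can be read as a small pure function. The sketch below isolates that logic so the before/after behavior is easy to compare. The class and method names are hypothetical, and the value `1` passed for an unset field is an assumption made for illustration; only the two conditional expressions are taken from the diff.

```java
public class LastMessageIdBatchIndexSketch {
    // Logic before the change: a one-message batch is treated as a non-batch message.
    static int largestBatchIndexBefore(int numMessagesInBatch) {
        int batchSize = numMessagesInBatch;
        return batchSize > 1 ? batchSize - 1 : -1;
    }

    // Logic after the change: only metadata without the batch field maps to -1.
    static int largestBatchIndexAfter(boolean hasNumMessagesInBatch, int numMessagesInBatch) {
        int batchSize = hasNumMessagesInBatch ? numMessagesInBatch : -1;
        return batchSize > 0 ? batchSize - 1 : -1;
    }

    public static void main(String[] args) {
        // Batch holding a single message.
        System.out.println(largestBatchIndexBefore(1));       // -1
        System.out.println(largestBatchIndexAfter(true, 1));  // 0

        // Message sent without batching (field not set; 1 is an assumed placeholder value).
        System.out.println(largestBatchIndexAfter(false, 1)); // -1

        // Batch holding ten messages.
        System.out.println(largestBatchIndexAfter(true, 10)); // 9
    }
}
```

A batch index of `-1` corresponds to a plain `MessageIdImpl`, while `0` or greater corresponds to a batch message ID, which is why the single-message batch now compares as equal to the ID the producer received.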
    
    
    (cherry picked from commit 85f3ff4edbaa10c7894af8ad823cbce37b13829c)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  4 +-
 .../apache/pulsar/client/api/TopicReaderTest.java  | 72 +++++++++++++++++++++-
 2 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 06d5b75..e931c94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1550,7 +1550,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
             int batchSize = metadata.getNumMessagesInBatch();
             entry.release();
-            return batchSize;
+            return metadata.hasNumMessagesInBatch() ? batchSize : -1;
         });
 
         batchSizeFuture.whenComplete((batchSize, e) -> {
@@ -1558,7 +1558,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 ctx.writeAndFlush(Commands.newError(
                         requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage()));
             } else {
-                int largestBatchIndex = batchSize > 1 ? batchSize - 1 : -1;
+                int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", remoteAddress,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index e16125f..ebc2889 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -31,9 +31,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -982,6 +980,76 @@ public class TopicReaderTest extends ProducerConsumerBase {
         producer.close();
     }
 
+    @Test(timeOut = 20000)
+    public void testHasMessageAvailableWithBatch() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch";
+        final int numOfMessage = 10;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .enableBatching(true)
+                .batchingMaxMessages(10)
+                .batchingMaxPublishDelay(2,TimeUnit.SECONDS)
+                .topic(topicName).create();
+
+        //For batch-messages with single message, the type of client messageId should be the same as that of broker
+        MessageId messageId = producer.send("msg".getBytes());
+        assertTrue(messageId instanceof MessageIdImpl);
+        ReaderImpl<byte[]> reader = (ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
+                .startMessageId(messageId).startMessageIdInclusive().create();
+        MessageId lastMsgId = reader.getConsumer().getLastMessageId();
+        assertTrue(lastMsgId instanceof BatchMessageIdImpl);
+        assertTrue(messageId instanceof BatchMessageIdImpl);
+        assertEquals(lastMsgId, messageId);
+        reader.close();
+
+        CountDownLatch latch = new CountDownLatch(numOfMessage);
+        List<MessageId> allIds = Collections.synchronizedList(new ArrayList<>());
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.sendAsync(String.format("msg num %d", i).getBytes()).whenComplete((mid, e) -> {
+                if (e != null) {
+                    Assert.fail();
+                } else {
+                    allIds.add(mid);
+                }
+                latch.countDown();
+            });
+        }
+        producer.flush();
+        latch.await();
+        producer.close();
+
+        //For batch-message with multi messages, the type of client messageId should be the same as that of broker
+        for (MessageId id : allIds) {
+            reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
+                    .startMessageId(id).startMessageIdInclusive().create();
+            if (id instanceof BatchMessageIdImpl) {
+                MessageId lastMessageId = reader.getConsumer().getLastMessageId();
+                assertTrue(lastMessageId instanceof BatchMessageIdImpl);
+                log.info("id {} instance of BatchMessageIdImpl",id);
+            } else {
+                assertTrue(id instanceof MessageIdImpl);
+                MessageId lastMessageId = reader.getConsumer().getLastMessageId();
+                assertTrue(lastMessageId instanceof MessageIdImpl);
+                log.info("id {} instance of MessageIdImpl",id);
+            }
+            reader.close();
+        }
+        //For non-batch message, the type of client messageId should be the same as that of broker
+        producer = pulsarClient.newProducer()
+                .enableBatching(false).topic(topicName).create();
+        messageId = producer.send("non-batch".getBytes());
+        assertFalse(messageId instanceof BatchMessageIdImpl);
+        assertTrue(messageId instanceof MessageIdImpl);
+        reader = (ReaderImpl<byte[]>) pulsarClient.newReader().topic(topicName)
+                .startMessageId(messageId).create();
+        MessageId lastMessageId = reader.getConsumer().getLastMessageId();
+        assertFalse(lastMessageId instanceof BatchMessageIdImpl);
+        assertTrue(lastMessageId instanceof MessageIdImpl);
+        assertEquals(lastMessageId, messageId);
+        producer.close();
+        reader.close();
+    }
+
     @Test
     public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
         final int numOfMessage = 10;


[pulsar] 04/07: [C++] Implement batch aware producer router (#8395)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c7efdcc7cbb27769eb42ec5c75080d9caf5b87d4
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Dec 2 21:30:04 2020 -0800

    [C++] Implement batch aware producer router (#8395)
    
    * [C++] Implement batch aware producer router
    
    * Fixed tests that relied on old rr distribution
    
    * Fixed compilation with older compilers
    
    * Fixed test
    
    (cherry picked from commit 74803dbf1f9ef5b90558cce90f67fa4c668444a3)
---
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |   5 +-
 pulsar-client-cpp/lib/RoundRobinMessageRouter.cc   |  63 +++++++-
 pulsar-client-cpp/lib/RoundRobinMessageRouter.h    |  26 +++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |   4 +-
 pulsar-client-cpp/tests/CustomRoutingPolicy.h      |  13 ++
 pulsar-client-cpp/tests/PartitionsUpdateTest.cc    |   3 +-
 .../tests/RoundRobinMessageRouterTest.cc           | 164 ++++++++++++++++-----
 7 files changed, 223 insertions(+), 55 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index aa5e176..1590780 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -63,7 +63,10 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
 MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
     switch (conf_.getPartitionsRoutingMode()) {
         case ProducerConfiguration::RoundRobinDistribution:
-            return std::make_shared<RoundRobinMessageRouter>(conf_.getHashingScheme());
+            return std::make_shared<RoundRobinMessageRouter>(
+                conf_.getHashingScheme(), conf_.getBatchingEnabled(), conf_.getBatchingMaxMessages(),
+                conf_.getBatchingMaxAllowedSizeInBytes(),
+                boost::posix_time::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
         case ProducerConfiguration::CustomPartition:
             return conf_.getMessageRouterPtr();
         case ProducerConfiguration::UseSinglePartition:
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
index c47fb23..feb3ab0 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
@@ -18,22 +18,73 @@
  */
 #include "RoundRobinMessageRouter.h"
 
+#include "TimeUtils.h"
+
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int_distribution.hpp>
+
 namespace pulsar {
-RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme)
-    : MessageRouterBase(hashingScheme), prevPartition_(0) {}
+RoundRobinMessageRouter::RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme,
+                                                 bool batchingEnabled, uint32_t maxBatchingMessages,
+                                                 uint32_t maxBatchingSize,
+                                                 boost::posix_time::time_duration maxBatchingDelay)
+    : MessageRouterBase(hashingScheme),
+      batchingEnabled_(batchingEnabled),
+      lastPartitionChange_(TimeUtils::currentTimeMillis()),
+      msgCounter_(0),
+      cumulativeBatchSize_(0),
+      maxBatchingMessages_(maxBatchingMessages),
+      maxBatchingSize_(maxBatchingSize),
+      maxBatchingDelay_(maxBatchingDelay) {
+    boost::random::mt19937 rng(time(nullptr));
+    boost::random::uniform_int_distribution<int> dist;
+    currentPartitionCursor_ = dist(rng);
+}
 
 RoundRobinMessageRouter::~RoundRobinMessageRouter() {}
 
 // override
 int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
+    if (topicMetadata.getNumPartitions() == 1) {
+        // With a single partition there is nothing to route
+        return 0;
+    }
+
     // if message has a key, hash the key and return the partition
     if (msg.hasPartitionKey()) {
         return hash->makeHash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
-    } else {
-        Lock lock(mutex_);
-        // else pick the next partition
-        return prevPartition_++ % topicMetadata.getNumPartitions();
     }
+
+    if (!batchingEnabled_) {
+        // If there's no batching, do the round-robin at the message scope
+        // as there is no gain otherwise.
+        return currentPartitionCursor_++ % topicMetadata.getNumPartitions();
+    }
+
+    // If there's no key, we round-robin across partitions, sticking with a given partition until
+    // the message-count, buffered-size, or max-batching-delay threshold is reached, so that each
+    // partition receives reasonably full batches. Note that more than one partition may be skipped
+    // if multiple threads increment currentPartitionCursor_ at the same time. That is not a problem,
+    // because the goal is only to spread data across partitions, not to visit them in a specific
+    // sequence.
+    uint32_t messageSize = msg.getLength();
+    uint32_t messageCount = msgCounter_;
+    uint32_t batchSize = cumulativeBatchSize_;
+    int64_t lastPartitionChange = lastPartitionChange_;
+    int64_t now = TimeUtils::currentTimeMillis();
+
+    if (messageCount >= maxBatchingMessages_ || (messageSize >= maxBatchingSize_ - batchSize) ||
+        (now - lastPartitionChange >= maxBatchingDelay_.total_milliseconds())) {
+        uint32_t currentPartitionCursor = ++currentPartitionCursor_;
+        lastPartitionChange_ = now;
+        cumulativeBatchSize_ = messageSize;
+        msgCounter_ = 1;
+        return currentPartitionCursor % topicMetadata.getNumPartitions();
+    }
+
+    ++msgCounter_;
+    cumulativeBatchSize_ += messageSize;
+    return currentPartitionCursor_ % topicMetadata.getNumPartitions();
 }
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
index 5460e13..be172a0 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
@@ -16,28 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#ifndef PULSAR_RR_MESSAGE_ROUTER_HEADER_
-#define PULSAR_RR_MESSAGE_ROUTER_HEADER_
+
+#pragma once
 
 #include <pulsar/defines.h>
 #include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/ProducerConfiguration.h>
 #include <pulsar/TopicMetadata.h>
-#include <mutex>
 #include "Hash.h"
 #include "MessageRouterBase.h"
 
+#include <atomic>
+#include <boost/date_time/local_time/local_time.hpp>
+
 namespace pulsar {
 class PULSAR_PUBLIC RoundRobinMessageRouter : public MessageRouterBase {
    public:
-    RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme);
+    RoundRobinMessageRouter(ProducerConfiguration::HashingScheme hashingScheme, bool batchingEnabled,
+                            uint32_t maxBatchingMessages, uint32_t maxBatchingSize,
+                            boost::posix_time::time_duration maxBatchingDelay);
     virtual ~RoundRobinMessageRouter();
     virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata);
 
    private:
-    std::mutex mutex_;
-    unsigned int prevPartition_;
+    const bool batchingEnabled_;
+    const uint32_t maxBatchingMessages_;
+    const uint32_t maxBatchingSize_;
+    const boost::posix_time::time_duration maxBatchingDelay_;
+
+    std::atomic<uint32_t> currentPartitionCursor_;
+    std::atomic<int64_t> lastPartitionChange_;
+    std::atomic<uint32_t> msgCounter_;
+    std::atomic<uint32_t> cumulativeBatchSize_;
 };
-typedef std::unique_lock<std::mutex> Lock;
+
 }  // namespace pulsar
-#endif  // PULSAR_RR_MESSAGE_ROUTER_HEADER_
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 60fc9b5..a812657 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -861,6 +861,7 @@ TEST(BasicEndToEndTest, testRoundRobinRoutingPolicy) {
     Producer producer;
     ProducerConfiguration tempProducerConfiguration;
     tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    tempProducerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
     ProducerConfiguration producerConfiguration = tempProducerConfiguration;
     Result result = client.createProducer(topicName, producerConfiguration, producer);
     ASSERT_EQ(ResultOk, result);
@@ -2465,7 +2466,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
     Producer producer;
     int numOfMessages = 20;
     ProducerConfiguration tempProducerConfiguration;
-    tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    tempProducerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
     ProducerConfiguration producerConfiguration = tempProducerConfiguration;
     producerConfiguration.setBatchingEnabled(true);
     // set batch message number numOfMessages, and max delay 60s
@@ -2687,6 +2688,7 @@ TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
     // set batch message number numOfMessages, and max delay 60s
     producerConfiguration.setBatchingMaxMessages(numOfMessages / numberOfPartitions);
     producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+    producerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
 
     Result result = client.createProducer(topicName, producerConfiguration, producer);
     ASSERT_EQ(ResultOk, result);
diff --git a/pulsar-client-cpp/tests/CustomRoutingPolicy.h b/pulsar-client-cpp/tests/CustomRoutingPolicy.h
index ca82e34..ed10c5b 100644
--- a/pulsar-client-cpp/tests/CustomRoutingPolicy.h
+++ b/pulsar-client-cpp/tests/CustomRoutingPolicy.h
@@ -32,6 +32,19 @@ class CustomRoutingPolicy : public MessageRoutingPolicy {
 
     int getPartition(const Message& msg, const TopicMetadata& topicMetadata) { return 0; }
 };
+
+class SimpleRoundRobinRoutingPolicy : public MessageRoutingPolicy {
+   public:
+    SimpleRoundRobinRoutingPolicy() : counter_(0) {}
+
+    int getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
+        return counter_++ % topicMetadata.getNumPartitions();
+    }
+
+   private:
+    uint32_t counter_;
+};
+
 }  // namespace pulsar
 
 #endif  // CUSTOM_ROUTER_POLICY_HEADER_
diff --git a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
index e1bf68c..af473a2 100644
--- a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
+++ b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
@@ -25,6 +25,7 @@
 #include <memory>
 
 #include "HttpHelper.h"
+#include "CustomRoutingPolicy.h"
 
 using namespace pulsar;
 
@@ -57,7 +58,7 @@ class PartitionsSet {
     Result initProducer(bool enablePartitionsUpdate) {
         clientForProducer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
         const auto producerConfig =
-            ProducerConfiguration().setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+            ProducerConfiguration().setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
         return clientForProducer_->createProducer(topicName, producerConfig, producer_);
     }
 
diff --git a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
index 431dd6b..ce5ad17 100644
--- a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
+++ b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
@@ -19,59 +19,147 @@
 #include <pulsar/Client.h>
 #include <pulsar/ProducerConfiguration.h>
 #include <gtest/gtest.h>
-#include <gmock/gmock.h>
-#include <boost/functional/hash.hpp>
-
-#include "tests/mocks/GMockMessage.h"
+#include <thread>
 
 #include "../lib/RoundRobinMessageRouter.h"
 #include "../lib/TopicMetadataImpl.h"
 
-using ::testing::AtLeast;
-using ::testing::Return;
-using ::testing::ReturnRef;
-
 using namespace pulsar;
 
-// TODO: Edit Message class to suit Google Mock and enable these tests when 2.0.0 release.
+TEST(RoundRobinMessageRouterTest, onePartition) {
+    const int numPartitions = 1;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1,
+                                   boost::posix_time::milliseconds(0));
+
+    Message msg1 = MessageBuilder().setPartitionKey("my-key-1").setContent("one").build();
+    Message msg2 = MessageBuilder().setPartitionKey("my-key-2").setContent("two").build();
+    Message msg3 = MessageBuilder().setContent("three").build();
+
+    int p1 = router.getPartition(msg1, TopicMetadataImpl(numPartitions));
+    int p2 = router.getPartition(msg2, TopicMetadataImpl(numPartitions));
+    int p3 = router.getPartition(msg3, TopicMetadataImpl(numPartitions));
+    ASSERT_EQ(p1, 0);
+    ASSERT_EQ(p2, 0);
+    ASSERT_EQ(p3, 0);
+}
+
+TEST(RoundRobinMessageRouterTest, sameKey) {
+    const int numPartitions = 13;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1,
+                                   boost::posix_time::milliseconds(0));
+
+    Message msg1 = MessageBuilder().setPartitionKey("my-key").setContent("one").build();
+    Message msg2 = MessageBuilder().setPartitionKey("my-key").setContent("two").build();
+
+    int p1 = router.getPartition(msg1, TopicMetadataImpl(numPartitions));
+    int p2 = router.getPartition(msg2, TopicMetadataImpl(numPartitions));
+    ASSERT_EQ(p2, p1);
+}
+
+TEST(RoundRobinMessageRouterTest, batchingDisabled) {
+    const int numPartitions = 13;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, false, 1, 1,
+                                   boost::posix_time::milliseconds(0));
+
+    Message msg1 = MessageBuilder().setContent("one").build();
+    Message msg2 = MessageBuilder().setContent("two").build();
+
+    int p1 = router.getPartition(msg1, TopicMetadataImpl(numPartitions));
+    int p2 = router.getPartition(msg2, TopicMetadataImpl(numPartitions));
+    ASSERT_EQ(p2, (p1 + 1) % numPartitions);
+}
+
+TEST(RoundRobinMessageRouterTest, batchingEnabled) {
+    const int numPartitions = 13;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000,
+                                   boost::posix_time::seconds(1));
 
-TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) {
-    const int numPartitions1 = 5;
-    const int numPartitions2 = 3;
+    int p = -1;
+    for (int i = 0; i < 100; i++) {
+        Message msg = MessageBuilder().setContent("0123456789").build();
 
-    RoundRobinMessageRouter router1(ProducerConfiguration::BoostHash);
-    RoundRobinMessageRouter router2(ProducerConfiguration::BoostHash);
+        int p1 = router.getPartition(msg, TopicMetadataImpl(numPartitions));
+        if (p != -1) {
+            ASSERT_EQ(p1, p);
+        }
 
-    GMockMessage message;
-    EXPECT_CALL(message, hasPartitionKey()).Times(20).WillRepeatedly(Return(false));
-    EXPECT_CALL(message, getPartitionKey()).Times(0);
-    for (int i = 0; i < 10; i++) {
-        ASSERT_EQ(i % numPartitions1, router1.getPartition(message, TopicMetadataImpl(numPartitions1)));
-        ASSERT_EQ(i % numPartitions2, router2.getPartition(message, TopicMetadataImpl(numPartitions2)));
+        p = p1;
     }
 }
 
-TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithPartitionKey) {
-    const int numPartitons = 1234;
+TEST(RoundRobinMessageRouterTest, maxDelay) {
+    const int numPartitions = 13;
 
-    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash);
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 1000, 100000,
+                                   boost::posix_time::seconds(1));
 
-    std::string partitionKey1 = "key1";
-    std::string partitionKey2 = "key2";
+    int p1 = -1;
+    for (int i = 0; i < 100; i++) {
+        Message msg = MessageBuilder().setContent("0123456789").build();
 
-    GMockMessage message1;
-    EXPECT_CALL(message1, hasPartitionKey()).Times(1).WillOnce(Return(true));
-    EXPECT_CALL(message1, getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey1));
+        int p = router.getPartition(msg, TopicMetadataImpl(numPartitions));
+        if (p1 != -1) {
+            ASSERT_EQ(p1, p);
+        }
 
-    GMockMessage message2;
-    EXPECT_CALL(message2, hasPartitionKey()).Times(1).WillOnce(Return(true));
-    EXPECT_CALL(message2, getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey2));
+        p1 = p;
+    }
+
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+
+    // Second set of messages will go in separate partition
+
+    int p2 = -1;
+    for (int i = 0; i < 100; i++) {
+        Message msg = MessageBuilder().setContent("0123456789").build();
+
+        int p = router.getPartition(msg, TopicMetadataImpl(numPartitions));
+        if (p2 != -1) {
+            ASSERT_EQ(p2, p);
+        }
+
+        p2 = p;
+    }
+
+    ASSERT_EQ(p2, (p1 + 1) % numPartitions);
+}
 
-    auto expectedParrtition1 =
-        static_cast<const int>(boost::hash<std::string>()(partitionKey1) % numPartitons);
-    auto expectedParrtition2 =
-        static_cast<const int>(boost::hash<std::string>()(partitionKey2) % numPartitons);
+TEST(RoundRobinMessageRouterTest, maxNumberOfMessages) {
+    const int numPartitions = 13;
 
-    ASSERT_EQ(expectedParrtition1, router.getPartition(message1, TopicMetadataImpl(numPartitons)));
-    ASSERT_EQ(expectedParrtition2, router.getPartition(message2, TopicMetadataImpl(numPartitons)));
-}
\ No newline at end of file
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 2, 1000,
+                                   boost::posix_time::seconds(1));
+
+    Message msg1 = MessageBuilder().setContent("one").build();
+    Message msg2 = MessageBuilder().setContent("two").build();
+    Message msg3 = MessageBuilder().setContent("tree").build();
+
+    TopicMetadataImpl tm = TopicMetadataImpl(numPartitions);
+    int p1 = router.getPartition(msg1, tm);
+    int p2 = router.getPartition(msg2, tm);
+    int p3 = router.getPartition(msg3, tm);
+    ASSERT_EQ(p1, p2);
+    ASSERT_EQ(p3, (p2 + 1) % numPartitions);
+}
+
+TEST(RoundRobinMessageRouterTest, maxBatchSize) {
+    const int numPartitions = 13;
+
+    RoundRobinMessageRouter router(ProducerConfiguration::BoostHash, true, 10, 8,
+                                   boost::posix_time::seconds(1));
+
+    Message msg1 = MessageBuilder().setContent("one").build();
+    Message msg2 = MessageBuilder().setContent("two").build();
+    Message msg3 = MessageBuilder().setContent("tree").build();
+
+    TopicMetadataImpl tm = TopicMetadataImpl(numPartitions);
+    int p1 = router.getPartition(msg1, tm);
+    int p2 = router.getPartition(msg2, tm);
+    int p3 = router.getPartition(msg3, tm);
+    ASSERT_EQ(p1, p2);
+    ASSERT_EQ(p3, (p2 + 1) % numPartitions);
+}
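
Note on the router change above: the partition-selection rules in RoundRobinMessageRouter::getPartition can be modeled in a few lines of Python. This is only a sketch under stated assumptions, not the C++ implementation. The class name, the `key_hash` argument (standing in for the hashed partition key), the injectable `clock_ms`, and the `start_cursor` argument are hypothetical; the C++ router seeds its cursor randomly and uses atomics, whereas this model is single-threaded and Python integers do not wrap around like uint32_t.

```python
import time


class BatchAwareRoundRobinRouter:
    """Single-threaded model of the batch-aware round-robin routing rules."""

    def __init__(self, batching_enabled, max_messages, max_size_bytes,
                 max_delay_ms, start_cursor=0, clock_ms=None):
        self.batching_enabled = batching_enabled
        self.max_messages = max_messages
        self.max_size_bytes = max_size_bytes
        self.max_delay_ms = max_delay_ms
        self.cursor = start_cursor  # assumption: fixed start instead of a random seed
        self.clock_ms = clock_ms or (lambda: int(time.monotonic() * 1000))
        self.last_change_ms = self.clock_ms()
        self.msg_count = 0
        self.batch_bytes = 0

    def get_partition(self, payload, num_partitions, key_hash=None):
        if num_partitions == 1:
            return 0  # a single partition leaves nothing to choose
        if key_hash is not None:
            return key_hash % num_partitions  # keyed messages always hash
        if not self.batching_enabled:
            # Without batching, rotate on every message.
            partition = self.cursor % num_partitions
            self.cursor += 1
            return partition

        size = len(payload)
        now = self.clock_ms()
        if (self.msg_count >= self.max_messages
                or size >= self.max_size_bytes - self.batch_bytes
                or now - self.last_change_ms >= self.max_delay_ms):
            # A threshold was reached: move to the next partition and
            # start accounting for a new batch with this message.
            self.cursor += 1
            self.last_change_ms = now
            self.batch_bytes = size
            self.msg_count = 1
            return self.cursor % num_partitions

        # Otherwise stay on the current partition and grow the batch.
        self.msg_count += 1
        self.batch_bytes += size
        return self.cursor % num_partitions
```

With batching enabled, consecutive unkeyed messages stay on one partition until the message count, byte size, or delay limit is hit, which mirrors what the maxNumberOfMessages, maxBatchSize, and maxDelay tests in the diff assert.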


[pulsar] 01/07: Clear delayed messages when clear backlog. (#8691)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 508dfb358c81edea97c70e439aa8cfea54244be1
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Nov 26 09:25:52 2020 +0800

    Clear delayed messages when clear backlog. (#8691)
    
    Clear delayed messages when clear backlog.
    
    (cherry picked from commit a022d28735ea99e8f5c13053bfab3fef7f095c31)
---
 .../broker/delayed/DelayedDeliveryTracker.java     |  5 ++++
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  5 ++++
 .../apache/pulsar/broker/service/Dispatcher.java   |  4 +++
 .../PersistentDispatcherMultipleConsumers.java     |  5 ++++
 .../service/persistent/PersistentSubscription.java |  1 +
 .../service/persistent/DelayedDeliveryTest.java    | 31 ++++++++++++++++++++++
 .../util/collections/TripleLongPriorityQueue.java  |  8 ++++++
 7 files changed, 59 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index e772d22..6b122bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -69,6 +69,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
     void resetTickTime(long tickTime);
 
     /**
+     * Clear all delayed messages from the tracker.
+     */
+    void clear();
+
+    /**
      * Close the subscription tracker and release all resources.
      */
     void close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index a702a01..5c37b81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -135,6 +135,11 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     }
 
     @Override
+    public void clear() {
+        this.priorityQueue.clear();
+    }
+
+    @Override
     public long getNumberOfDelayedMessages() {
         return priorityQueue.size();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index a16adc9..483190f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -108,6 +108,10 @@ public interface Dispatcher {
         return 0;
     }
 
+    default void clearDelayedMessages() {
+        //No-op
+    }
+
     default void cursorIsReset() {
         //No-op
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index be9a7aa..265186b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -798,6 +798,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     @Override
+    public void clearDelayedMessages() {
+        this.delayedDeliveryTracker.ifPresent(DelayedDeliveryTracker::clear);
+    }
+
+    @Override
     public void cursorIsReset() {
         if (this.lastIndividualDeletedRangeFromCursorRecovery != null) {
             this.lastIndividualDeletedRangeFromCursorRecovery = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 5e22dad..0a578d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -493,6 +493,7 @@ public class PersistentSubscription implements Subscription {
                     log.debug("[{}][{}] Backlog size after clearing: {}", topicName, subName,
                             cursor.getNumberOfEntriesInBacklog(false));
                 }
+                dispatcher.clearDelayedMessages();
                 future.complete(null);
             }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 3c527d8..799d7f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -42,6 +44,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -447,4 +451,31 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         msg = consumer.receive(3, TimeUnit.SECONDS);
         assertNotNull(msg);
     }
+
+    @Test
+    public void testClearDelayedMessagesWhenClearBacklog() throws PulsarClientException, PulsarAdminException {
+        final String topic = "persistent://public/default/testClearDelayedMessagesWhenClearBacklog-" + UUID.randomUUID().toString();
+        final String subName = "my-sub";
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic).create();
+
+        final int messages = 100;
+        for (int i = 0; i < messages; i++) {
+            producer.newMessage().deliverAfter(1, TimeUnit.HOURS).value("Delayed Message - " + i).send();
+        }
+
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName).getDispatcher();
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), messages));
+
+        admin.topics().skipAllMessages(topic, subName);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 6a9168a..5075eb1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -139,6 +139,14 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         return size;
     }
 
+    /**
+     * Clear all items.
+     */
+    public void clear() {
+        this.buffer.clear();
+        this.size = 0;
+    }
+
     private void increaseCapacity() {
         // For bigger sizes, increase by 50%
         this.capacity += (capacity <= 256 ? capacity : capacity / 2);
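
Note on the change above: the call chain it adds (clearing a subscription's backlog also clears the dispatcher's delayed-delivery tracker) can be illustrated with a small Python model. This is a sketch only. The class and method names are hypothetical stand-ins for the Java types in the diff, `None` models an empty Optional tracker, and a heapq list stands in for TripleLongPriorityQueue.

```python
import heapq


class InMemoryDelayedTracker:
    """Stand-in for the delayed-delivery tracker: a min-heap of triples."""

    def __init__(self):
        self._heap = []  # entries are (deliver_at, ledger_id, entry_id)

    def add(self, deliver_at, ledger_id, entry_id):
        heapq.heappush(self._heap, (deliver_at, ledger_id, entry_id))

    def size(self):
        return len(self._heap)

    def clear(self):
        self._heap.clear()


class Dispatcher:
    def __init__(self, tracker=None):
        self.tracker = tracker  # None models a dispatcher without a tracker

    def clear_delayed_messages(self):
        # No-op when no tracker exists, like the default interface method.
        if self.tracker is not None:
            self.tracker.clear()

    def number_of_delayed_messages(self):
        return self.tracker.size() if self.tracker is not None else 0


class Subscription:
    def __init__(self, dispatcher):
        self.dispatcher = dispatcher
        self.backlog = []

    def clear_backlog(self):
        self.backlog.clear()
        # The step this commit adds: drop delayed entries along with the backlog.
        self.dispatcher.clear_delayed_messages()
```

Without the final call in `clear_backlog`, the tracker would keep counting messages that the cursor has already skipped, which is the situation the new test in the diff checks for.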


[pulsar] 06/07: Issue 8704: improve env config handling (#8709)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7150504846f2a7c4079367c0796f3f01530bad04
Author: Kevin Wilson <kl...@comcast.net>
AuthorDate: Thu Dec 3 02:11:00 2020 -0700

    Issue 8704: improve env config handling (#8709)
    
    This PR addresses three issues.
    1) Script files that contain lines other than comments and value
    assignments cause the script to abort without rewriting the file at
    all. Lines that cannot be parsed as value assignments should be
    tolerated and passed through unchanged to the edited result.
    
    2) Redaction of passwords in environment variables. Environment
    variables whose name contains "password" are automatically
    redacted and not printed in the log messages.
    
    3) Values that contain spaces, such as Java arguments that string
    together options like "-Xmx ... -Xmn ...", are read from the
    environment without the quotes. The quotes need to be added back
    when writing to the shell script so that these values are
    processed as a group.
    
    (cherry picked from commit c02e8802253e38d07c87f2ff93a88172e2721af8)
---
 docker/pulsar/scripts/apply-config-from-env.py | 39 ++++++++++++++++++++++----
 1 file changed, 33 insertions(+), 6 deletions(-)

diff --git a/docker/pulsar/scripts/apply-config-from-env.py b/docker/pulsar/scripts/apply-config-from-env.py
index 23d3d0f..948254b 100755
--- a/docker/pulsar/scripts/apply-config-from-env.py
+++ b/docker/pulsar/scripts/apply-config-from-env.py
@@ -47,16 +47,30 @@ for conf_filename in conf_files:
         if not line or line.startswith('#'):
             continue
 
-        k,v = line.split('=', 1)
-        keys[k] = len(lines) - 1
+        try:
+            k,v = line.split('=', 1)
+            keys[k] = len(lines) - 1
+        except ValueError:
+            print("[%s] Skipping unparsable line: %s" % (conf_filename, line))
 
     # Update values from Env
     for k in sorted(os.environ.keys()):
-        v = os.environ[k]
+        v = os.environ[k].strip()
+
+        # Quote the value if it contains a space.
+        if v.find(" ") >= 0:
+            v = '\"%s\"' % v
+
+        # Hide the value in logs if the variable name contains "password".
+        if "password" in k:
+            displayValue = "********"
+        else:
+            displayValue = v
+
         if k.startswith(PF_ENV_PREFIX):
             k = k[len(PF_ENV_PREFIX):]
         if k in keys:
-            print('[%s] Applying config %s = %s' % (conf_filename, k, v))
+            print('[%s] Applying config %s = %s' % (conf_filename, k, displayValue))
             idx = keys[k]
             lines[idx] = '%s=%s\n' % (k, v)
 
@@ -66,16 +80,29 @@ for conf_filename in conf_files:
         v = os.environ[k]
         if not k.startswith(PF_ENV_PREFIX):
             continue
+
+        # Quote the value if it contains a space.
+        if v.find(" ") >= 0:
+            v = '\"%s\"' % v
+
+        # Hide the value in logs if the variable name contains "password".
+        if "password" in k:
+            displayValue = "********"
+        else:
+            displayValue = v
+
         k = k[len(PF_ENV_PREFIX):]
         if k not in keys:
-            print('[%s] Adding config %s = %s' % (conf_filename, k, v))
+            print('[%s] Adding config %s = %s' % (conf_filename, k, displayValue))
             lines.append('%s=%s\n' % (k, v))
         else:
-            print('[%s] Updating config %s = %s' %(conf_filename, k, v))
+            print('[%s] Updating config %s = %s' % (conf_filename, k, displayValue))
             lines[keys[k]] = '%s=%s\n' % (k, v)
 
+
     # Store back the updated config in the same file
     f = open(conf_filename, 'w')
     for line in lines:
         f.write(line)
     f.close()
+
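
Note on the script change above: the three behaviors described in the commit message (tolerating unparsable lines, quoting values that contain spaces, and redacting passwords in log output) can be exercised in isolation with a small pure function. This is a sketch under assumptions, not the script itself. The function name `apply_env` is hypothetical, the value of `PF_ENV_PREFIX` is assumed because the diff shows only its name, the script's two loops are merged into one pass, and the password check here is case-insensitive whereas the diff matches the lowercase substring only.

```python
PF_ENV_PREFIX = "PULSAR_PREFIX_"  # assumed value; the diff shows only the name


def apply_env(lines, env):
    """Return (new_lines, log_messages) after applying env overrides."""
    lines = list(lines)
    keys = {}
    log = []

    # Index every "key=value" line; pass everything else through untouched.
    for idx, raw in enumerate(lines):
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if "=" not in line:
            log.append("Skipping unparsable line: %s" % line)
            continue
        key, _ = line.split("=", 1)
        keys[key] = idx

    for name in sorted(env):
        value = env[name].strip()
        # Re-quote values with spaces so the shell treats them as one word.
        if " " in value:
            value = '"%s"' % value
        # Never echo secrets into the log.
        shown = "********" if "password" in name.lower() else value

        prefixed = name.startswith(PF_ENV_PREFIX)
        key = name[len(PF_ENV_PREFIX):] if prefixed else name
        if key in keys:
            lines[keys[key]] = "%s=%s\n" % (key, value)
            log.append("Applying config %s = %s" % (key, shown))
        elif prefixed:
            # Only prefixed variables may introduce new keys.
            keys[key] = len(lines)
            lines.append("%s=%s\n" % (key, value))
            log.append("Adding config %s = %s" % (key, shown))
    return lines, log
```

Keeping the logic free of file I/O makes it straightforward to check each rule with plain lists and dictionaries before wiring it back to `open()` and `os.environ`.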


[pulsar] 07/07: [Doc]--Update doc for multiple advertised listeners (#8789)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c00aac7838a17ac12c91a48989054ae6930085a2
Author: HuanliMeng <48...@users.noreply.github.com>
AuthorDate: Thu Dec 3 11:06:25 2020 +0800

    [Doc]--Update doc for multiple advertised listeners (#8789)
    
    Fixes #1736
    
    
    ### Motivation
    
    
    Multiple advertised listeners were introduced in Release 2.6.0, but the docs were not added or updated accordingly.
    
    ### Modifications
    
    1: Releases 2.6.1 and 2.6.2
    Update Reference > Pulsar configuration > Broker: add `advertisedListeners` and `internalListenerName` config options
    
    **Note: these 2 config options have already been added to master and Release 2.6.0 by another TW.**
    
    2: Release Master:
    
    Concepts and Architecture: Add a new doc "Multiple advertised listeners".
    sidebar: add the info of the new doc.
    
    **Note: After the new doc is reviewed and approved, will add it to releases 2.6.0, 2.6.1 and 2.6.2.**
    
    
    (cherry picked from commit 42b6d9844126803cfc14a9ff4733d91bffae1948)
---
 .../docs/concepts-multiple-advertised-listeners.md | 38 ++++++++++++++++++++++
 site2/website/sidebars.json                        |  3 +-
 .../version-2.6.1/reference-configuration.md       |  2 ++
 .../version-2.6.2/reference-configuration.md       |  2 ++
 4 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/site2/docs/concepts-multiple-advertised-listeners.md b/site2/docs/concepts-multiple-advertised-listeners.md
new file mode 100644
index 0000000..8ba0db9
--- /dev/null
+++ b/site2/docs/concepts-multiple-advertised-listeners.md
@@ -0,0 +1,38 @@
+---
+id: concepts-multiple-advertised-listeners
+title: Multiple advertised listeners
+sidebar_label: Multiple advertised listeners
+---
+
+When a Pulsar cluster is deployed in a production environment, you may need to expose multiple advertised addresses for the broker. For example, when you deploy a Pulsar cluster in Kubernetes and want clients outside that Kubernetes cluster to connect to it, you need to assign a broker URL to the external clients, while clients in the same Kubernetes cluster can still connect through the internal Kubernetes network.
+
+## Advertised listeners
+
+To ensure that clients in both internal and external networks can connect to a Pulsar cluster, Pulsar introduces the `advertisedListeners` and `internalListenerName` configuration options in the [broker configuration file](reference-configuration.md#broker). These options let the broker expose multiple advertised listeners and separate internal and external network traffic.
+
+- The `advertisedListeners` option specifies multiple advertised listeners. The broker uses the listener as the broker identifier in the load manager and the bundle owner data. The value is formatted as `<listener_name>:pulsar://<host>:<port>, <listener_name>:pulsar+ssl://<host>:<port>`. For example, you can set `advertisedListeners` like
+`advertisedListeners=internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651`.
+
+- The `internalListenerName` option specifies the internal service URL that the broker uses. Set it to the name of one of the `advertisedListeners`. If `internalListenerName` is absent, the broker uses the listener name of the first advertised listener.
+
+After setting up the `advertisedListeners`, clients can choose one of the listeners as the service URL to create a connection to the broker as long as the network is accessible. However, if the client creates producers or consumers on a topic, the client must send a lookup request to the broker to get the owner broker, and then connect to the owner broker to publish or consume messages. Therefore, you must allow the client to get the corresponding service URL with the same adve [...]
+
+## Use multiple advertised listeners
+
+This example shows how a Pulsar client uses multiple advertised listeners.
+
+1. Configure multiple advertised listeners in the broker configuration file.
+
+```shell
+advertisedListeners={listenerName}:pulsar://xxxx:6650,
+{listenerName}:pulsar+ssl://xxxx:6651
+```
+
+2. Specify the listener name for the client.
+
+```java
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://xxxx:6650")
+    .listenerName("external")
+    .build();
+```
\ No newline at end of file
diff --git a/site2/website/sidebars.json b/site2/website/sidebars.json
index 0a446da..d75e4f5 100644
--- a/site2/website/sidebars.json
+++ b/site2/website/sidebars.json
@@ -16,7 +16,8 @@
       "concepts-multi-tenancy",
       "concepts-authentication",
       "concepts-topic-compaction",
-      "concepts-proxy-sni-routing"
+      "concepts-proxy-sni-routing",
+      "concepts-multiple-advertised-listeners"
     ],
     "Pulsar Schema": [
       "schema-get-started",
diff --git a/site2/website/versioned_docs/version-2.6.1/reference-configuration.md b/site2/website/versioned_docs/version-2.6.1/reference-configuration.md
index c0179c8..9ac5cfb 100644
--- a/site2/website/versioned_docs/version-2.6.1/reference-configuration.md
+++ b/site2/website/versioned_docs/version-2.6.1/reference-configuration.md
@@ -107,6 +107,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 
 |Name|Description|Default|
 |---|---|---|
+|advertisedListeners|Specify multiple advertised listeners for the broker.<br><br>The format is `<listener_name>:pulsar://<host>:<port>`.<br><br>If there are multiple listeners, separate them with commas.<br><br>**Note**: do not use this configuration with `advertisedAddress` and `brokerServicePort`. If the value of this configuration is empty, the broker uses `advertisedAddress` and `brokerServicePort`.|/|
+|internalListenerName|Specify the internal listener name for the broker.<br><br>**Note**: the listener name must be contained in `advertisedListeners`.<br><br> If the value of this configuration is empty, the broker uses the first listener as the internal listener.|/|
 |enablePersistentTopics|  Whether persistent topics are enabled on the broker |true|
 |enableNonPersistentTopics| Whether non-persistent topics are enabled on the broker |true|
 |functionsWorkerEnabled|  Whether the Pulsar Functions worker service is enabled in the broker  |false|
diff --git a/site2/website/versioned_docs/version-2.6.2/reference-configuration.md b/site2/website/versioned_docs/version-2.6.2/reference-configuration.md
index 32ec853..c186a24 100644
--- a/site2/website/versioned_docs/version-2.6.2/reference-configuration.md
+++ b/site2/website/versioned_docs/version-2.6.2/reference-configuration.md
@@ -106,6 +106,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 
 |Name|Description|Default|
 |---|---|---|
+|advertisedListeners|Specify multiple advertised listeners for the broker.<br><br>The format is `<listener_name>:pulsar://<host>:<port>`.<br><br>If there are multiple listeners, separate them with commas.<br><br>**Note**: do not use this configuration with `advertisedAddress` and `brokerServicePort`. If the value of this configuration is empty, the broker uses `advertisedAddress` and `brokerServicePort`.|/|
+|internalListenerName|Specify the internal listener name for the broker.<br><br>**Note**: the listener name must be contained in `advertisedListeners`.<br><br> If the value of this configuration is empty, the broker uses the first listener as the internal listener.|/|
 |enablePersistentTopics|  Whether persistent topics are enabled on the broker |true|
 |enableNonPersistentTopics| Whether non-persistent topics are enabled on the broker |true|
 |functionsWorkerEnabled|  Whether the Pulsar Functions worker service is enabled in the broker  |false|
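
The two options documented in the tables above can be illustrated with a small parsing sketch. It is not the broker's implementation: the function names `parse_advertised_listeners` and `internal_listener_name` are hypothetical, and the sketch assumes only what the docs state, namely comma-separated `<listener_name>:pulsar://<host>:<port>` entries, with the first listener used as the internal one when `internalListenerName` is empty.

```python
from urllib.parse import urlparse


def parse_advertised_listeners(value):
    """Parse '<name>:pulsar://<host>:<port>,...' into {name: [url, ...]}."""
    listeners = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # The listener name is everything before the first colon.
        name, _, url = entry.partition(":")
        parsed = urlparse(url)
        if not name or parsed.scheme not in ("pulsar", "pulsar+ssl") or parsed.port is None:
            raise ValueError("malformed listener entry: %r" % entry)
        listeners.setdefault(name, []).append(url)
    return listeners


def internal_listener_name(listeners, configured=None):
    """Pick the internal listener: the configured name, else the first listener."""
    if not listeners:
        raise ValueError("no advertised listeners configured")
    if configured:
        if configured not in listeners:
            raise ValueError("%r is not in advertisedListeners" % configured)
        return configured
    # Dicts preserve insertion order, so this is the first listener parsed.
    return next(iter(listeners))


parsed = parse_advertised_listeners(
    "internal:pulsar://192.168.1.11:6660,internal:pulsar+ssl://192.168.1.11:6651")
print(parsed)
print(internal_listener_name(parsed))
```

Grouping URLs by listener name reflects that one listener can carry both a `pulsar://` and a `pulsar+ssl://` address, as in the example from the new doc.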