Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 02:18:32 UTC

[pulsar] branch branch-2.9 updated (8e68926 -> ac932d5)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 8e68926  add missed import (#13037)
     new ea79a8b  fix(functions): missing runtime set in GoInstanceConfig (#13031)
     new dfcdacf  Don't attempt to delete pending ack store unless transactions are enabled (#13041)
     new 76b424f  pulsar admin exposes secret for source and sink (#13059)
     new 112d9ee  [ElasticSearch Sink] Correct @FieldDoc defaultValue for some fields: primaryFields,maxRetries,indexNumberOfReplicas,createIndexIfNeeded (#12697)
     new c5da572  [Transaction] Add a check for uninitialized PendingAck (#13088)
     new 6dea985  Fix flaky test BrokerServiceLookupTest.testModularLoadManagerSplitBundle (#13159)
     new 0e0de02  [Issue #12486][Python Client]JsonSchema encoding is not idempotent (#12490)
     new b0d7960  [Proxy] Fix issue when Proxy fails to start and logs about an uncaught exception (#13171)
     new 7cce3f9  Update cursor last active timestamp when reseting cursor  (#13166)
     new 8d8da1b  Fix in macOS cmake might find error boost-python libs path (#13193)
     new 95c2ab3  [Transaction] Fix transaction sequenceId generate error. (#13209)
     new 2457ae9  [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)
     new f89de87  Use current resourceUsage value as historyUsage when leader change in ThresholdShedder (#13136)
     new 9daba91  Fix when deleting topic with NotFoundException, do not return to client (#13203)
     new d9da677  Fix MessagePayloadContextImpl not recycled (#13233)
     new ac932d5  [Transaction] Fix generate transactionId some comment. (#13234)

The 16 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  14 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  42 ++
 .../apache/pulsar/broker/admin/AdminResource.java  |   8 +
 .../broker/admin/impl/PersistentTopicsBase.java    |   5 +-
 .../intercept/ManagedLedgerInterceptorImpl.java    |   4 +-
 .../broker/loadbalance/impl/ThresholdShedder.java  |  22 +-
 .../service/persistent/PersistentSubscription.java |   4 +
 .../broker/service/persistent/PersistentTopic.java |  43 +-
 .../stats/prometheus/TransactionAggregator.java    |   4 +-
 .../transaction/pendingack/PendingAckHandle.java   |   5 +
 .../pendingack/impl/PendingAckHandleDisabled.java  |   5 +
 .../pendingack/impl/PendingAckHandleImpl.java      |   7 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  32 +
 .../broker/stats/TransactionMetricsTest.java       |  68 ++
 .../pulsar/broker/transaction/TransactionTest.java |  13 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java |  12 +-
 pulsar-client-cpp/python/CMakeLists.txt            |  17 +-
 pulsar-client-cpp/python/pulsar/schema/schema.py   |  15 +-
 pulsar-client-cpp/python/pulsar_test.py            | 787 ++++++++++-----------
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  12 +
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  13 +
 .../client/impl/MessagePayloadContextImpl.java     |   1 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |   3 +
 .../pulsar/functions/runtime/RuntimeUtilsTest.java |   3 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |   2 +-
 .../io/elasticsearch/ElasticSearchConfig.java      |   8 +-
 .../pulsar/proxy/server/AdminProxyHandler.java     |   7 +-
 .../pulsar/proxy/server/ProxyServiceStarter.java   |   1 +
 .../coordinator/impl/MLTransactionLogImpl.java     |  41 --
 .../impl/MLTransactionLogInterceptor.java          |  63 --
 .../impl/MLTransactionMetadataStore.java           |  37 +-
 .../impl/MLTransactionMetadataStoreProvider.java   |   6 +-
 .../impl/MLTransactionSequenceIdGenerator.java     | 111 +++
 .../MLTransactionMetadataStoreTest.java            |  71 +-
 35 files changed, 870 insertions(+), 618 deletions(-)
 delete mode 100644 pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
 create mode 100644 pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java

[pulsar] 06/16: Fix flaky test BrokerServiceLookupTest.testModularLoadManagerSplitBundle (#13159)


penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6dea985c58b11d806e2a1f8659ae84bce5550932
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Tue Dec 7 21:31:10 2021 +0800

    Fix flaky test BrokerServiceLookupTest.testModularLoadManagerSplitBundle (#13159)
    
    Co-authored-by: wuzhanpeng <wu...@bigo.sg>
    (cherry picked from commit 4b319f38256d586bf179ac8df9f401709b128b15)
---
 .../apache/pulsar/client/api/BrokerServiceLookupTest.java    | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 251ccd3..6628cf6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -654,15 +654,18 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
             conf2.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
             conf2.setZookeeperServers("localhost:2181");
             conf2.setConfigurationStoreServers("localhost:3181");
-
-            @Cleanup
-            PulsarService pulsar2 = startBroker(conf2);
+            conf2.setLoadBalancerAutoBundleSplitEnabled(true);
+            conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true);
+            conf2.setLoadBalancerNamespaceBundleMaxTopics(1);
 
             // configure broker-1 with ModularLoadManager
             stopBroker();
             conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
             startBroker();
 
+            @Cleanup
+            PulsarService pulsar2 = startBroker(conf2);
+
             pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
             pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();
 
@@ -732,9 +735,6 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
                     .getLoadManager().get()).getLoadManager();
 
             updateAllMethod.invoke(loadManager);
-            conf2.setLoadBalancerAutoBundleSplitEnabled(true);
-            conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true);
-            conf2.setLoadBalancerNamespaceBundleMaxTopics(1);
             loadManager.checkNamespaceBundleSplit();
 
             // (6) Broker-2 should get the watch and update bundle cache
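The fix above is purely an ordering change: the three load-balancer settings are moved so they are applied to conf2 before startBroker(conf2), because a broker reads its configuration when it starts, and mutating the configuration object afterwards has no effect on the already-running service. A minimal sketch of that snapshot-at-startup behavior (Config and Service are hypothetical stand-ins, not Pulsar's actual classes):

```java
// A service that snapshots a configuration value at construction time,
// mirroring why the test's conf2 settings had to precede startBroker(conf2).
class Config {
    int bundleMaxTopics = 1000;
}

class Service {
    private final int bundleMaxTopics; // captured once, at startup

    Service(Config conf) {
        this.bundleMaxTopics = conf.bundleMaxTopics;
    }

    int effectiveBundleMaxTopics() {
        return bundleMaxTopics;
    }
}
```

Setting conf.bundleMaxTopics = 1 after constructing the Service is a no-op from the service's point of view, which is why the split-bundle test only became deterministic once the settings were moved ahead of the second broker's start.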

[pulsar] 15/16: Fix MessagePayloadContextImpl not recycled (#13233)


penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d9da67710a4cc623cc080244ca91fceef3ea426a
Author: Yunze Xu <xy...@163.com>
AuthorDate: Sat Dec 11 00:09:19 2021 +0800

    Fix MessagePayloadContextImpl not recycled (#13233)
    
    (cherry picked from commit 0ce155ea3420306a5f9c9d3ff22f4c0d92e5ec71)
---
 .../java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
index aa6cab8..f219003 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagePayloadContextImpl.java
@@ -88,6 +88,7 @@ public class MessagePayloadContextImpl implements MessagePayloadContext {
             ackBitSet.recycle();
             ackBitSet = null;
         }
+        recyclerHandle.recycle(this);
     }
 
     @Override
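The one-line fix returns the context object to its pool when release() completes; without the recyclerHandle.recycle(this) call, pooled instances are never reclaimed and every message allocates a fresh context. The Pulsar client uses Netty's io.netty.util.Recycler for this; the sketch below uses a simplified hand-rolled pool to show the same pattern (PooledContext is an illustrative stand-in, not the real class):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified object pool illustrating the recycle-on-release pattern behind
// Netty's Recycler. PooledContext stands in for MessagePayloadContextImpl.
class PooledContext {
    private static final Deque<PooledContext> POOL = new ArrayDeque<>();

    static PooledContext get() {
        PooledContext ctx = POOL.pollFirst();
        return ctx != null ? ctx : new PooledContext();
    }

    void recycle() {
        // Reset per-use state here, then return the instance to the pool.
        // Omitting this step (the bug fixed above) means the pool never
        // refills and every get() allocates a new object.
        POOL.offerFirst(this);
    }
}
```

With the recycle call in place, a released instance is handed back out on the next get() instead of being garbage.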

[pulsar] 04/16: [ElasticSearch Sink] Correct @FieldDoc defaultValue for some fields: primaryFields, maxRetries, indexNumberOfReplicas, createIndexIfNeeded (#12697)


penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 112d9ee8ab0563816bb5775b2ca7ccd3cc3b8bf5
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Thu Dec 2 16:27:33 2021 +0100

    [ElasticSearch Sink] Correct @FieldDoc defaultValue for some fields: primaryFields,maxRetries,indexNumberOfReplicas,createIndexIfNeeded (#12697)
    
    (cherry picked from commit 5baa2e0e16c0487cf2f160a739f5cb18dc420fe6)
---
 .../org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 7dbfd03..dc6d0d4 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -79,14 +79,14 @@ public class ElasticSearchConfig implements Serializable {
 
     @FieldDoc(
             required = false,
-            defaultValue = "true",
+            defaultValue = "false",
             help = "Create the index if it does not exist"
     )
     private boolean createIndexIfNeeded = false;
 
     @FieldDoc(
         required = false,
-        defaultValue = "1",
+        defaultValue = "0",
         help = "The number of replicas of the index"
     )
     private int indexNumberOfReplicas = 0;
@@ -109,7 +109,7 @@ public class ElasticSearchConfig implements Serializable {
 
     @FieldDoc(
             required = false,
-            defaultValue = "-1",
+            defaultValue = "1",
             help = "The maximum number of retries for elasticsearch requests. Use -1 to disable it."
     )
     private int maxRetries = 1;
@@ -216,7 +216,7 @@ public class ElasticSearchConfig implements Serializable {
 
     @FieldDoc(
             required = false,
-            defaultValue = "id",
+            defaultValue = "",
             help = "The comma separated ordered list of field names used to build the Elasticsearch document _id from the record value. If this list is a singleton, the field is converted as a string. If this list has 2 or more fields, the generated _id is a string representation of a JSON array of the field values."
     )
     private String primaryFields = "";
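Each of the four corrections above aligns a @FieldDoc defaultValue string with the field's actual Java initializer. This class of documentation drift is easy to catch with a reflection check; the sketch below uses a hypothetical @Doc annotation as a stand-in for Pulsar's @FieldDoc:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Hypothetical stand-in for Pulsar's @FieldDoc annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Doc {
    String defaultValue();
}

class SinkConfig {
    @Doc(defaultValue = "false")
    boolean createIndexIfNeeded = false;

    @Doc(defaultValue = "1")
    int maxRetries = 1;

    // Verify every documented default matches the field's real initializer
    // on a freshly constructed instance.
    static boolean defaultsInSync() throws Exception {
        SinkConfig fresh = new SinkConfig();
        for (Field f : SinkConfig.class.getDeclaredFields()) {
            Doc doc = f.getAnnotation(Doc.class);
            if (doc != null && !doc.defaultValue().equals(String.valueOf(f.get(fresh)))) {
                return false;
            }
        }
        return true;
    }
}
```

A check like this, run as a unit test over the real config class, would have flagged all four mismatched defaults before release.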

[pulsar] 11/16: [Transaction] Fix transaction sequenceId generate error. (#13209)


penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 95c2ab34677c2b3a4418242d504b734b8c91967b
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Dec 10 16:41:49 2021 +0800

    [Transaction] Fix transaction sequenceId generate error. (#13209)
    
    (cherry picked from commit 0994254c24be4aec8fef810bcdbf62e06453fe53)
---
 .../pulsar/broker/transaction/TransactionTest.java |  9 ++-
 .../coordinator/impl/MLTransactionLogImpl.java     | 41 ------------
 .../impl/MLTransactionLogInterceptor.java          | 63 +++++++++++++++----
 .../impl/MLTransactionMetadataStore.java           | 24 ++++----
 .../impl/MLTransactionMetadataStoreProvider.java   |  6 +-
 .../MLTransactionMetadataStoreTest.java            | 72 +++++++++++++++-------
 6 files changed, 125 insertions(+), 90 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index e4975d9..07af304 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -521,7 +521,8 @@ public class TransactionTest extends TransactionTestBase {
                     null);
             return null;
         }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
-
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog =
                 new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
                         persistentTopic.getManagedLedger().getConfig());
@@ -540,7 +541,8 @@ public class TransactionTest extends TransactionTestBase {
         doNothing().when(timeoutTracker).start();
         MLTransactionMetadataStore metadataStore1 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, transactionRecoverTracker);
+                        mlTransactionLog, timeoutTracker, transactionRecoverTracker,
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
@@ -553,7 +555,8 @@ public class TransactionTest extends TransactionTestBase {
 
         MLTransactionMetadataStore metadataStore2 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, transactionRecoverTracker);
+                        mlTransactionLog, timeoutTracker, transactionRecoverTracker,
+                        mlTransactionLogInterceptor.getSequenceId());
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 2d11d98..e154bb8 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -68,15 +68,11 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     private final TopicName topicName;
 
-    private final MLTransactionLogInterceptor mlTransactionLogInterceptor;
-
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
                                 ManagedLedgerConfig managedLedgerConfig) {
         this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
-        this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
         this.managedLedgerFactory = managedLedgerFactory;
         this.managedLedgerConfig = managedLedgerConfig;
         this.entryQueue = new SpscArrayQueue<>(2000);
@@ -161,7 +157,6 @@ public class MLTransactionLogImpl implements TransactionLog {
             @Override
             public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                 buf.release();
-                mlTransactionLogInterceptor.setMaxLocalTxnId(transactionMetadataEntry.getMaxLocalTxnId());
                 completableFuture.complete(position);
             }
 
@@ -242,42 +237,6 @@ public class MLTransactionLogImpl implements TransactionLog {
         }
     }
 
-    public CompletableFuture<Long> getMaxLocalTxnId() {
-
-        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
-        PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();
-
-        if (position != null && position.getEntryId() != -1
-                && ((ManagedLedgerImpl) managedLedger).ledgerExists(position.getLedgerId())) {
-            ((ManagedLedgerImpl) this.managedLedger).asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
-                @Override
-                public void readEntryComplete(Entry entry, Object ctx) {
-                    TransactionMetadataEntry lastConfirmEntry = new TransactionMetadataEntry();
-                    ByteBuf buffer = entry.getDataBuffer();
-                    lastConfirmEntry.parseFrom(buffer, buffer.readableBytes());
-                    completableFuture.complete(lastConfirmEntry.getMaxLocalTxnId());
-                }
-
-                @Override
-                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                    log.error("[{}] MLTransactionLog recover MaxLocalTxnId fail!", topicName, exception);
-                    completableFuture.completeExceptionally(exception);
-                }
-            }, null);
-        } else if (managedLedger.getProperties()
-                .get(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID) != null) {
-            completableFuture.complete(Long.parseLong(managedLedger.getProperties()
-                    .get(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID)));
-        } else {
-            log.error("[{}] MLTransactionLog recover MaxLocalTxnId fail! "
-                    + "not found MaxLocalTxnId in managedLedger and properties", topicName);
-            completableFuture.completeExceptionally(new ManagedLedgerException(topicName
-                    + "MLTransactionLog recover MaxLocalTxnId fail! "
-                    + "not found MaxLocalTxnId in managedLedger and properties"));
-        }
-        return completableFuture;
-    }
-
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
index e97b104..68add4a 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
@@ -18,14 +18,18 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.buffer.ByteBuf;
+import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
 import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
+import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover transaction log.
@@ -33,31 +37,68 @@ import java.util.concurrent.CompletableFuture;
 public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
 
     private static final Logger log = LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-
-    private volatile long maxLocalTxnId = -1;
+    @Getter
+    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
 
     @Override
     public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
-        return null;
+        return op;
     }
 
+    // When all of ledger have been deleted, we will generate sequenceId from managedLedger properties
     @Override
     public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
+        if (propertiesMap == null || propertiesMap.size() == 0) {
+            return;
+        }
 
+        if (propertiesMap.containsKey(MAX_LOCAL_TXN_ID)) {
+            sequenceId.set(Long.parseLong(propertiesMap.get(MAX_LOCAL_TXN_ID)));
+        }
     }
 
+    // When we don't roll over ledger, we can init sequenceId from the getLastAddConfirmed transaction metadata entry
     @Override
-    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle) {
-        return CompletableFuture.completedFuture(null);
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        if (lh.getLastAddConfirmed() >= 0) {
+            lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                TransactionMetadataEntry lastConfirmEntry = new TransactionMetadataEntry();
+                                ByteBuf buffer = ledgerEntry.getEntryBuffer();
+                                lastConfirmEntry.parseFrom(buffer, buffer.readableBytes());
+                                this.sequenceId.set(lastConfirmEntry.getMaxLocalTxnId());
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the tc sequenceId from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
+                        }
+                    } else {
+                        promise.complete(null);
+                    }
+                }
+            });
+        } else {
+            promise.complete(null);
+        }
+        return promise;
     }
 
+    // roll over ledger will update sequenceId to managedLedger properties
     @Override
     public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
-        propertiesMap.put(MAX_LOCAL_TXN_ID, maxLocalTxnId + "");
-    }
-
-    protected void setMaxLocalTxnId(long maxLocalTxnId) {
-        this.maxLocalTxnId = maxLocalTxnId;
+        propertiesMap.put(MAX_LOCAL_TXN_ID, sequenceId.get() + "");
     }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 05faaad..6ef4f17 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -60,9 +60,8 @@ public class MLTransactionMetadataStore
     private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStore.class);
 
     private final TransactionCoordinatorID tcID;
-    private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
+    private final AtomicLong sequenceId;
     private final MLTransactionLogImpl transactionLog;
-    private static final long TC_ID_NOT_USED = -1L;
     private final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();
     private final TransactionTimeoutTracker timeoutTracker;
     private final TransactionMetadataStoreStats transactionMetadataStoreStats;
@@ -75,8 +74,10 @@ public class MLTransactionMetadataStore
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
                                       TransactionTimeoutTracker timeoutTracker,
-                                      TransactionRecoverTracker recoverTracker) {
+                                      TransactionRecoverTracker recoverTracker,
+                                      AtomicLong sequenceId) {
         super(State.None);
+        this.sequenceId = sequenceId;
         this.tcID = tcID;
         this.transactionLog = mlTransactionLog;
         this.timeoutTracker = timeoutTracker;
@@ -96,16 +97,13 @@ public class MLTransactionMetadataStore
 
             @Override
             public void replayComplete() {
-                mlTransactionLog.getMaxLocalTxnId().thenAccept(id -> {
-                    recoverTracker.appendOpenTransactionToTimeoutTracker();
-                    sequenceId.set(id);
-                    if (!changeToReadyState()) {
-                        log.error("Managed ledger transaction metadata store change state error when replay complete");
-                    } else {
-                        recoverTracker.handleCommittingAndAbortingTransaction();
-                        timeoutTracker.start();
-                    }
-                });
+                recoverTracker.appendOpenTransactionToTimeoutTracker();
+                if (!changeToReadyState()) {
+                    log.error("Managed ledger transaction metadata store change state error when replay complete");
+                } else {
+                    recoverTracker.handleCommittingAndAbortingTransaction();
+                    timeoutTracker.start();
+                }
             }
 
             @Override
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 36b1958..3f20cbc 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -45,10 +45,14 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
                                                                  TransactionRecoverTracker recoverTracker) {
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(new MLTransactionLogInterceptor());
         MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
                 managedLedgerFactory, managedLedgerConfig);
 
+        // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
         return txnLog.initialize().thenApply(__ ->
-                new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, recoverTracker));
+                new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
+                        recoverTracker, mlTransactionLogInterceptor.getSequenceId()));
     }
 }
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index e9b3e0c..03aa1be 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
@@ -41,6 +42,7 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -65,12 +67,16 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
         int checkReplayRetryCount = 0;
         while (true) {
             checkReplayRetryCount++;
@@ -122,13 +128,13 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         }
     }
 
-    @DataProvider(name = "isUseManagedLedger")
+    @DataProvider(name = "isUseManagedLedgerProperties")
     public Object[][] versions() {
         return new Object[][] { { true }, { false } };
     }
 
-    @Test(dataProvider = "isUseManagedLedger")
-    public void testRecoverSequenceId(boolean isUseManagedLedger) throws Exception {
+    @Test(dataProvider = "isUseManagedLedgerProperties")
+    public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws Exception {
         ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
         factoryConf.setMaxCacheSize(0);
 
@@ -136,18 +142,21 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         managedLedgerConfig.setMaxEntriesPerLedger(3);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
         transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTING, TxnStatus.OPEN, false).get();
-        if (isUseManagedLedger) {
+        if (isUseManagedLedgerProperties) {
             transactionMetadataStore.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get();
         }
         assertEquals(txnID.getLeastSigBits(), 0);
@@ -155,16 +164,20 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         field.setAccessible(true);
         ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog);
         Position position = managedLedger.getLastConfirmedEntry();
-
-        if (isUseManagedLedger) {
+        if (isUseManagedLedgerProperties) {
             Awaitility.await().until(() -> {
                 managedLedger.rollCurrentLedgerIfFull();
                 return !managedLedger.ledgerExists(position.getLedgerId());
             });
         }
+        mlTransactionLog.closeAsync().get();
+        mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                managedLedgerConfig);
+        mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         txnID = transactionMetadataStore.newTransaction(100000).get();
@@ -181,12 +194,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -224,11 +240,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 transactionMetadataStore.closeAsync();
 
                 MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                        new ManagedLedgerConfig());
+                        managedLedgerConfig);
                 txnLog2.initialize().join();
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
-                                txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                                txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                                mlTransactionLogInterceptor.getSequenceId());
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -288,12 +305,16 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -351,12 +372,16 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -370,11 +395,12 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore.updateTxnStatus(txnID2, TxnStatus.ABORTED, TxnStatus.ABORTING, false).get();
 
         mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
     }
@@ -387,12 +413,16 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                new ManagedLedgerConfig());
+                managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        mlTransactionLogInterceptor.getSequenceId());
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         transactionMetadataStore.newTransaction(5000).get();

[pulsar] 03/16: pulsar admin exposes secret for source and sink (#13059)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 76b424f35ee8ff390608b366e49e64dc79a82112
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Wed Dec 1 18:33:56 2021 -0800

    pulsar admin exposes secret for source and sink (#13059)
    
    ### Motivation
    Follow-up fix of #12950 for #12834
    
    It turns out the Source and Sink commands don't inherit from the Function command, so we need to add the API separately.
    
    ### Modifications
    
    Add the `--secrets` argument to the `pulsar-admin [source|sink] create/update/localrun` commands.
    
    (cherry picked from commit e888c2980f61428650779a8d23fe707bb61a31a1)
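
    The same parse-with-fallback pattern is applied in both CmdSinks and CmdSources: a null result from JSON deserialization is replaced by an empty map so downstream validation never sees null, while an absent flag leaves the config untouched. A minimal stand-alone sketch of that pattern (the patch uses Gson for the actual JSON parsing; `parseSecrets` below is a hypothetical stand-in, not the real deserializer):

    ```java
    import java.util.Collections;
    import java.util.Map;

    public class SecretsFallback {

        // Hypothetical stand-in for Gson's fromJson(secretsString, mapType):
        // like Gson, it yields null for "null" or empty input.
        static Map<String, Object> parseSecrets(String json) {
            if (json.isEmpty() || json.equals("null")) {
                return null;
            }
            return Collections.singletonMap("raw", (Object) json);
        }

        // Mirrors the patch: a missing --secrets flag leaves the config
        // untouched; a present flag that parses to null becomes an empty map.
        static Map<String, Object> secretsOrEmpty(String secretsString) {
            if (secretsString == null) {
                return null; // flag not given: do not touch the sink/source config
            }
            Map<String, Object> secretsMap = parseSecrets(secretsString);
            if (secretsMap == null) {
                secretsMap = Collections.emptyMap();
            }
            return secretsMap;
        }

        public static void main(String[] args) {
            System.out.println(secretsOrEmpty(null));        // null
            System.out.println(secretsOrEmpty("null"));      // {}
            System.out.println(secretsOrEmpty("{\"k\":1}")); // {raw={"k":1}}
        }
    }
    ```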
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java | 12 ++++++++++++
 .../main/java/org/apache/pulsar/admin/cli/CmdSources.java   | 13 +++++++++++++
 2 files changed, 25 insertions(+)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 0035ff4..5d00627 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -39,6 +39,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -359,6 +360,8 @@ public class CmdSinks extends CmdBase {
         protected Long negativeAckRedeliveryDelayMs;
         @Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
         protected String customRuntimeOptions;
+        @Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates how the secret is fetched by the underlying secrets provider")
+        protected String secretsString;
 
         protected SinkConfig sinkConfig;
 
@@ -524,6 +527,15 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setCustomRuntimeOptions(customRuntimeOptions);
             }
 
+            if (secretsString != null) {
+                Type type = new TypeToken<Map<String, Object>>() {}.getType();
+                Map<String, Object> secretsMap = new Gson().fromJson(secretsString, type);
+                if (secretsMap == null) {
+                    secretsMap = Collections.emptyMap();
+                }
+                sinkConfig.setSecrets(secretsMap);
+            }
+
             // check if configs are valid
             validateSinkConfigs(sinkConfig);
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index f78feb0..1eedf65 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -38,6 +38,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -339,6 +340,8 @@ public class CmdSources extends CmdBase {
         protected String batchSourceConfigString;
         @Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
         protected String customRuntimeOptions;
+        @Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates how the secret is fetched by the underlying secrets provider")
+        protected String secretsString;
 
         protected SourceConfig sourceConfig;
 
@@ -463,6 +466,16 @@ public class CmdSources extends CmdBase {
             if (customRuntimeOptions != null) {
                 sourceConfig.setCustomRuntimeOptions(customRuntimeOptions);
             }
+
+            if (secretsString != null) {
+                Type type = new TypeToken<Map<String, Object>>() {}.getType();
+                Map<String, Object> secretsMap = new Gson().fromJson(secretsString, type);
+                if (secretsMap == null) {
+                    secretsMap = Collections.emptyMap();
+                }
+                sourceConfig.setSecrets(secretsMap);
+            }
+            
             // check if source configs are valid
             validateSourceConfigs(sourceConfig);
         }

[pulsar] 01/16: fix(functions): missing runtime set in GoInstanceConfig (#13031)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea79a8b75218bd6176b25b8197016a11a3f54351
Author: Eric Shen <er...@outlook.com>
AuthorDate: Tue Nov 30 23:29:30 2021 -0600

    fix(functions): missing runtime set in GoInstanceConfig (#13031)
    
    * fix(functions): missing runtime set in GoInstanceConfig
    
    Signed-off-by: Eric Shen <er...@outlook.com>
    
    * fix ci ut
    
    Signed-off-by: Eric Shen <er...@outlook.com>
    
    * fix test ci
    
    Signed-off-by: Eric Shen <er...@outlook.com>
    
    * rollback some change in function-go
    
    Signed-off-by: Eric Shen <er...@outlook.com>
    (cherry picked from commit aa992e843581b65c854a0f97353f68ab0170b576)
---
 .../main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java    | 3 +++
 .../java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java     | 3 ++-
 .../pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java     | 2 +-
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 4acbd35..4e4e2dc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -173,6 +173,9 @@ public class RuntimeUtils {
         if (instanceConfig.getFunctionDetails().getProcessingGuarantees() != null) {
             goInstanceConfig.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
         }
+        if (instanceConfig.getFunctionDetails().getRuntime() != null) {
+            goInstanceConfig.setRuntime(instanceConfig.getFunctionDetails().getRuntimeValue());
+        }
         if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
             goInstanceConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
         }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index f8bbbc4..bc00776 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -99,6 +99,7 @@ public class RuntimeUtilsTest {
                 .setName("go-func")
                 .setLogTopic("go-func-log")
                 .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE)
+                .setRuntime(Function.FunctionDetails.Runtime.GO)
                 .setSecretsMap(secretsMap.toJSONString())
                 .setParallelism(1)
                 .setSource(sources)
@@ -137,7 +138,7 @@ public class RuntimeUtilsTest {
         Assert.assertEquals(goInstanceConfig.get("autoAck"), true);
         Assert.assertEquals(goInstanceConfig.get("regexPatternSubscription"), false);
         Assert.assertEquals(goInstanceConfig.get("pulsarServiceURL"), "pulsar://localhost:6650");
-        Assert.assertEquals(goInstanceConfig.get("runtime"), 0);
+        Assert.assertEquals(goInstanceConfig.get("runtime"), 3);
         Assert.assertEquals(goInstanceConfig.get("cpu"), 2.0);
         Assert.assertEquals(goInstanceConfig.get("funcID"), "func-7734");
         Assert.assertEquals(goInstanceConfig.get("funcVersion"), "1.0.0");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 7f6c36a..b295cf8 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -876,7 +876,7 @@ public class KubernetesRuntimeTest {
         assertEquals(goInstanceConfig.get("autoAck"), false);
         assertEquals(goInstanceConfig.get("regexPatternSubscription"), false);
         assertEquals(goInstanceConfig.get("pulsarServiceURL"), pulsarServiceUrl);
-        assertEquals(goInstanceConfig.get("runtime"), 0);
+        assertEquals(goInstanceConfig.get("runtime"), 3);
         assertEquals(goInstanceConfig.get("cpu"), 1.0);
         assertEquals(goInstanceConfig.get("funcVersion"), "1.0");
         assertEquals(goInstanceConfig.get("disk"), 10000);

[pulsar] 10/16: Fix in macOS cmake might find error boost-python libs path (#13193)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8d8da1bbf69723ddde7044db80ebeed0f79b5901
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Fri Dec 10 01:25:05 2021 +0800

    Fix in macOS cmake might find error boost-python libs path (#13193)
    
    (cherry picked from commit fdfea8af64a1a9e5c259238ca9ec681ec41fb108)
---
 pulsar-client-cpp/python/CMakeLists.txt | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/python/CMakeLists.txt b/pulsar-client-cpp/python/CMakeLists.txt
index f7d4069..30631cd 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -72,11 +72,18 @@ set(PYTHON_WRAPPER_LIBS ${Boost_PYTHON_LIBRARY}
                         ${Boost_PYTHON39_LIBRARY})
 
 if (APPLE)
-    set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS}
-                            ${Boost_PYTHON27-MT_LIBRARY_RELEASE}
-                            ${Boost_PYTHON37-MT_LIBRARY_RELEASE}
-                            ${Boost_PYTHON38-MT_LIBRARY_RELEASE}
-                            ${Boost_PYTHON39-MT_LIBRARY_RELEASE})
+    if (Boost_PYTHON27-MT_LIBRARY_RELEASE)
+        set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON27-MT_LIBRARY_RELEASE})
+    endif ()
+    if (Boost_PYTHON37-MT_LIBRARY_RELEASE)
+        set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON37-MT_LIBRARY_RELEASE})
+    endif ()
+    if (Boost_PYTHON38-MT_LIBRARY_RELEASE)
+        set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON38-MT_LIBRARY_RELEASE})
+    endif ()
+    if (Boost_PYTHON39-MT_LIBRARY_RELEASE)
+        set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} ${Boost_PYTHON39-MT_LIBRARY_RELEASE})
+    endif ()
 endif()
 
 message(STATUS "Using Boost Python libs: ${PYTHON_WRAPPER_LIBS}")

[pulsar] 13/16: Use current resourceUsage value as historyUsage when leader change in ThresholdShedder (#13136)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f89de87cb1ae672103e1ef4035bbc92826613406
Author: Hang Chen <ch...@apache.org>
AuthorDate: Sat Dec 11 12:31:50 2021 +0800

    Use current resourceUsage value as historyUsage when leader change in ThresholdShedder (#13136)
    
    ### Motivation
    Fix #13119
    
    ### Modifications
    1. Use the current resourceUsage value as the historyUsage value when the leader changes in ThresholdShedder, so that the actual historyUsage value is reached faster.
    
    (cherry picked from commit 6d9d24d50db5418ddbb845d2c7a2be2b9ac72893)
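
    The fix replaces the implicit 0.0 default with a null check, turning the smoothing into an exponential moving average seeded by the first observed sample instead of decaying up from zero. A minimal sketch of that logic (illustrative names, not the actual ThresholdShedder API):

    ```java
    public class UsageEma {

        // A null history (first sample for this broker, e.g. right after a
        // leader change) is replaced by the current sample; otherwise the
        // usual weighted average with historyPercentage is applied.
        static double update(Double historyUsage, double resourceUsage,
                             double historyPercentage) {
            return historyUsage == null
                    ? resourceUsage
                    : historyUsage * historyPercentage
                            + (1 - historyPercentage) * resourceUsage;
        }

        public static void main(String[] args) {
            double first = update(null, 0.8, 0.9);   // seeded directly: 0.8
            double second = update(first, 0.6, 0.9); // 0.8*0.9 + 0.1*0.6 = 0.78
            System.out.println(first + " " + second);
        }
    }
    ```

    With the old code the first sample would have been `0.9 * 0.0 + 0.1 * 0.8 = 0.08`, so a newly elected leader would underestimate broker load for many cycles.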
---
 .../broker/loadbalance/impl/ThresholdShedder.java  | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 9c89be9..3e10326 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -51,13 +51,9 @@ import org.slf4j.LoggerFactory;
  */
 public class ThresholdShedder implements LoadSheddingStrategy {
     private static final Logger log = LoggerFactory.getLogger(ThresholdShedder.class);
-
     private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
-
     private static final double ADDITIONAL_THRESHOLD_PERCENT_MARGIN = 0.05;
-
     private static final double MB = 1024 * 1024;
-
     private final Map<String, Double> brokerAvgResourceUsage = new HashMap<>();
 
     @Override
@@ -153,25 +149,27 @@ public class ThresholdShedder implements LoadSheddingStrategy {
         for (Map.Entry<String, BrokerData> entry : loadData.getBrokerData().entrySet()) {
             LocalBrokerData localBrokerData = entry.getValue().getLocalData();
             String broker = entry.getKey();
-            updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
-            totalUsage += brokerAvgResourceUsage.getOrDefault(broker, 0.0);
+            totalUsage += updateAvgResourceUsage(broker, localBrokerData, historyPercentage, conf);
             totalBrokers++;
         }
 
         return totalBrokers > 0 ? totalUsage / totalBrokers : 0;
     }
 
-    private void updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData, final double historyPercentage,
-                                        final ServiceConfiguration conf) {
-        double historyUsage =
-                brokerAvgResourceUsage.getOrDefault(broker, 0.0);
-        historyUsage = historyUsage * historyPercentage
-                + (1 - historyPercentage) * localBrokerData.getMaxResourceUsageWithWeight(
+    private double updateAvgResourceUsage(String broker, LocalBrokerData localBrokerData,
+                                          final double historyPercentage, final ServiceConfiguration conf) {
+        Double historyUsage =
+                brokerAvgResourceUsage.get(broker);
+        double resourceUsage = localBrokerData.getMaxResourceUsageWithWeight(
                 conf.getLoadBalancerCPUResourceWeight(),
                 conf.getLoadBalancerMemoryResourceWeight(), conf.getLoadBalancerDirectMemoryResourceWeight(),
                 conf.getLoadBalancerBandwithInResourceWeight(),
                 conf.getLoadBalancerBandwithOutResourceWeight());
+        historyUsage = historyUsage == null
+                ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+
         brokerAvgResourceUsage.put(broker, historyUsage);
+        return historyUsage;
     }
 
 }

[pulsar] 05/16: [Transaction] Add a check for uninitialized PendingAck (#13088)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c5da572ea40bdd20f1e5ada6a3ad8a6a60183e89
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Dec 6 22:00:38 2021 +0800

    [Transaction] Add a check for uninitialized PendingAck (#13088)
    
    ### Motivation
    
    We should not generate statistics for an uninitialized PendingAck, and we should check that it is initialized when it is fetched via `getStoreManageLedger()`.
    ### Modifications
     Do not generate statistics for an uninitialized PendingAck.
     Add a check that the store is initialized when it is fetched via `getStoreManageLedger()`.
    
    (cherry picked from commit 591b4e80a7652ed608c04b769052744179473f0a)
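
    The core of the fix is a null-safe initialization check on a lazily created future: calling `isDone()` directly would throw a NullPointerException whenever the pending ack store was never set up. A sketch of the pattern (illustrative class, not the Pulsar API):

    ```java
    import java.util.concurrent.CompletableFuture;

    // The store future is created lazily, so it can be null; an existing
    // future may also still be pending. Both conditions must be checked.
    public class InitCheck {

        CompletableFuture<String> storeFuture; // null until the store is opened

        boolean checkIfStoreInit() {
            return storeFuture != null && storeFuture.isDone();
        }

        public static void main(String[] args) {
            InitCheck handle = new InitCheck();
            System.out.println(handle.checkIfStoreInit()); // false: never created
            handle.storeFuture = new CompletableFuture<>();
            System.out.println(handle.checkIfStoreInit()); // false: still pending
            handle.storeFuture.complete("ready");
            System.out.println(handle.checkIfStoreInit()); // true
        }
    }
    ```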
---
 .../service/persistent/PersistentSubscription.java |  4 ++
 .../stats/prometheus/TransactionAggregator.java    |  4 +-
 .../transaction/pendingack/PendingAckHandle.java   |  5 ++
 .../pendingack/impl/PendingAckHandleDisabled.java  |  5 ++
 .../pendingack/impl/PendingAckHandleImpl.java      |  7 ++-
 .../broker/stats/TransactionMetricsTest.java       | 68 ++++++++++++++++++++++
 6 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index acfd9ee..8d75ea7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1219,5 +1219,9 @@ public class PersistentSubscription implements Subscription {
         }
     }
 
+    public boolean checkIfPendingAckStoreInit() {
+        return this.pendingAckHandle.checkIfPendingAckStoreInit();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 142ec48..65399d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -64,7 +64,9 @@ public class TransactionAggregator {
                             topic.getSubscriptions().values().forEach(subscription -> {
                                 try {
                                     localManageLedgerStats.get().reset();
-                                    if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))) {
+                                    if (!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))
+                                            && subscription instanceof  PersistentSubscription
+                                            && ((PersistentSubscription) subscription).checkIfPendingAckStoreInit()) {
                                         ManagedLedger managedLedger =
                                                 ((PersistentSubscription) subscription)
                                                         .getPendingAckManageLedger().get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index 3664c5d..dc64cbe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -159,4 +159,9 @@ public interface PendingAckHandle {
      */
     CompletableFuture<Void> close();
 
+    /**
+     * Check if the PendingAckStore is init.
+     * @return if the PendingAckStore is init.
+     */
+    boolean checkIfPendingAckStoreInit();
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index cf6b5c8..634655e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -99,4 +99,9 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
     public CompletableFuture<Void> close() {
         return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    public boolean checkIfPendingAckStoreInit() {
+        return false;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 78bab96..d92793a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -923,7 +923,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
     }
 
     public CompletableFuture<ManagedLedger> getStoreManageLedger() {
-        if (this.pendingAckStoreFuture.isDone()) {
+        if (this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone()) {
             return this.pendingAckStoreFuture.thenCompose(pendingAckStore -> {
                 if (pendingAckStore instanceof MLPendingAckStore) {
                     return ((MLPendingAckStore) pendingAckStore).getManagedLedger();
@@ -937,6 +937,11 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         }
     }
 
+    @Override
+    public boolean checkIfPendingAckStoreInit() {
+        return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
+    }
+
     protected void handleCacheRequest() {
         while (true) {
             Runnable runnable = acceptQueue.poll();
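The guard added above (a null check on `pendingAckStoreFuture` before calling `isDone()`) is an instance of a general lazily-initialized-future pattern. A minimal standalone sketch of the same idea in Python (class and method names are hypothetical, not Pulsar API):

```python
from concurrent.futures import Future


class PendingAckHandle:
    """Hypothetical holder for a lazily created store future."""

    def __init__(self):
        # The store future is only created on the first transactional ack,
        # so it may still be None when metrics or checks run.
        self._store_future = None

    def init_store(self):
        self._store_future = Future()
        self._store_future.set_result("ml-pending-ack-store")

    def is_store_initialized(self):
        # Mirrors checkIfPendingAckStoreInit(): guard against None
        # before asking the future whether it completed.
        return self._store_future is not None and self._store_future.done()


handle = PendingAckHandle()
assert not handle.is_store_initialized()  # safe even before initialization
handle.init_store()
assert handle.is_store_initialized()
```

Without the `is not None` guard, callers that run before the first transactional ack would dereference a missing future, which is exactly the failure mode the patch closes.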
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index cb8e430..6a4b5c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -40,6 +40,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -271,6 +272,73 @@ public class TransactionMetricsTest extends BrokerTestBase {
         assertEquals(metric.size(), 2);
     }
 
+    @Test
+    public void testManagedLedgerMetricsWhenPendingAckNotInit() throws Exception {
+        String ns1 = "prop/ns-abc1";
+        admin.namespaces().createNamespace(ns1);
+        String topic = "persistent://" + ns1 + "/testManagedLedgerMetricsWhenPendingAckNotInit";
+        String subName = "test_managed_ledger_metrics";
+        String subName2 = "test_pending_ack_no_init";
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0);
+        pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
+        admin.topics().createSubscription(topic, subName, MessageId.earliest);
+        admin.topics().createSubscription(topic, subName2, MessageId.earliest);
+
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
+                pulsar.getTransactionMetadataStoreService().getStores().size() == 1);
+
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .receiverQueueSize(10)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        Transaction transaction =
+                pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        producer.send("hello pulsar".getBytes());
+        consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+
+        Multimap<String, PrometheusMetricsTest.Metric> metrics = parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> metric = metrics.get("pulsar_storage_size");
+        checkManagedLedgerMetrics(subName, 32, metric);
+        // No pending-ack statistics are generated while the pending ack store is not initialized.
+        for (PrometheusMetricsTest.Metric metric1 : metric) {
+            if (metric1.tags.containsValue(subName2)) {
+                Assert.fail();
+            }
+        }
+
+        consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .receiverQueueSize(10)
+                .subscriptionName(subName2)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+        transaction =
+                pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction).get();
+
+        statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        metricsStr = statsOut.toString();
+        metrics = parseMetrics(metricsStr);
+        metric = metrics.get("pulsar_storage_size");
+        checkManagedLedgerMetrics(subName2, 32, metric);
+    }
+
     private void checkManagedLedgerMetrics(String tag, double value, Collection<PrometheusMetricsTest.Metric> metrics) {
         boolean exist = false;
         for (PrometheusMetricsTest.Metric metric1 : metrics) {

[pulsar] 07/16: [Issue #12486][Python Client]JsonSchema encoding is not idempotent (#12490)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0e0de02ca80d19048065c5f5d94905b289b539ec
Author: Travis Sturzl <tr...@toro.com>
AuthorDate: Tue Dec 7 10:08:50 2021 -0700

    [Issue #12486][Python Client]JsonSchema encoding is not idempotent (#12490)
    
    * fix JsonSchema, copy data out to prevent modifying the reference object,
    check keys before deleting them
    
    * add unit test, but cannot test due to compilation failure for cpp lib
    
    (cherry picked from commit 2df53da3126d8e0471aa96cadc102254fff286f5)
---
 pulsar-client-cpp/python/pulsar/schema/schema.py |  15 +-
 pulsar-client-cpp/python/pulsar_test.py          | 787 +++++++++++------------
 2 files changed, 402 insertions(+), 400 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py
index 083efc3..349087e 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema.py
@@ -85,11 +85,16 @@ class JsonSchema(Schema):
 
     def encode(self, obj):
         self._validate_object_type(obj)
-        del obj.__dict__['_default']
-        del obj.__dict__['_required']
-        del obj.__dict__['_required_default']
-
-        return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8')
+        # Copy the object's dict so the provided object is not modified through the reference
+        data = obj.__dict__.copy()
+        if '_default' in data:
+            del data['_default']
+        if '_required' in data:
+            del data['_required']
+        if '_required_default' in data:
+            del data['_required_default']
+
+        return json.dumps(data, default=self._get_serialized_value, indent=True).encode('utf-8')
 
     def decode(self, data):
         return self._record_cls(**json.loads(data))
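The corrected `encode` above is idempotent because it deletes bookkeeping keys from a copy of the instance dict, and guards each `del` so a second call cannot raise `KeyError`. A standalone sketch of the same pattern, independent of the Pulsar client (the `Record` stand-in and field names are illustrative only):

```python
import json


class Record:
    """Minimal stand-in for a schema record class (hypothetical, for illustration)."""

    def __init__(self, **kwargs):
        self._default = None      # internal bookkeeping fields that must not
        self._required = False    # leak into the encoded JSON payload
        self.__dict__.update(kwargs)


def encode(obj):
    # Copy the instance dict so the caller's object is left untouched.
    data = obj.__dict__.copy()
    for key in ('_default', '_required', '_required_default'):
        if key in data:           # guard: repeated calls must not raise KeyError
            del data[key]
    return json.dumps(data, sort_keys=True).encode('utf-8')


record = Record(a=1, b=2)
first = encode(record)
second = encode(record)           # same bytes: encoding twice is now safe
assert first == second
assert '_default' in record.__dict__  # the original object keeps its fields
```

The previous version deleted the keys from `obj.__dict__` directly, so the first `encode` mutated the caller's object and the second raised `KeyError` on the already-removed keys.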
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 8db53bd..fd3656b 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -25,10 +25,19 @@ import os
 import pulsar
 import uuid
 from datetime import timedelta
-from pulsar import Client, MessageId, \
-            CompressionType, ConsumerType, PartitionsRoutingMode, \
-            AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \
-            CryptoKeyReader
+from pulsar import (
+    Client,
+    MessageId,
+    CompressionType,
+    ConsumerType,
+    PartitionsRoutingMode,
+    AuthenticationTLS,
+    Authentication,
+    AuthenticationToken,
+    InitialPosition,
+    CryptoKeyReader,
+)
+from pulsar.schema import JsonSchema, Record, Integer
 
 from _pulsar import ProducerConfiguration, ConsumerConfiguration
 
@@ -46,19 +55,19 @@ TM = 10000  # Do not wait forever in tests
 
 def doHttpPost(url, data):
     req = Request(url, data.encode())
-    req.add_header('Content-Type', 'application/json')
+    req.add_header("Content-Type", "application/json")
     urlopen(req)
 
 
 def doHttpPut(url, data):
     try:
         req = Request(url, data.encode())
-        req.add_header('Content-Type', 'application/json')
-        req.get_method = lambda: 'PUT'
+        req.add_header("Content-Type", "application/json")
+        req.get_method = lambda: "PUT"
         urlopen(req)
     except Exception as ex:
         # ignore conflicts exception to have test idempotency
-        if '409' in str(ex):
+        if "409" in str(ex):
             pass
         else:
             raise ex
@@ -66,16 +75,21 @@ def doHttpPut(url, data):
 
 def doHttpGet(url):
     req = Request(url)
-    req.add_header('Accept', 'application/json')
+    req.add_header("Accept", "application/json")
     return urlopen(req).read()
 
 
+class TestRecord(Record):
+    a = Integer()
+    b = Integer()
+
+
 class PulsarTest(TestCase):
 
-    serviceUrl = 'pulsar://localhost:6650'
-    adminUrl = 'http://localhost:8080'
+    serviceUrl = "pulsar://localhost:6650"
+    adminUrl = "http://localhost:8080"
 
-    serviceUrlTls = 'pulsar+ssl://localhost:6651'
+    serviceUrlTls = "pulsar+ssl://localhost:6651"
 
     def test_producer_config(self):
         conf = ProducerConfiguration()
@@ -95,7 +109,7 @@ class PulsarTest(TestCase):
         conf.consumer_type(ConsumerType.Shared)
         self.assertEqual(conf.consumer_type(), ConsumerType.Shared)
 
-        self.assertEqual(conf.consumer_name(), '')
+        self.assertEqual(conf.consumer_name(), "")
         conf.consumer_name("my-name")
         self.assertEqual(conf.consumer_name(), "my-name")
 
@@ -105,8 +119,8 @@ class PulsarTest(TestCase):
 
     def test_connect_error(self):
         with self.assertRaises(pulsar.ConnectError):
-            client = Client('fakeServiceUrl')
-            client.create_producer('connect-error-topic')
+            client = Client("fakeServiceUrl")
+            client.create_producer("connect-error-topic")
             client.close()
 
     def test_exception_inheritance(self):
@@ -115,23 +129,23 @@ class PulsarTest(TestCase):
 
     def test_simple_producer(self):
         client = Client(self.serviceUrl)
-        producer = client.create_producer('my-python-topic')
-        producer.send(b'hello')
+        producer = client.create_producer("my-python-topic")
+        producer.send(b"hello")
         producer.close()
         client.close()
 
     def test_producer_send_async(self):
         client = Client(self.serviceUrl)
-        producer = client.create_producer('my-python-topic')
+        producer = client.create_producer("my-python-topic")
 
         sent_messages = []
 
         def send_callback(producer, msg):
             sent_messages.append(msg)
 
-        producer.send_async(b'hello', send_callback)
-        producer.send_async(b'hello', send_callback)
-        producer.send_async(b'hello', send_callback)
+        producer.send_async(b"hello", send_callback)
+        producer.send_async(b"hello", send_callback)
+        producer.send_async(b"hello", send_callback)
 
         i = 0
         while len(sent_messages) < 3 and i < 100:
@@ -142,28 +156,26 @@ class PulsarTest(TestCase):
 
     def test_producer_send(self):
         client = Client(self.serviceUrl)
-        topic = 'test_producer_send'
+        topic = "test_producer_send"
         producer = client.create_producer(topic)
-        consumer = client.subscribe(topic, 'sub-name')
-        msg_id = producer.send(b'hello')
-        print('send to {}'.format(msg_id))
+        consumer = client.subscribe(topic, "sub-name")
+        msg_id = producer.send(b"hello")
+        print("send to {}".format(msg_id))
         msg = consumer.receive(TM)
         consumer.acknowledge(msg)
-        print('receive from {}'.format(msg.message_id()))
+        print("receive from {}".format(msg.message_id()))
         self.assertEqual(msg_id, msg.message_id())
         client.close()
 
     def test_producer_consumer(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('my-python-topic-producer-consumer',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-producer-consumer')
-        producer.send(b'hello')
+        consumer = client.subscribe("my-python-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared)
+        producer = client.create_producer("my-python-topic-producer-consumer")
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
 
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(100)
@@ -173,12 +185,14 @@ class PulsarTest(TestCase):
 
     def test_redelivery_count(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('my-python-topic-redelivery-count',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared,
-                                    negative_ack_redelivery_delay_ms=500)
-        producer = client.create_producer('my-python-topic-redelivery-count')
-        producer.send(b'hello')
+        consumer = client.subscribe(
+            "my-python-topic-redelivery-count",
+            "my-sub",
+            consumer_type=ConsumerType.Shared,
+            negative_ack_redelivery_delay_ms=500,
+        )
+        producer = client.create_producer("my-python-topic-redelivery-count")
+        producer.send(b"hello")
 
         redelivery_count = 0
         for i in range(4):
@@ -188,7 +202,7 @@ class PulsarTest(TestCase):
             redelivery_count = msg.redelivery_count()
 
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
         self.assertEqual(3, redelivery_count)
         consumer.unsubscribe()
         producer.close()
@@ -196,12 +210,10 @@ class PulsarTest(TestCase):
 
     def test_deliver_at(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('my-python-topic-deliver-at',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-deliver-at')
+        consumer = client.subscribe("my-python-topic-deliver-at", "my-sub", consumer_type=ConsumerType.Shared)
+        producer = client.create_producer("my-python-topic-deliver-at")
         # Delay message in 1.1s
-        producer.send(b'hello', deliver_at=int(round(time.time() * 1000)) + 1100)
+        producer.send(b"hello", deliver_at=int(round(time.time() * 1000)) + 1100)
 
         # Message should not be available in the next second
         with self.assertRaises(pulsar.Timeout):
@@ -210,19 +222,17 @@ class PulsarTest(TestCase):
         # Message should be published now
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
         consumer.unsubscribe()
         producer.close()
         client.close()
 
     def test_deliver_after(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('my-python-topic-deliver-after',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-deliver-after')
+        consumer = client.subscribe("my-python-topic-deliver-after", "my-sub", consumer_type=ConsumerType.Shared)
+        producer = client.create_producer("my-python-topic-deliver-after")
         # Delay message in 1.1s
-        producer.send(b'hello', deliver_after=timedelta(milliseconds=1100))
+        producer.send(b"hello", deliver_after=timedelta(milliseconds=1100))
 
         # Message should not be available in the next second
         with self.assertRaises(pulsar.Timeout):
@@ -231,33 +241,35 @@ class PulsarTest(TestCase):
         # Message should be published in the next 500ms
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
         consumer.unsubscribe()
         producer.close()
         client.close()
 
     def test_consumer_initial_position(self):
         client = Client(self.serviceUrl)
-        producer = client.create_producer('consumer-initial-position')
+        producer = client.create_producer("consumer-initial-position")
 
         # Sending 5 messages before consumer creation.
         # These should be received with initial_position set to Earliest but not with Latest.
         for i in range(5):
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
 
-        consumer = client.subscribe('consumer-initial-position',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared,
-                                    initial_position=InitialPosition.Earliest)
+        consumer = client.subscribe(
+            "consumer-initial-position",
+            "my-sub",
+            consumer_type=ConsumerType.Shared,
+            initial_position=InitialPosition.Earliest,
+        )
 
         # Sending 5 other messages that should be received regardless of the initial_position.
         for i in range(5, 10):
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
 
         for i in range(10):
             msg = consumer.receive(TM)
             self.assertTrue(msg)
-            self.assertEqual(msg.data(), b'hello-%d' % i)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
 
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(100)
@@ -267,65 +279,59 @@ class PulsarTest(TestCase):
 
     def test_consumer_queue_size_is_zero(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('my-python-topic-consumer-init-queue-size-is-zero',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared,
-                                    receiver_queue_size=0,
-                                    initial_position=InitialPosition.Earliest)
-        producer = client.create_producer('my-python-topic-consumer-init-queue-size-is-zero')
-        producer.send(b'hello')
+        consumer = client.subscribe(
+            "my-python-topic-consumer-init-queue-size-is-zero",
+            "my-sub",
+            consumer_type=ConsumerType.Shared,
+            receiver_queue_size=0,
+            initial_position=InitialPosition.Earliest,
+        )
+        producer = client.create_producer("my-python-topic-consumer-init-queue-size-is-zero")
+        producer.send(b"hello")
         time.sleep(0.1)
         msg = consumer.receive()
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
 
         consumer.unsubscribe()
         client.close()
 
     def test_message_properties(self):
         client = Client(self.serviceUrl)
-        topic = 'my-python-test-message-properties'
-        consumer = client.subscribe(topic=topic,
-                                    subscription_name='my-subscription',
-                                    schema=pulsar.schema.StringSchema())
-        producer = client.create_producer(topic=topic,
-                                          schema=pulsar.schema.StringSchema())
-        producer.send('hello',
-                      properties={
-                          'a': '1',
-                          'b': '2'
-                      })
+        topic = "my-python-test-message-properties"
+        consumer = client.subscribe(
+            topic=topic, subscription_name="my-subscription", schema=pulsar.schema.StringSchema()
+        )
+        producer = client.create_producer(topic=topic, schema=pulsar.schema.StringSchema())
+        producer.send("hello", properties={"a": "1", "b": "2"})
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.value(), 'hello')
-        self.assertEqual(msg.properties(), {
-                          'a': '1',
-                          'b': '2'
-                      })
+        self.assertEqual(msg.value(), "hello")
+        self.assertEqual(msg.properties(), {"a": "1", "b": "2"})
 
         consumer.unsubscribe()
         client.close()
 
     def test_tls_auth(self):
-        certs_dir = '/pulsar/pulsar-broker/src/test/resources/authentication/tls/'
+        certs_dir = "/pulsar/pulsar-broker/src/test/resources/authentication/tls/"
         if not os.path.exists(certs_dir):
             certs_dir = "../../pulsar-broker/src/test/resources/authentication/tls/"
-        client = Client(self.serviceUrlTls,
-                        tls_trust_certs_file_path=certs_dir + 'cacert.pem',
-                        tls_allow_insecure_connection=False,
-                        authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem'))
-
-        topic = 'my-python-topic-tls-auth-' + str(time.time())
-        consumer = client.subscribe(topic,
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
+        client = Client(
+            self.serviceUrlTls,
+            tls_trust_certs_file_path=certs_dir + "cacert.pem",
+            tls_allow_insecure_connection=False,
+            authentication=AuthenticationTLS(certs_dir + "client-cert.pem", certs_dir + "client-key.pem"),
+        )
+
+        topic = "my-python-topic-tls-auth-" + str(time.time())
+        consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
         producer = client.create_producer(topic)
-        producer.send(b'hello')
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
 
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(100)
@@ -333,27 +339,27 @@ class PulsarTest(TestCase):
         client.close()
 
     def test_tls_auth2(self):
-        certs_dir = '/pulsar/pulsar-broker/src/test/resources/authentication/tls/'
+        certs_dir = "/pulsar/pulsar-broker/src/test/resources/authentication/tls/"
         if not os.path.exists(certs_dir):
             certs_dir = "../../pulsar-broker/src/test/resources/authentication/tls/"
         authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationTls"
         authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (certs_dir, certs_dir)
 
-        client = Client(self.serviceUrlTls,
-                        tls_trust_certs_file_path=certs_dir + 'cacert.pem',
-                        tls_allow_insecure_connection=False,
-                        authentication=Authentication(authPlugin, authParams))
+        client = Client(
+            self.serviceUrlTls,
+            tls_trust_certs_file_path=certs_dir + "cacert.pem",
+            tls_allow_insecure_connection=False,
+            authentication=Authentication(authPlugin, authParams),
+        )
 
-        topic = 'my-python-topic-tls-auth-2-' + str(time.time())
-        consumer = client.subscribe(topic,
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
+        topic = "my-python-topic-tls-auth-2-" + str(time.time())
+        consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
         producer = client.create_producer(topic)
-        producer.send(b'hello')
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
 
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(100)
@@ -365,25 +371,25 @@ class PulsarTest(TestCase):
         privateKeyPath = "/pulsar/pulsar-broker/src/test/resources/certificate/private-key.client-rsa.pem"
         crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
         client = Client(self.serviceUrl)
-        topic = 'my-python-test-end-to-end-encryption'
-        consumer = client.subscribe(topic=topic,
-                                    subscription_name='my-subscription',
-                                    crypto_key_reader=crypto_key_reader)
-        producer = client.create_producer(topic=topic,
-                                          encryption_key="client-rsa.pem",
-                                          crypto_key_reader=crypto_key_reader)
-        reader = client.create_reader(topic=topic,
-                                      start_message_id=MessageId.earliest,
-                                      crypto_key_reader=crypto_key_reader)
-        producer.send(b'hello')
+        topic = "my-python-test-end-to-end-encryption"
+        consumer = client.subscribe(
+            topic=topic, subscription_name="my-subscription", crypto_key_reader=crypto_key_reader
+        )
+        producer = client.create_producer(
+            topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
+        )
+        reader = client.create_reader(
+            topic=topic, start_message_id=MessageId.earliest, crypto_key_reader=crypto_key_reader
+        )
+        producer.send(b"hello")
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.value(), b'hello')
+        self.assertEqual(msg.value(), b"hello")
         consumer.unsubscribe()
 
         msg = reader.read_next(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
 
         with self.assertRaises(pulsar.Timeout):
             reader.read_next(100)
@@ -393,27 +399,27 @@ class PulsarTest(TestCase):
         client.close()
 
     def test_tls_auth3(self):
-        certs_dir = '/pulsar/pulsar-broker/src/test/resources/authentication/tls/'
+        certs_dir = "/pulsar/pulsar-broker/src/test/resources/authentication/tls/"
         if not os.path.exists(certs_dir):
             certs_dir = "../../pulsar-broker/src/test/resources/authentication/tls/"
         authPlugin = "tls"
         authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (certs_dir, certs_dir)
 
-        client = Client(self.serviceUrlTls,
-                        tls_trust_certs_file_path=certs_dir + 'cacert.pem',
-                        tls_allow_insecure_connection=False,
-                        authentication=Authentication(authPlugin, authParams))
+        client = Client(
+            self.serviceUrlTls,
+            tls_trust_certs_file_path=certs_dir + "cacert.pem",
+            tls_allow_insecure_connection=False,
+            authentication=Authentication(authPlugin, authParams),
+        )
 
-        topic = 'my-python-topic-tls-auth-3-' + str(time.time())
-        consumer = client.subscribe(topic,
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
+        topic = "my-python-topic-tls-auth-3-" + str(time.time())
+        consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
         producer = client.create_producer(topic)
-        producer.send(b'hello')
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
 
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(100)
@@ -421,20 +427,20 @@ class PulsarTest(TestCase):
         client.close()
 
     def test_auth_junk_params(self):
-        certs_dir = '/pulsar/pulsar-broker/src/test/resources/authentication/tls/'
+        certs_dir = "/pulsar/pulsar-broker/src/test/resources/authentication/tls/"
         if not os.path.exists(certs_dir):
             certs_dir = "../../pulsar-broker/src/test/resources/authentication/tls/"
         authPlugin = "someoldjunk.so"
         authParams = "blah"
-        client = Client(self.serviceUrlTls,
-                        tls_trust_certs_file_path=certs_dir + 'cacert.pem',
-                        tls_allow_insecure_connection=False,
-                        authentication=Authentication(authPlugin, authParams))
+        client = Client(
+            self.serviceUrlTls,
+            tls_trust_certs_file_path=certs_dir + "cacert.pem",
+            tls_allow_insecure_connection=False,
+            authentication=Authentication(authPlugin, authParams),
+        )
 
         with self.assertRaises(pulsar.ConnectError):
-            client.subscribe('my-python-topic-auth-junk-params',
-                             'my-sub',
-                             consumer_type=ConsumerType.Shared)
+            client.subscribe("my-python-topic-auth-junk-params", "my-sub", consumer_type=ConsumerType.Shared)
 
     def test_message_listener(self):
         client = Client(self.serviceUrl)
@@ -446,14 +452,13 @@ class PulsarTest(TestCase):
             received_messages.append(msg)
             consumer.acknowledge(msg)
 
-        client.subscribe('my-python-topic-listener',
-                         'my-sub',
-                         consumer_type=ConsumerType.Exclusive,
-                         message_listener=listener)
-        producer = client.create_producer('my-python-topic-listener')
-        producer.send(b'hello-1')
-        producer.send(b'hello-2')
-        producer.send(b'hello-3')
+        client.subscribe(
+            "my-python-topic-listener", "my-sub", consumer_type=ConsumerType.Exclusive, message_listener=listener
+        )
+        producer = client.create_producer("my-python-topic-listener")
+        producer.send(b"hello-1")
+        producer.send(b"hello-2")
+        producer.send(b"hello-3")
 
         time.sleep(0.1)
         self.assertEqual(len(received_messages), 3)
@@ -464,15 +469,14 @@ class PulsarTest(TestCase):
 
     def test_reader_simple(self):
         client = Client(self.serviceUrl)
-        reader = client.create_reader('my-python-topic-reader-simple',
-                                      MessageId.earliest)
+        reader = client.create_reader("my-python-topic-reader-simple", MessageId.earliest)
 
-        producer = client.create_producer('my-python-topic-reader-simple')
-        producer.send(b'hello')
+        producer = client.create_producer("my-python-topic-reader-simple")
+        producer.send(b"hello")
 
         msg = reader.read_next(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
 
         with self.assertRaises(pulsar.Timeout):
             reader.read_next(100)
@@ -482,21 +486,20 @@ class PulsarTest(TestCase):
 
     def test_reader_on_last_message(self):
         client = Client(self.serviceUrl)
-        producer = client.create_producer('my-python-topic-reader-on-last-message')
+        producer = client.create_producer("my-python-topic-reader-on-last-message")
 
         for i in range(10):
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
 
-        reader = client.create_reader('my-python-topic-reader-on-last-message',
-                                      MessageId.latest)
+        reader = client.create_reader("my-python-topic-reader-on-last-message", MessageId.latest)
 
         for i in range(10, 20):
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
 
         for i in range(10, 20):
             msg = reader.read_next(TM)
             self.assertTrue(msg)
-            self.assertEqual(msg.data(), b'hello-%d' % i)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
 
         reader.close()
         client.close()
@@ -504,26 +507,21 @@ class PulsarTest(TestCase):
     def test_reader_on_specific_message(self):
         num_of_msgs = 10
         client = Client(self.serviceUrl)
-        producer = client.create_producer(
-            'my-python-topic-reader-on-specific-message')
+        producer = client.create_producer("my-python-topic-reader-on-specific-message")
 
         for i in range(num_of_msgs):
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
 
-        reader1 = client.create_reader(
-                'my-python-topic-reader-on-specific-message',
-                MessageId.earliest)
+        reader1 = client.create_reader("my-python-topic-reader-on-specific-message", MessageId.earliest)
 
-        for i in range(num_of_msgs//2):
+        for i in range(num_of_msgs // 2):
             msg = reader1.read_next(TM)
             self.assertTrue(msg)
-            self.assertEqual(msg.data(), b'hello-%d' % i)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
             last_msg_id = msg.message_id()
             last_msg_idx = i
 
-        reader2 = client.create_reader(
-                'my-python-topic-reader-on-specific-message',
-                last_msg_id)
+        reader2 = client.create_reader("my-python-topic-reader-on-specific-message", last_msg_id)
 
         # The reset is effectively applied at the position after the given message ID.
         # When available, we should test this behaviour with `startMessageIdInclusive` opt.
@@ -531,7 +529,7 @@ class PulsarTest(TestCase):
         for i in range(from_msg_idx, num_of_msgs):
             msg = reader2.read_next(TM)
             self.assertTrue(msg)
-            self.assertEqual(msg.data(), b'hello-%d' % i)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
 
         reader1.close()
         reader2.close()
@@ -540,32 +538,29 @@ class PulsarTest(TestCase):
     def test_reader_on_specific_message_with_batches(self):
         client = Client(self.serviceUrl)
         producer = client.create_producer(
-            'my-python-topic-reader-on-specific-message-with-batches',
+            "my-python-topic-reader-on-specific-message-with-batches",
             batching_enabled=True,
-            batching_max_publish_delay_ms=1000)
+            batching_max_publish_delay_ms=1000,
+        )
 
         for i in range(10):
-            producer.send_async(b'hello-%d' % i, None)
+            producer.send_async(b"hello-%d" % i, None)
 
         # Send one sync message to make sure everything was published
-        producer.send(b'hello-10')
+        producer.send(b"hello-10")
 
-        reader1 = client.create_reader(
-                'my-python-topic-reader-on-specific-message-with-batches',
-                MessageId.earliest)
+        reader1 = client.create_reader("my-python-topic-reader-on-specific-message-with-batches", MessageId.earliest)
 
         for i in range(5):
             msg = reader1.read_next(TM)
             last_msg_id = msg.message_id()
 
-        reader2 = client.create_reader(
-                'my-python-topic-reader-on-specific-message-with-batches',
-                last_msg_id)
+        reader2 = client.create_reader("my-python-topic-reader-on-specific-message-with-batches", last_msg_id)
 
         for i in range(5, 11):
             msg = reader2.read_next(TM)
             self.assertTrue(msg)
-            self.assertEqual(msg.data(), b'hello-%d' % i)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
 
         reader1.close()
         reader2.close()
@@ -573,60 +568,56 @@ class PulsarTest(TestCase):
 
     def test_producer_sequence_after_reconnection(self):
         # Enable deduplication on namespace
-        doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
-                   'true')
+        doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
         client = Client(self.serviceUrl)
 
-        topic = 'my-python-test-producer-sequence-after-reconnection-' \
-            + str(time.time())
+        topic = "my-python-test-producer-sequence-after-reconnection-" + str(time.time())
 
-        producer = client.create_producer(topic, producer_name='my-producer-name')
+        producer = client.create_producer(topic, producer_name="my-producer-name")
         self.assertEqual(producer.last_sequence_id(), -1)
 
         for i in range(10):
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
             self.assertEqual(producer.last_sequence_id(), i)
 
         producer.close()
 
-        producer = client.create_producer(topic, producer_name='my-producer-name')
+        producer = client.create_producer(topic, producer_name="my-producer-name")
         self.assertEqual(producer.last_sequence_id(), 9)
 
         for i in range(10, 20):
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
             self.assertEqual(producer.last_sequence_id(), i)
 
         client.close()
 
-        doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
-                   'false')
+        doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "false")
 
     def test_producer_deduplication(self):
         # Enable deduplication on namespace
-        doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
-                   'true')
+        doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
         client = Client(self.serviceUrl)
 
-        topic = 'my-python-test-producer-deduplication-' + str(time.time())
+        topic = "my-python-test-producer-deduplication-" + str(time.time())
 
-        producer = client.create_producer(topic, producer_name='my-producer-name')
+        producer = client.create_producer(topic, producer_name="my-producer-name")
         self.assertEqual(producer.last_sequence_id(), -1)
 
-        consumer = client.subscribe(topic, 'my-sub')
+        consumer = client.subscribe(topic, "my-sub")
 
-        producer.send(b'hello-0', sequence_id=0)
-        producer.send(b'hello-1', sequence_id=1)
-        producer.send(b'hello-2', sequence_id=2)
+        producer.send(b"hello-0", sequence_id=0)
+        producer.send(b"hello-1", sequence_id=1)
+        producer.send(b"hello-2", sequence_id=2)
         self.assertEqual(producer.last_sequence_id(), 2)
 
         # Repeat the messages and verify they're not received by the consumer
-        producer.send(b'hello-1', sequence_id=1)
-        producer.send(b'hello-2', sequence_id=2)
+        producer.send(b"hello-1", sequence_id=1)
+        producer.send(b"hello-2", sequence_id=2)
         self.assertEqual(producer.last_sequence_id(), 2)
 
         for i in range(3):
             msg = consumer.receive(TM)
-            self.assertEqual(msg.data(), b'hello-%d' % i)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
             consumer.acknowledge(msg)
 
         with self.assertRaises(pulsar.Timeout):
@@ -634,12 +625,12 @@ class PulsarTest(TestCase):
 
         producer.close()
 
-        producer = client.create_producer(topic, producer_name='my-producer-name')
+        producer = client.create_producer(topic, producer_name="my-producer-name")
         self.assertEqual(producer.last_sequence_id(), 2)
 
         # Repeat the messages and verify they're not received by the consumer
-        producer.send(b'hello-1', sequence_id=1)
-        producer.send(b'hello-2', sequence_id=2)
+        producer.send(b"hello-1", sequence_id=1)
+        producer.send(b"hello-2", sequence_id=2)
         self.assertEqual(producer.last_sequence_id(), 2)
 
         with self.assertRaises(pulsar.Timeout):
@@ -647,32 +638,32 @@ class PulsarTest(TestCase):
 
         client.close()
 
-        doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
-                   'false')
+        doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "false")
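The deduplication tests above rely on the broker dropping any message whose `sequence_id` is not greater than the last one accepted for that producer name — which is why `last_sequence_id()` stays at 2 and the repeated sends never reach the consumer. A rough sketch of that check using a plain in-memory map (an illustration of the rule the tests assert, not the broker's actual implementation):

```python
# Minimal sketch of broker-side deduplication: a message is accepted only
# if its sequence_id is strictly greater than the highest id already seen
# for that producer name.

class DedupLog:
    def __init__(self):
        self.last_seq = {}   # producer_name -> highest accepted sequence_id
        self.messages = []   # accepted payloads, in publish order

    def publish(self, producer_name, sequence_id, payload):
        if sequence_id <= self.last_seq.get(producer_name, -1):
            return False  # duplicate: dropped, never delivered to consumers
        self.last_seq[producer_name] = sequence_id
        self.messages.append(payload)
        return True

log = DedupLog()
for i in range(3):
    log.publish("my-producer-name", i, b"hello-%d" % i)
# Re-sending ids 1 and 2 is a no-op, matching the consumer-side timeout
# the test expects after the repeated sends:
log.publish("my-producer-name", 1, b"hello-1")
log.publish("my-producer-name", 2, b"hello-2")
assert log.messages == [b"hello-0", b"hello-1", b"hello-2"]
```

The same rule explains the reconnection test: a new producer with the same name resumes from the stored `last_sequence_id`, so its state survives `producer.close()`.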
 
     def test_producer_routing_mode(self):
         client = Client(self.serviceUrl)
-        producer = client.create_producer('my-python-test-producer',
-                                          message_routing_mode=PartitionsRoutingMode.UseSinglePartition)
-        producer.send(b'test')
+        producer = client.create_producer(
+            "my-python-test-producer", message_routing_mode=PartitionsRoutingMode.UseSinglePartition
+        )
+        producer.send(b"test")
         client.close()
 
     def test_message_argument_errors(self):
         client = Client(self.serviceUrl)
-        topic = 'my-python-test-producer'
+        topic = "my-python-test-producer"
         producer = client.create_producer(topic)
 
-        content = 'test'.encode('utf-8')
+        content = "test".encode("utf-8")
 
         self._check_type_error(lambda: producer.send(5))
-        self._check_value_error(lambda: producer.send(content, properties='test'))
+        self._check_value_error(lambda: producer.send(content, properties="test"))
         self._check_value_error(lambda: producer.send(content, partition_key=5))
-        self._check_value_error(lambda: producer.send(content, sequence_id='test'))
+        self._check_value_error(lambda: producer.send(content, sequence_id="test"))
         self._check_value_error(lambda: producer.send(content, replication_clusters=5))
-        self._check_value_error(lambda: producer.send(content, disable_replication='test'))
-        self._check_value_error(lambda: producer.send(content, event_timestamp='test'))
-        self._check_value_error(lambda: producer.send(content, deliver_at='test'))
-        self._check_value_error(lambda: producer.send(content, deliver_after='test'))
+        self._check_value_error(lambda: producer.send(content, disable_replication="test"))
+        self._check_value_error(lambda: producer.send(content, event_timestamp="test"))
+        self._check_value_error(lambda: producer.send(content, deliver_at="test"))
+        self._check_value_error(lambda: producer.send(content, deliver_after="test"))
         client.close()
 
     def test_client_argument_errors(self):
@@ -692,75 +683,75 @@ class PulsarTest(TestCase):
 
         self._check_value_error(lambda: client.create_producer(None))
 
-        topic = 'my-python-test-producer'
+        topic = "my-python-test-producer"
 
         self._check_value_error(lambda: client.create_producer(topic, producer_name=5))
-        self._check_value_error(lambda: client.create_producer(topic, initial_sequence_id='test'))
-        self._check_value_error(lambda: client.create_producer(topic, send_timeout_millis='test'))
+        self._check_value_error(lambda: client.create_producer(topic, initial_sequence_id="test"))
+        self._check_value_error(lambda: client.create_producer(topic, send_timeout_millis="test"))
         self._check_value_error(lambda: client.create_producer(topic, compression_type=None))
-        self._check_value_error(lambda: client.create_producer(topic, max_pending_messages='test'))
-        self._check_value_error(lambda: client.create_producer(topic, block_if_queue_full='test'))
-        self._check_value_error(lambda: client.create_producer(topic, batching_enabled='test'))
-        self._check_value_error(lambda: client.create_producer(topic, batching_enabled='test'))
-        self._check_value_error(lambda: client.create_producer(topic, batching_max_allowed_size_in_bytes='test'))
-        self._check_value_error(lambda: client.create_producer(topic, batching_max_publish_delay_ms='test'))
+        self._check_value_error(lambda: client.create_producer(topic, max_pending_messages="test"))
+        self._check_value_error(lambda: client.create_producer(topic, block_if_queue_full="test"))
+        self._check_value_error(lambda: client.create_producer(topic, batching_enabled="test"))
+        self._check_value_error(lambda: client.create_producer(topic, batching_enabled="test"))
+        self._check_value_error(lambda: client.create_producer(topic, batching_max_allowed_size_in_bytes="test"))
+        self._check_value_error(lambda: client.create_producer(topic, batching_max_publish_delay_ms="test"))
         client.close()
 
     def test_consumer_argument_errors(self):
         client = Client(self.serviceUrl)
 
-        topic = 'my-python-test-producer'
-        sub_name = 'my-sub-name'
+        topic = "my-python-test-producer"
+        sub_name = "my-sub-name"
 
         self._check_value_error(lambda: client.subscribe(None, sub_name))
         self._check_value_error(lambda: client.subscribe(topic, None))
         self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_type=None))
-        self._check_value_error(lambda: client.subscribe(topic, sub_name, receiver_queue_size='test'))
+        self._check_value_error(lambda: client.subscribe(topic, sub_name, receiver_queue_size="test"))
         self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_name=5))
-        self._check_value_error(lambda: client.subscribe(topic, sub_name, unacked_messages_timeout_ms='test'))
-        self._check_value_error(lambda: client.subscribe(topic, sub_name, broker_consumer_stats_cache_time_ms='test'))
+        self._check_value_error(lambda: client.subscribe(topic, sub_name, unacked_messages_timeout_ms="test"))
+        self._check_value_error(lambda: client.subscribe(topic, sub_name, broker_consumer_stats_cache_time_ms="test"))
         client.close()
 
     def test_reader_argument_errors(self):
         client = Client(self.serviceUrl)
-        topic = 'my-python-test-producer'
+        topic = "my-python-test-producer"
 
         # This should not raise an exception
         client.create_reader(topic, MessageId.earliest)
 
         self._check_value_error(lambda: client.create_reader(None, MessageId.earliest))
         self._check_value_error(lambda: client.create_reader(topic, None))
-        self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, receiver_queue_size='test'))
+        self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, receiver_queue_size="test"))
         self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, reader_name=5))
         client.close()
 
     def test_publish_compact_and_consume(self):
         client = Client(self.serviceUrl)
-        topic = 'compaction_%s' % (uuid.uuid4())
-        producer = client.create_producer(topic, producer_name='my-producer-name', batching_enabled=False)
+        topic = "compaction_%s" % (uuid.uuid4())
+        producer = client.create_producer(topic, producer_name="my-producer-name", batching_enabled=False)
         self.assertEqual(producer.last_sequence_id(), -1)
-        consumer = client.subscribe(topic, 'my-sub1', is_read_compacted=True)
+        consumer = client.subscribe(topic, "my-sub1", is_read_compacted=True)
         consumer.close()
-        consumer2 = client.subscribe(topic, 'my-sub2', is_read_compacted=False)
+        consumer2 = client.subscribe(topic, "my-sub2", is_read_compacted=False)
 
         # producer creates 2 messages with the same key.
-        producer.send(b'hello-0', partition_key='key0')
-        producer.send(b'hello-1', partition_key='key0')
+        producer.send(b"hello-0", partition_key="key0")
+        producer.send(b"hello-1", partition_key="key0")
         producer.close()
 
         # issue the compact command and wait for it to succeed
-        url='%s/admin/v2/persistent/public/default/%s/compaction' % (self.adminUrl, topic)
-        doHttpPut(url, '')
+        url = "%s/admin/v2/persistent/public/default/%s/compaction" % (self.adminUrl, topic)
+        doHttpPut(url, "")
         while True:
-            s=doHttpGet(url).decode('utf-8')
-            if 'RUNNING' in s:
+            s = doHttpGet(url).decode("utf-8")
+            if "RUNNING" in s:
                 print(s)
                 print("Compact still running")
                 time.sleep(0.2)
             else:
                 print(s)
                 print("Compact Complete now")
-                self.assertTrue('SUCCESS' in s)
+                self.assertTrue("SUCCESS" in s)
                 break
 
         # after compaction completes, the compacted ledger is recorded
@@ -772,87 +763,84 @@ class PulsarTest(TestCase):
         time.sleep(1.0)
 
         # after compaction, a consumer with `is_read_compacted=True` is expected to read only the second message for the same key.
-        consumer1 = client.subscribe(topic, 'my-sub1', is_read_compacted=True)
+        consumer1 = client.subscribe(topic, "my-sub1", is_read_compacted=True)
         msg0 = consumer1.receive(TM)
-        self.assertEqual(msg0.data(), b'hello-1')
+        self.assertEqual(msg0.data(), b"hello-1")
         consumer1.acknowledge(msg0)
         consumer1.close()
 
         # ditto for reader
         reader1 = client.create_reader(topic, MessageId.earliest, is_read_compacted=True)
         msg0 = reader1.read_next(TM)
-        self.assertEqual(msg0.data(), b'hello-1')
+        self.assertEqual(msg0.data(), b"hello-1")
         reader1.close()
 
         # after compaction, a consumer with `is_read_compacted=False` is expected to read both messages for the same key.
         msg0 = consumer2.receive(TM)
-        self.assertEqual(msg0.data(), b'hello-0')
+        self.assertEqual(msg0.data(), b"hello-0")
         consumer2.acknowledge(msg0)
         msg1 = consumer2.receive(TM)
-        self.assertEqual(msg1.data(), b'hello-1')
+        self.assertEqual(msg1.data(), b"hello-1")
         consumer2.acknowledge(msg1)
         consumer2.close()
 
         # ditto for reader
         reader2 = client.create_reader(topic, MessageId.earliest, is_read_compacted=False)
         msg0 = reader2.read_next(TM)
-        self.assertEqual(msg0.data(), b'hello-0')
+        self.assertEqual(msg0.data(), b"hello-0")
         msg1 = reader2.read_next(TM)
-        self.assertEqual(msg1.data(), b'hello-1')
+        self.assertEqual(msg1.data(), b"hello-1")
         reader2.close()
         client.close()
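The compaction assertions above boil down to one retention rule: a compacted read keeps only the latest message per partition key, while an uncompacted read sees every message. A minimal sketch of that rule (illustrative only, not the broker's compactor):

```python
# Sketch of topic compaction: for each partition key, only the most
# recently published payload survives in the compacted view.

def compact(messages):
    """`messages` is an ordered list of (key, payload) pairs; return the
    view an is_read_compacted=True consumer or reader would see."""
    latest = {}
    for key, payload in messages:
        latest[key] = payload  # later writes overwrite earlier ones
    return list(latest.items())

published = [("key0", b"hello-0"), ("key0", b"hello-1")]
# A compacted read sees only the second message for key0, as asserted above:
assert compact(published) == [("key0", b"hello-1")]
```

`consumer2` in the test, subscribed with `is_read_compacted=False` before compaction ran, still receives both `hello-0` and `hello-1`.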
 
     def test_reader_has_message_available(self):
         # create client, producer, reader
         client = Client(self.serviceUrl)
-        producer = client.create_producer('my-python-topic-reader-has-message-available')
-        reader = client.create_reader('my-python-topic-reader-has-message-available',
-                                      MessageId.latest)
+        producer = client.create_producer("my-python-topic-reader-has-message-available")
+        reader = client.create_reader("my-python-topic-reader-has-message-available", MessageId.latest)
 
         # before producing data, no message is expected to be available
-        self.assertFalse(reader.has_message_available());
+        self.assertFalse(reader.has_message_available())
 
         for i in range(10):
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
 
         # after producing data, a message is expected to be available
-        self.assertTrue(reader.has_message_available());
+        self.assertTrue(reader.has_message_available())
 
         for i in range(10):
             msg = reader.read_next(TM)
             self.assertTrue(msg)
-            self.assertEqual(msg.data(), b'hello-%d' % i)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
 
         # after consuming all the data, no message is expected to be available
-        self.assertFalse(reader.has_message_available());
+        self.assertFalse(reader.has_message_available())
 
         for i in range(10, 20):
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
 
         # after producing data again, a message is expected to be available
-        self.assertTrue(reader.has_message_available());
+        self.assertTrue(reader.has_message_available())
         reader.close()
         producer.close()
         client.close()
 
     def test_seek(self):
         client = Client(self.serviceUrl)
-        topic = 'my-python-topic-seek-' + str(time.time())
-        consumer = client.subscribe(topic,
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
+        topic = "my-python-topic-seek-" + str(time.time())
+        consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared)
         producer = client.create_producer(topic)
 
         for i in range(100):
             if i > 0:
                 time.sleep(0.02)
-            producer.send(b'hello-%d' % i)
+            producer.send(b"hello-%d" % i)
 
         ids = []
         timestamps = []
         for i in range(100):
             msg = consumer.receive(TM)
-            self.assertEqual(msg.data(), b'hello-%d' % i)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
             ids.append(msg.message_id())
             timestamps.append(msg.publish_timestamp())
             consumer.acknowledge(msg)
@@ -861,19 +849,19 @@ class PulsarTest(TestCase):
         consumer.seek(MessageId.earliest)
         time.sleep(0.5)
         msg = consumer.receive(TM)
-        self.assertEqual(msg.data(), b'hello-0')
+        self.assertEqual(msg.data(), b"hello-0")
 
         # seek on messageId
         consumer.seek(ids[50])
         time.sleep(0.5)
         msg = consumer.receive(TM)
-        self.assertEqual(msg.data(), b'hello-50')
+        self.assertEqual(msg.data(), b"hello-50")
 
         # ditto, but seek on timestamp
         consumer.seek(timestamps[42])
         time.sleep(0.5)
         msg = consumer.receive(TM)
-        self.assertEqual(msg.data(), b'hello-42')
+        self.assertEqual(msg.data(), b"hello-42")
 
         # repeat with reader
         reader = client.create_reader(topic, MessageId.latest)
@@ -884,25 +872,25 @@ class PulsarTest(TestCase):
         reader.seek(MessageId.earliest)
         time.sleep(0.5)
         msg = reader.read_next(TM)
-        self.assertEqual(msg.data(), b'hello-0')
+        self.assertEqual(msg.data(), b"hello-0")
         msg = reader.read_next(TM)
-        self.assertEqual(msg.data(), b'hello-1')
+        self.assertEqual(msg.data(), b"hello-1")
 
         # seek on messageId
         reader.seek(ids[33])
         time.sleep(0.5)
         msg = reader.read_next(TM)
-        self.assertEqual(msg.data(), b'hello-33')
+        self.assertEqual(msg.data(), b"hello-33")
         msg = reader.read_next(TM)
-        self.assertEqual(msg.data(), b'hello-34')
+        self.assertEqual(msg.data(), b"hello-34")
 
         # seek on timestamp
         reader.seek(timestamps[79])
         time.sleep(0.5)
         msg = reader.read_next(TM)
-        self.assertEqual(msg.data(), b'hello-79')
+        self.assertEqual(msg.data(), b"hello-79")
         msg = reader.read_next(TM)
-        self.assertEqual(msg.data(), b'hello-80')
+        self.assertEqual(msg.data(), b"hello-80")
 
         reader.close()
         client.close()
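Seeking by timestamp, as `test_seek` exercises it, repositions the cursor so that the next receive returns the first message whose publish timestamp is at or after the target (seeking `timestamps[42]` yields `hello-42`). A small sketch of that lookup with `bisect` over a sorted timestamp list — illustrative only, since the real seek happens broker-side against the ledger:

```python
import bisect

# Sketch of timestamp-based seek: reposition to the first message whose
# publish timestamp is >= the requested time, then read forward.

def seek_by_timestamp(timestamps, payloads, target_ts):
    """`timestamps` is sorted ascending and parallel to `payloads`;
    return the payloads readable after seeking to `target_ts`."""
    index = bisect.bisect_left(timestamps, target_ts)
    return payloads[index:]

ts = [100, 120, 140, 160]
msgs = [b"hello-%d" % i for i in range(4)]
# Seeking to the exact publish timestamp of message 2 resumes at message 2:
assert seek_by_timestamp(ts, msgs, 140) == [b"hello-2", b"hello-3"]
```

Seeking by `MessageId`, by contrast, follows the exclusive/inclusive positioning rules, which is why the reader assertions check both `hello-33` and `hello-34` after `reader.seek(ids[33])`.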
@@ -915,15 +903,13 @@ class PulsarTest(TestCase):
 
     def _v2_topics(self, url):
         client = Client(url)
-        consumer = client.subscribe('my-v2-topic-producer-consumer',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-v2-topic-producer-consumer')
-        producer.send(b'hello')
+        consumer = client.subscribe("my-v2-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared)
+        producer = client.create_producer("my-v2-topic-producer-consumer")
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
         consumer.acknowledge(msg)
 
         with self.assertRaises(pulsar.Timeout):
@@ -933,38 +919,35 @@ class PulsarTest(TestCase):
 
     def test_topics_consumer(self):
         client = Client(self.serviceUrl)
-        topic1 = 'persistent://public/default/my-python-topics-consumer-1'
-        topic2 = 'persistent://public/default/my-python-topics-consumer-2'
-        topic3 = 'persistent://public/default-2/my-python-topics-consumer-3' # topic from different namespace
+        topic1 = "persistent://public/default/my-python-topics-consumer-1"
+        topic2 = "persistent://public/default/my-python-topics-consumer-2"
+        topic3 = "persistent://public/default-2/my-python-topics-consumer-3"  # topic from different namespace
         topics = [topic1, topic2, topic3]
 
-        url1 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-1/partitions'
-        url2 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-topics-consumer-2/partitions'
-        url3 = self.adminUrl + '/admin/v2/persistent/public/default-2/my-python-topics-consumer-3/partitions'
+        url1 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-topics-consumer-1/partitions"
+        url2 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-topics-consumer-2/partitions"
+        url3 = self.adminUrl + "/admin/v2/persistent/public/default-2/my-python-topics-consumer-3/partitions"
 
-        doHttpPut(url1, '2')
-        doHttpPut(url2, '3')
-        doHttpPut(url3, '4')
+        doHttpPut(url1, "2")
+        doHttpPut(url2, "3")
+        doHttpPut(url3, "4")
 
         producer1 = client.create_producer(topic1)
         producer2 = client.create_producer(topic2)
         producer3 = client.create_producer(topic3)
 
-        consumer = client.subscribe(topics,
-                                    'my-topics-consumer-sub',
-                                    consumer_type=ConsumerType.Shared,
-                                    receiver_queue_size=10
-                                    )
+        consumer = client.subscribe(
+            topics, "my-topics-consumer-sub", consumer_type=ConsumerType.Shared, receiver_queue_size=10
+        )
 
         for i in range(100):
-            producer1.send(b'hello-1-%d' % i)
+            producer1.send(b"hello-1-%d" % i)
 
         for i in range(100):
-            producer2.send(b'hello-2-%d' % i)
+            producer2.send(b"hello-2-%d" % i)
 
         for i in range(100):
-            producer3.send(b'hello-3-%d' % i)
-
+            producer3.send(b"hello-3-%d" % i)
 
         for i in range(300):
             msg = consumer.receive(TM)
@@ -976,45 +959,46 @@ class PulsarTest(TestCase):
 
     def test_topics_pattern_consumer(self):
         import re
+
         client = Client(self.serviceUrl)
 
-        topics_pattern = 'persistent://public/default/my-python-pattern-consumer.*'
+        topics_pattern = "persistent://public/default/my-python-pattern-consumer.*"
 
-        topic1 = 'persistent://public/default/my-python-pattern-consumer-1'
-        topic2 = 'persistent://public/default/my-python-pattern-consumer-2'
-        topic3 = 'persistent://public/default/my-python-pattern-consumer-3'
+        topic1 = "persistent://public/default/my-python-pattern-consumer-1"
+        topic2 = "persistent://public/default/my-python-pattern-consumer-2"
+        topic3 = "persistent://public/default/my-python-pattern-consumer-3"
 
-        url1 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-pattern-consumer-1/partitions'
-        url2 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-pattern-consumer-2/partitions'
-        url3 = self.adminUrl + '/admin/v2/persistent/public/default/my-python-pattern-consumer-3/partitions'
+        url1 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-1/partitions"
+        url2 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-2/partitions"
+        url3 = self.adminUrl + "/admin/v2/persistent/public/default/my-python-pattern-consumer-3/partitions"
 
-        doHttpPut(url1, '2')
-        doHttpPut(url2, '3')
-        doHttpPut(url3, '4')
+        doHttpPut(url1, "2")
+        doHttpPut(url2, "3")
+        doHttpPut(url3, "4")
 
         producer1 = client.create_producer(topic1)
         producer2 = client.create_producer(topic2)
         producer3 = client.create_producer(topic3)
 
-        consumer = client.subscribe(re.compile(topics_pattern),
-                                    'my-pattern-consumer-sub',
-                                    consumer_type = ConsumerType.Shared,
-                                    receiver_queue_size = 10,
-                                    pattern_auto_discovery_period = 1
-                                   )
+        consumer = client.subscribe(
+            re.compile(topics_pattern),
+            "my-pattern-consumer-sub",
+            consumer_type=ConsumerType.Shared,
+            receiver_queue_size=10,
+            pattern_auto_discovery_period=1,
+        )
 
         # wait enough time to trigger auto discovery
         time.sleep(2)
 
         for i in range(100):
-            producer1.send(b'hello-1-%d' % i)
+            producer1.send(b"hello-1-%d" % i)
 
         for i in range(100):
-            producer2.send(b'hello-2-%d' % i)
+            producer2.send(b"hello-2-%d" % i)
 
         for i in range(100):
-            producer3.send(b'hello-3-%d' % i)
-
+            producer3.send(b"hello-3-%d" % i)
 
         for i in range(300):
             msg = consumer.receive(TM)
@@ -1033,70 +1017,72 @@ class PulsarTest(TestCase):
 
     def test_get_topics_partitions(self):
         client = Client(self.serviceUrl)
-        topic_partitioned = 'persistent://public/default/test_get_topics_partitions'
-        topic_non_partitioned = 'persistent://public/default/test_get_topics_not-partitioned'
-
-        url1 = self.adminUrl + '/admin/v2/persistent/public/default/test_get_topics_partitions/partitions'
-        doHttpPut(url1, '3')
-
-        self.assertEqual(client.get_topic_partitions(topic_partitioned),
-                         ['persistent://public/default/test_get_topics_partitions-partition-0',
-                          'persistent://public/default/test_get_topics_partitions-partition-1',
-                          'persistent://public/default/test_get_topics_partitions-partition-2'])
+        topic_partitioned = "persistent://public/default/test_get_topics_partitions"
+        topic_non_partitioned = "persistent://public/default/test_get_topics_not-partitioned"
+
+        url1 = self.adminUrl + "/admin/v2/persistent/public/default/test_get_topics_partitions/partitions"
+        doHttpPut(url1, "3")
+
+        self.assertEqual(
+            client.get_topic_partitions(topic_partitioned),
+            [
+                "persistent://public/default/test_get_topics_partitions-partition-0",
+                "persistent://public/default/test_get_topics_partitions-partition-1",
+                "persistent://public/default/test_get_topics_partitions-partition-2",
+            ],
+        )
 
-        self.assertEqual(client.get_topic_partitions(topic_non_partitioned),
-                         [topic_non_partitioned])
+        self.assertEqual(client.get_topic_partitions(topic_non_partitioned), [topic_non_partitioned])
         client.close()
 
     def test_token_auth(self):
-        with open('/tmp/pulsar-test-data/tokens/token.txt') as tf:
+        with open("/tmp/pulsar-test-data/tokens/token.txt") as tf:
             token = tf.read().strip()
 
         # Use adminUrl to test both HTTP request and binary protocol
-        client = Client(self.adminUrl,
-                        authentication=AuthenticationToken(token))
+        client = Client(self.adminUrl, authentication=AuthenticationToken(token))
 
-        consumer = client.subscribe('persistent://private/auth/my-python-topic-token-auth',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('persistent://private/auth/my-python-topic-token-auth')
-        producer.send(b'hello')
+        consumer = client.subscribe(
+            "persistent://private/auth/my-python-topic-token-auth", "my-sub", consumer_type=ConsumerType.Shared
+        )
+        producer = client.create_producer("persistent://private/auth/my-python-topic-token-auth")
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
         client.close()
 
     def test_token_auth_supplier(self):
         def read_token():
-            with open('/tmp/pulsar-test-data/tokens/token.txt') as tf:
+            with open("/tmp/pulsar-test-data/tokens/token.txt") as tf:
                 return tf.read().strip()
 
-        client = Client(self.serviceUrl,
-                        authentication=AuthenticationToken(read_token))
-        consumer = client.subscribe('persistent://private/auth/my-python-topic-token-auth',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('persistent://private/auth/my-python-topic-token-auth')
-        producer.send(b'hello')
+        client = Client(self.serviceUrl, authentication=AuthenticationToken(read_token))
+        consumer = client.subscribe(
+            "persistent://private/auth/my-python-topic-token-auth", "my-sub", consumer_type=ConsumerType.Shared
+        )
+        producer = client.create_producer("persistent://private/auth/my-python-topic-token-auth")
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
         client.close()
 
     def test_producer_consumer_zstd(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('my-python-topic-producer-consumer-zstd',
-                                    'my-sub',
-                                    consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-producer-consumer-zstd',
-                                          compression_type=CompressionType.ZSTD)
-        producer.send(b'hello')
+        consumer = client.subscribe(
+            "my-python-topic-producer-consumer-zstd", "my-sub", consumer_type=ConsumerType.Shared
+        )
+        producer = client.create_producer(
+            "my-python-topic-producer-consumer-zstd", compression_type=CompressionType.ZSTD
+        )
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
-        self.assertEqual(msg.data(), b'hello')
+        self.assertEqual(msg.data(), b"hello")
 
         with self.assertRaises(pulsar.Timeout):
             consumer.receive(100)
@@ -1107,41 +1093,46 @@ class PulsarTest(TestCase):
     def test_client_reference_deleted(self):
         def get_producer():
             cl = Client(self.serviceUrl)
-            return cl.create_producer(topic='foobar')
+            return cl.create_producer(topic="foobar")
 
         producer = get_producer()
-        producer.send(b'test_payload')
+        producer.send(b"test_payload")
 
     #####
 
     def test_get_topic_name(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('persistent://public/default/topic_name_test',
-                                    'topic_name_test_sub',
-                                    consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('persistent://public/default/topic_name_test')
-        producer.send(b'hello')
+        consumer = client.subscribe(
+            "persistent://public/default/topic_name_test", "topic_name_test_sub", consumer_type=ConsumerType.Shared
+        )
+        producer = client.create_producer("persistent://public/default/topic_name_test")
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
-        self.assertEqual(msg.topic_name(), 'persistent://public/default/topic_name_test')
+        self.assertEqual(msg.topic_name(), "persistent://public/default/topic_name_test")
         client.close()
 
     def test_get_partitioned_topic_name(self):
         client = Client(self.serviceUrl)
-        url1 = self.adminUrl + '/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions'
-        doHttpPut(url1, '3')
-
-        partitions = ['persistent://public/default/partitioned_topic_name_test-partition-0',
-                      'persistent://public/default/partitioned_topic_name_test-partition-1',
-                      'persistent://public/default/partitioned_topic_name_test-partition-2']
-        self.assertEqual(client.get_topic_partitions('persistent://public/default/partitioned_topic_name_test'),
-                         partitions)
+        url1 = self.adminUrl + "/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions"
+        doHttpPut(url1, "3")
+
+        partitions = [
+            "persistent://public/default/partitioned_topic_name_test-partition-0",
+            "persistent://public/default/partitioned_topic_name_test-partition-1",
+            "persistent://public/default/partitioned_topic_name_test-partition-2",
+        ]
+        self.assertEqual(
+            client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test"), partitions
+        )
 
-        consumer = client.subscribe('persistent://public/default/partitioned_topic_name_test',
-                                    'partitioned_topic_name_test_sub',
-                                    consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('persistent://public/default/partitioned_topic_name_test')
-        producer.send(b'hello')
+        consumer = client.subscribe(
+            "persistent://public/default/partitioned_topic_name_test",
+            "partitioned_topic_name_test_sub",
+            consumer_type=ConsumerType.Shared,
+        )
+        producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
+        producer.send(b"hello")
 
         msg = consumer.receive(TM)
         self.assertTrue(msg.topic_name() in partitions)
@@ -1149,12 +1140,12 @@ class PulsarTest(TestCase):
 
     def test_shutdown_client(self):
         client = Client(self.serviceUrl)
-        producer = client.create_producer('persistent://public/default/partitioned_topic_name_test')
-        producer.send(b'hello')
+        producer = client.create_producer("persistent://public/default/partitioned_topic_name_test")
+        producer.send(b"hello")
         client.shutdown()
 
         try:
-            producer.send(b'hello')
+            producer.send(b"hello")
             self.assertTrue(False)
         except pulsar.PulsarException:
             # Expected
@@ -1162,14 +1153,12 @@ class PulsarTest(TestCase):
 
     def test_negative_acks(self):
         client = Client(self.serviceUrl)
-        consumer = client.subscribe('test_negative_acks',
-                                    'test',
-                                    schema=pulsar.schema.StringSchema(),
-                                    negative_ack_redelivery_delay_ms=1000)
-        producer = client.create_producer('test_negative_acks',
-                                          schema=pulsar.schema.StringSchema())
+        consumer = client.subscribe(
+            "test_negative_acks", "test", schema=pulsar.schema.StringSchema(), negative_ack_redelivery_delay_ms=1000
+        )
+        producer = client.create_producer("test_negative_acks", schema=pulsar.schema.StringSchema())
         for i in range(10):
-            producer.send_async('hello-%d' % i, callback=None)
+            producer.send_async("hello-%d" % i, callback=None)
 
         producer.flush()
 
@@ -1189,20 +1178,28 @@ class PulsarTest(TestCase):
 
     def test_connect_timeout(self):
         client = pulsar.Client(
-            service_url='pulsar://192.0.2.1:1234',
-            connection_timeout_ms=1000, # 1 second
+            service_url="pulsar://192.0.2.1:1234",
+            connection_timeout_ms=1000,  # 1 second
         )
         t1 = time.time()
         try:
-            producer = client.create_producer('test_connect_timeout')
-            self.fail('create_producer should not succeed')
+            producer = client.create_producer("test_connect_timeout")
+            self.fail("create_producer should not succeed")
         except pulsar.ConnectError as expected:
-            print('expected error: {} when create producer'.format(expected))
+            print("expected error: {} when create producer".format(expected))
         t2 = time.time()
         self.assertGreater(t2 - t1, 1.0)
-        self.assertLess(t2 - t1, 1.5) # 1.5 seconds is long enough
+        self.assertLess(t2 - t1, 1.5)  # 1.5 seconds is long enough
         client.close()
 
+    def test_json_schema_encode(self):
+        schema = JsonSchema(TestRecord)
+        record = TestRecord(a=1, b=2)
+        # Ensure that encoding a JsonSchema more than once works and produces the same result
+        first_encode = schema.encode(record)
+        second_encode = schema.encode(record)
+        self.assertEqual(first_encode, second_encode)
+
     def _check_value_error(self, fun):
         with self.assertRaises(ValueError):
             fun()
@@ -1212,5 +1209,5 @@ class PulsarTest(TestCase):
             fun()
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()

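The new `test_json_schema_encode` above asserts that encoding the same record twice yields identical bytes (the #12490 fix, "JsonSchema encoding is not idempotent"). A minimal sketch of what idempotent JSON encoding looks like, using a hypothetical `Record` class rather than the real `pulsar.schema` types:

```python
import json

class Record:
    """Hypothetical stand-in for a pulsar.schema.Record subclass."""
    def __init__(self, a, b):
        self.a = a
        self.b = b

def encode(record):
    # Serializing from the record's attributes each time, instead of
    # mutating the record in place, keeps encoding idempotent: calling
    # encode() twice on the same object yields identical bytes.
    return json.dumps(vars(record), sort_keys=True).encode("utf-8")

r = Record(a=1, b=2)
first_encode = encode(r)
second_encode = encode(r)
assert first_encode == second_encode
```

This is only an illustration of the property the test checks, not the client's actual `JsonSchema.encode` implementation.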
[pulsar] 16/16: [Transaction] Fix generate transactionId some comment. (#13234)


penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ac932d545064ee59325edd4bfe0b294eae92ad37
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Dec 11 17:02:27 2021 +0800

    [Transaction] Fix generate transactionId some comment. (#13234)
    
    (cherry picked from commit 4ecb874b71e70b23f2f5310317b3c741007ed61b)
---
 .../intercept/ManagedLedgerInterceptorImpl.java    |  4 +-
 .../pulsar/broker/transaction/TransactionTest.java | 12 +++---
 .../impl/MLTransactionMetadataStore.java           | 19 +++++----
 .../impl/MLTransactionMetadataStoreProvider.java   |  6 +--
 ....java => MLTransactionSequenceIdGenerator.java} | 15 ++++++--
 .../MLTransactionMetadataStoreTest.java            | 45 +++++++++++-----------
 6 files changed, 52 insertions(+), 49 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index bbab84b..424797f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -35,11 +35,8 @@ import org.slf4j.LoggerFactory;
 public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class);
     private static final String INDEX = "index";
-
-
     private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
 
-
     public ManagedLedgerInterceptorImpl(Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors) {
         this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
     }
@@ -108,6 +105,7 @@ public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
                             entries.close();
                             promise.complete(null);
                         } catch (Exception e) {
+                            entries.close();
                             log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
                                     name, e);
                             promise.completeExceptionally(e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 07af304..3d0c406 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -87,7 +87,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -510,7 +510,7 @@ public class TransactionTest extends TransactionTestBase {
                 .getTopic(topic, false).get().get();
         persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
         Map<String, String> map = new HashMap<>();
-        map.put(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID, "1");
+        map.put(MLTransactionSequenceIdGenerator.MAX_LOCAL_TXN_ID, "1");
         persistentTopic.getManagedLedger().setProperties(map);
 
         ManagedCursor managedCursor = mock(ManagedCursor.class);
@@ -521,8 +521,8 @@ public class TransactionTest extends TransactionTestBase {
                     null);
             return null;
         }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog =
                 new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
                         persistentTopic.getManagedLedger().getConfig());
@@ -542,7 +542,7 @@ public class TransactionTest extends TransactionTestBase {
         MLTransactionMetadataStore metadataStore1 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                         mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
@@ -556,7 +556,7 @@ public class TransactionTest extends TransactionTestBase {
         MLTransactionMetadataStore metadataStore2 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                         mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 6ef4f17..a71d203 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
@@ -60,7 +59,6 @@ public class MLTransactionMetadataStore
     private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStore.class);
 
     private final TransactionCoordinatorID tcID;
-    private final AtomicLong sequenceId;
     private final MLTransactionLogImpl transactionLog;
     private final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();
     private final TransactionTimeoutTracker timeoutTracker;
@@ -70,14 +68,15 @@ public class MLTransactionMetadataStore
     private final LongAdder abortedTransactionCount;
     private final LongAdder transactionTimeoutCount;
     private final LongAdder appendLogCount;
+    private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
                                       TransactionTimeoutTracker timeoutTracker,
                                       TransactionRecoverTracker recoverTracker,
-                                      AtomicLong sequenceId) {
+                                      MLTransactionSequenceIdGenerator sequenceIdGenerator) {
         super(State.None);
-        this.sequenceId = sequenceId;
+        this.sequenceIdGenerator = sequenceIdGenerator;
         this.tcID = tcID;
         this.transactionLog = mlTransactionLog;
         this.timeoutTracker = timeoutTracker;
@@ -204,7 +203,7 @@ public class MLTransactionMetadataStore
         }
 
         long mostSigBits = tcID.getId();
-        long leastSigBits = sequenceId.incrementAndGet();
+        long leastSigBits = sequenceIdGenerator.generateSequenceId();
         TxnID txnID = new TxnID(mostSigBits, leastSigBits);
         long currentTimeMillis = System.currentTimeMillis();
         TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
@@ -214,7 +213,7 @@ public class MLTransactionMetadataStore
                 .setTimeoutMs(timeOut)
                 .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                 .setLastModificationTime(currentTimeMillis)
-                .setMaxLocalTxnId(sequenceId.get());
+                .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
         return transactionLog.append(transactionMetadataEntry)
                 .thenCompose(position -> {
                     appendLogCount.increment();
@@ -243,7 +242,7 @@ public class MLTransactionMetadataStore
                     .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
                     .addAllPartitions(partitions)
                     .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceId.get());
+                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
             return transactionLog.append(transactionMetadataEntry)
                     .thenCompose(position -> {
@@ -280,7 +279,7 @@ public class MLTransactionMetadataStore
                     .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
                     .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
                     .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceId.get());
+                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
             return transactionLog.append(transactionMetadataEntry)
                     .thenCompose(position -> {
@@ -321,7 +320,7 @@ public class MLTransactionMetadataStore
                     .setMetadataOp(TransactionMetadataOp.UPDATE)
                     .setLastModificationTime(System.currentTimeMillis())
                     .setNewStatus(newStatus)
-                    .setMaxLocalTxnId(sequenceId.get());
+                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
             return transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
                 appendLogCount.increment();
@@ -378,7 +377,7 @@ public class MLTransactionMetadataStore
         TransactionCoordinatorStats transactionCoordinatorstats = new TransactionCoordinatorStats();
         transactionCoordinatorstats.setLowWaterMark(getLowWaterMark());
         transactionCoordinatorstats.setState(getState().name());
-        transactionCoordinatorstats.setLeastSigBits(sequenceId.get());
+        transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());
         return transactionCoordinatorstats;
     }
 
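The refactor above replaces the raw `AtomicLong sequenceId` with an `MLTransactionSequenceIdGenerator` that owns both operations: `generateSequenceId()` allocates a new id for each transaction, and `getCurrentSequenceId()` reads the high-water mark recorded as `maxLocalTxnId` on every log entry. A minimal Python sketch of that interface (names and locking are illustrative assumptions, not the Java implementation):

```python
import threading

class SequenceIdGenerator:
    """Rough analogue of MLTransactionSequenceIdGenerator: the counter
    starts at -1 (matching the TC_ID_NOT_USED sentinel) and each new
    transaction takes the next id."""
    def __init__(self, start=-1):
        self._value = start
        self._lock = threading.Lock()

    def generate_sequence_id(self):
        # Equivalent to sequenceId.incrementAndGet(): atomic bump-and-read.
        with self._lock:
            self._value += 1
            return self._value

    def get_current_sequence_id(self):
        # Equivalent to sequenceId.get(): read without advancing, used
        # when stamping maxLocalTxnId on a log entry.
        with self._lock:
            return self._value

gen = SequenceIdGenerator()
first = gen.generate_sequence_id()
second = gen.generate_sequence_id()
assert (first, second) == (0, 1)
assert gen.get_current_sequence_id() == 1
```

Encapsulating both calls in one object is what lets the store stop carrying a bare `AtomicLong` around.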
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 3f20cbc..f0c32d2 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -45,14 +45,14 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
                                                                  TransactionRecoverTracker recoverTracker) {
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(new MLTransactionLogInterceptor());
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
                 managedLedgerFactory, managedLedgerConfig);
 
         // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
         return txnLog.initialize().thenApply(__ ->
                 new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
-                        recoverTracker, mlTransactionLogInterceptor.getSequenceId()));
+                        recoverTracker, mlTransactionSequenceIdGenerator));
     }
 }
\ No newline at end of file
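Note what this provider hunk fixes besides the rename: the old code created one `MLTransactionLogInterceptor` and registered a *second, different* instance on the managed-ledger config, so the interceptor recovering the sequence id was not the object whose `sequenceId` reached the store. A small Python sketch of why the two references must point at one shared instance (the `Generator` class here is purely illustrative):

```python
class Generator:
    """Toy counter standing in for the interceptor's sequence id."""
    def __init__(self):
        self.value = -1
    def advance(self):
        self.value += 1
        return self.value

# Buggy wiring: the instance that actually sees new entries is a
# different object from the one the store reads, so the store's view
# never moves off the initial sentinel.
configured = Generator()
queried = Generator()
configured.advance()
assert queried.value == -1  # store is stuck at -1

# Fixed wiring: one shared instance keeps both views consistent.
shared = Generator()
shared.advance()
assert shared.value == 0
```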
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
similarity index 93%
rename from pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
rename to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
index 68add4a..c68997b 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import io.netty.buffer.ByteBuf;
-import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
@@ -34,12 +33,11 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover transaction log.
  */
-public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
+public class MLTransactionSequenceIdGenerator implements ManagedLedgerInterceptor {
 
-    private static final Logger log = LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final Logger log = LoggerFactory.getLogger(MLTransactionSequenceIdGenerator.class);
     private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-    @Getter
     private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
 
     @Override
@@ -81,6 +79,7 @@ public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
                             entries.close();
                             promise.complete(null);
                         } catch (Exception e) {
+                            entries.close();
                             log.error("[{}] Failed to recover the tc sequenceId from the last add confirmed entry.",
                                     name, e);
                             promise.completeExceptionally(e);
@@ -101,4 +100,12 @@ public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
     public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
         propertiesMap.put(MAX_LOCAL_TXN_ID, sequenceId.get() + "");
     }
+
+    long generateSequenceId() {
+        return sequenceId.incrementAndGet();
+    }
+
+    long getCurrentSequenceId() {
+        return sequenceId.get();
+    }
 }
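The two accessors added at the end of `MLTransactionSequenceIdGenerator` work together with `onUpdateManagedLedgerInfo`, which persists the current max id under the `max_local_txn_id` property so it can be recovered after a restart (as the class comment says, "Store max sequenceID in ManagedLedger properties, in order to recover transaction log"). A hedged sketch of that persist/recover round trip, with function names invented for illustration:

```python
MAX_LOCAL_TXN_ID = "max_local_txn_id"  # same key as the Java constant

def snapshot_properties(current_id, properties):
    # Mirrors onUpdateManagedLedgerInfo: the current max id is written
    # into the managed-ledger properties as a string.
    properties[MAX_LOCAL_TXN_ID] = str(current_id)

def recover_sequence_id(properties, tc_id_not_used=-1):
    # On restart the generator resumes from the persisted value (or the
    # -1 sentinel when nothing was stored), so ids never repeat across
    # restarts.
    return int(properties.get(MAX_LOCAL_TXN_ID, tc_id_not_used))

props = {}
snapshot_properties(41, props)
assert recover_sequence_id(props) == 41
assert recover_sequence_id({}) == -1
```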
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 03aa1be..7fa3c08 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
@@ -42,7 +42,6 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -68,15 +67,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
         int checkReplayRetryCount = 0;
         while (true) {
             checkReplayRetryCount++;
@@ -142,8 +141,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         managedLedgerConfig.setMaxEntriesPerLedger(3);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
@@ -151,7 +150,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
@@ -177,7 +176,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         txnID = transactionMetadataStore.newTransaction(100000).get();
@@ -194,15 +193,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -245,7 +244,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
                                 txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                                mlTransactionLogInterceptor.getSequenceId());
+                                mlTransactionSequenceIdGenerator);
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -306,15 +305,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -373,15 +372,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -400,7 +399,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
     }
@@ -414,15 +413,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         transactionMetadataStore.newTransaction(5000).get();
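The change running through these hunks swaps a snapshot of the interceptor's sequence id (`getSequenceId()`) for the generator object itself, so the rebuilt store can both read and advance the counter across recoveries. A minimal sketch of such a generator — the class and method names below are illustrative stand-ins, not the actual Pulsar API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for MLTransactionSequenceIdGenerator: one shared
// counter that log replay can fast-forward and new transactions increment.
class SequenceIdGenerator {
    private final AtomicLong sequenceId = new AtomicLong(0L);

    // Invoked while replaying the transaction log: keep the highest id seen.
    void maxSequenceId(long seen) {
        sequenceId.updateAndGet(current -> Math.max(current, seen));
    }

    // Invoked when a new transaction is opened.
    long nextSequenceId() {
        return sequenceId.incrementAndGet();
    }
}
```

Passing the generator itself (rather than a copied counter value) is what lets a re-created MLTransactionMetadataStore continue the id sequence instead of restarting it.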

[pulsar] 02/16: Don't attempt to delete pending ack store unless transactions are enabled (#13041)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dfcdacf40ecacec45e56a00d577866b014f7fa67
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Nov 30 19:28:58 2021 +0200

    Don't attempt to delete pending ack store unless transactions are enabled (#13041)
    
    (cherry picked from commit 46720247d9a06daae9f8eae7740887c92406b2c3)
---
 .../broker/service/persistent/PersistentTopic.java | 43 ++++++++++++----------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 46c9dfd..053d72d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -979,27 +979,32 @@ public class PersistentTopic extends AbstractTopic
     @Override
     public CompletableFuture<Void> unsubscribe(String subscriptionName) {
         CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
-        getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
-                .getTransactionPendingAckStoreSuffix(topic,
-                        Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
-                new AsyncCallbacks.DeleteLedgerCallback() {
-            @Override
-            public void deleteLedgerComplete(Object ctx) {
-                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
-            }
 
-            @Override
-            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                if (exception instanceof MetadataNotFoundException) {
-                    asyncDeleteCursor(subscriptionName, unsubscribeFuture);
-                    return;
-                }
+        if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
+            getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
+                            .getTransactionPendingAckStoreSuffix(topic,
+                                    Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
+                    new AsyncCallbacks.DeleteLedgerCallback() {
+                        @Override
+                        public void deleteLedgerComplete(Object ctx) {
+                            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                        }
 
-                unsubscribeFuture.completeExceptionally(exception);
-                log.error("[{}][{}] Error deleting subscription pending ack store",
-                        topic, subscriptionName, exception);
-            }
-        }, null);
+                        @Override
+                        public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                            if (exception instanceof MetadataNotFoundException) {
+                                asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+                                return;
+                            }
+
+                            unsubscribeFuture.completeExceptionally(exception);
+                            log.error("[{}][{}] Error deleting subscription pending ack store",
+                                    topic, subscriptionName, exception);
+                        }
+                    }, null);
+        } else {
+            asyncDeleteCursor(subscriptionName, unsubscribeFuture);
+        }
 
         return unsubscribeFuture;
     }
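The guard added above reduces to a small control-flow sketch: attempt the pending-ack-store delete only when transactions are enabled, treat a missing store like a successful delete, and otherwise fall straight through to cursor deletion. The types below are generic stand-ins, not the broker's API:

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Generic sketch of the guarded unsubscribe flow: only touch the
// pending-ack store when transactions are on, and treat "store never
// existed" the same as a successful delete.
class Unsubscriber {
    static CompletableFuture<Void> unsubscribe(boolean transactionsEnabled,
                                               CompletableFuture<Void> deleteAckStore,
                                               Runnable deleteCursor) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (transactionsEnabled) {
            deleteAckStore.whenComplete((ok, ex) -> {
                if (ex == null || ex instanceof NoSuchElementException) {
                    deleteCursor.run();          // missing store is not an error
                    result.complete(null);
                } else {
                    result.completeExceptionally(ex);
                }
            });
        } else {
            deleteCursor.run();                  // no store can exist: skip the lookup
            result.complete(null);
        }
        return result;
    }

    public static void main(String[] args) {
        AtomicBoolean cursorDeleted = new AtomicBoolean(false);
        unsubscribe(false, new CompletableFuture<>(), () -> cursorDeleted.set(true));
        System.out.println("cursor deleted without touching ack store: " + cursorDeleted.get());
    }
}
```

Before the fix, every unsubscribe paid for (and could fail on) a pending-ack-store lookup even on brokers with transactions disabled, where no such store can exist.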

[pulsar] 14/16: Fix when deleting topic with NotFoundException, do not return to client (#13203)

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9daba91d8f66627963ada9041699cfee496e8d20
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Dec 11 15:20:49 2021 +0800

    Fix when deleting topic with NotFoundException, do not return to client (#13203)
    
    (cherry picked from commit bd68b6f05f9749328701c59bdaf3cddda2254d39)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  8 ++++++
 .../broker/admin/impl/PersistentTopicsBase.java    |  5 ++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 32 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index f3a94d2..9045462 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -35,6 +35,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -61,6 +62,7 @@ import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 
@@ -772,6 +774,12 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
+    protected boolean isManagedLedgerNotFoundException(Exception e) {
+        Throwable cause = e.getCause();
+        return cause instanceof ManagedLedgerException.MetadataNotFoundException
+                || cause instanceof MetadataStoreException.NotFoundException;
+    }
+
     protected void checkArgument(boolean b, String errorMessage) {
         if (!b) {
             throw new RestException(Status.BAD_REQUEST, errorMessage);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index be76ff1..5a084bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -56,7 +56,6 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -302,7 +301,7 @@ public class PersistentTopicsBase extends AdminResource {
         try {
             pulsar().getBrokerService().deleteTopic(topicName.toString(), true, deleteSchema).get();
         } catch (Exception e) {
-            if (e.getCause() instanceof MetadataNotFoundException) {
+            if (isManagedLedgerNotFoundException(e)) {
                 log.info("[{}] Topic was already not existing {}", clientAppId(), topicName, e);
             } else {
                 log.error("[{}] Failed to delete topic forcefully {}", clientAppId(), topicName, e);
@@ -1021,7 +1020,7 @@ public class PersistentTopicsBase extends AdminResource {
             log.error("[{}] Failed to delete topic {}", clientAppId(), topicName, t);
             if (t instanceof TopicBusyException) {
                 throw new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
-            } else if (t instanceof MetadataNotFoundException) {
+            } else if (isManagedLedgerNotFoundException(e)) {
                 throw new RestException(Status.NOT_FOUND, "Topic not found");
             } else {
                 throw new RestException(t);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 6e5ef99..e41db38 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -78,6 +79,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.zookeeper.KeeperException;
 import org.mockito.ArgumentCaptor;
 import org.powermock.reflect.Whitebox;
@@ -1084,4 +1086,34 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
                 .compareTo(id2) > 0);
     }
+
+    @Test
+    public void testDeleteTopic() throws Exception {
+        final String topicName = "topic-1";
+        BrokerService brokerService = spy(pulsar.getBrokerService());
+        doReturn(brokerService).when(pulsar).getBrokerService();
+        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, false);
+        CompletableFuture<Void> deleteTopicFuture = new CompletableFuture<>();
+        deleteTopicFuture.completeExceptionally(new MetadataStoreException.NotFoundException());
+        doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean());
+        persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true);
+        //
+        CompletableFuture<Void> deleteTopicFuture2 = new CompletableFuture<>();
+        deleteTopicFuture2.completeExceptionally(new MetadataStoreException("test exception"));
+        doReturn(deleteTopicFuture2).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean());
+        try {
+            persistentTopics.deleteTopic(testTenant, testNamespace, topicName, true, true, true);
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof RestException);
+        }
+        //
+        CompletableFuture<Void> deleteTopicFuture3 = new CompletableFuture<>();
+        deleteTopicFuture3.completeExceptionally(new MetadataStoreException.NotFoundException());
+        doReturn(deleteTopicFuture3).when(brokerService).deleteTopic(anyString(), anyBoolean(), anyBoolean());
+        try {
+            persistentTopics.deleteTopic(testTenant, testNamespace, topicName, false, true, true);
+        } catch (RestException e) {
+            Assert.assertEquals(e.getResponse().getStatus(), 404);
+        }
+    }
 }
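The new helper simply classifies an exception by its cause; the same shape is shown below with generic JDK exception types standing in for `ManagedLedgerException.MetadataNotFoundException` and `MetadataStoreException.NotFoundException`:

```java
import java.io.FileNotFoundException;
import java.util.NoSuchElementException;

// Cause-chain classification, as in isManagedLedgerNotFoundException:
// the interesting type is one level down, wrapped by the thrown exception.
class NotFoundClassifier {
    static boolean isNotFound(Exception e) {
        Throwable cause = e.getCause();
        return cause instanceof NoSuchElementException
                || cause instanceof FileNotFoundException;
    }
}
```

The fix matters because a failed `deleteTopic` future can wrap either a managed-ledger or a metadata-store not-found cause, and the old single-type `instanceof` check matched only one of them.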

[pulsar] 09/16: Update cursor last active timestamp when resetting cursor (#13166)

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7cce3f9b2c76c90953b943d9c27a45d5abdff59b
Author: Zhanpeng Wu <zh...@qq.com>
AuthorDate: Fri Dec 10 17:45:45 2021 +0800

    Update cursor last active timestamp when resetting cursor (#13166)
    
    Resolves #13165
    
    ### Modifications
    1. trigger last active time update after resetting cursor
    2. add related test case
    
    (cherry picked from commit 26342996f8336dd4f63634d8962b6fef11087485)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 42 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index f13fa93..521ec7a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1103,7 +1103,7 @@ public class ManagedCursorImpl implements ManagedCursor {
                     }
                 }
                 callback.resetComplete(newPosition);
-
+                updateLastActive();
             }
 
             @Override
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index bbc1827..2f80bc8 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -771,6 +772,47 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
     }
 
     @Test(timeOut = 20000)
+    void testLastActiveAfterResetCursor() throws Exception {
+        ManagedLedger ledger = factory.open("test_cursor_ledger");
+        ManagedCursor cursor = ledger.openCursor("tla");
+
+        PositionImpl lastPosition = null;
+        for (int i = 0; i < 3; i++) {
+            lastPosition = (PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding));
+        }
+
+        final AtomicBoolean moveStatus = new AtomicBoolean(false);
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+
+        long lastActive = cursor.getLastActive();
+
+        cursor.asyncResetCursor(lastPosition, new AsyncCallbacks.ResetCursorCallback() {
+            @Override
+            public void resetComplete(Object ctx) {
+                moveStatus.set(true);
+                countDownLatch.countDown();
+            }
+
+            @Override
+            public void resetFailed(ManagedLedgerException exception, Object ctx) {
+                moveStatus.set(false);
+                countDownLatch.countDown();
+            }
+        });
+
+        countDownLatch.await();
+        assertTrue(moveStatus.get());
+
+        assertNotNull(lastPosition);
+        assertEquals(lastPosition, cursor.getReadPosition());
+
+        assertNotEquals(lastActive, cursor.getLastActive());
+
+        cursor.close();
+        ledger.close();
+    }
+
+    @Test(timeOut = 20000)
     void seekPosition() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
         ManagedCursor cursor = ledger.openCursor("c1");
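The added test uses the standard latch-plus-flag idiom for asserting on an asynchronous callback: the callback records the outcome and releases the latch, and the test thread blocks until it fires. Stripped of the managed-ledger machinery, the idiom looks like this (all names here are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// The latch-plus-flag idiom used by testLastActiveAfterResetCursor,
// in isolation.
class AsyncWaitDemo {
    interface ResetCallback {
        void resetComplete();
        void resetFailed();
    }

    // Simulated async operation that completes on another thread.
    static void asyncReset(ResetCallback cb) {
        new Thread(cb::resetComplete).start();
    }

    static boolean runAndWait() {
        AtomicBoolean moveStatus = new AtomicBoolean(false);
        CountDownLatch latch = new CountDownLatch(1);
        asyncReset(new ResetCallback() {
            @Override
            public void resetComplete() {
                moveStatus.set(true);
                latch.countDown();
            }

            @Override
            public void resetFailed() {
                moveStatus.set(false);
                latch.countDown();
            }
        });
        try {
            latch.await();               // block until the callback fires
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
        return moveStatus.get();
    }
}
```

Counting the latch down in both the success and failure branches is what keeps the test from hanging when the async operation fails.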

[pulsar] 08/16: [Proxy] Fix issue when Proxy fails to start and logs about an uncaught exception (#13171)

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0d7960e1f562f47269305efe6f8ae5727fa2ac6
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Dec 7 18:21:42 2021 +0200

    [Proxy] Fix issue when Proxy fails to start and logs about an uncaught exception (#13171)
    
    * [Proxy] Print stacktrace in uncaught exception handler
    
    * [Proxy] Fix IllegalStateException: Insufficient configured threads
    
    - happens at startup since the proxy client consumes 50% of the available CPU cores
      for selectors
    
    (cherry picked from commit 3986be6ebe93c1a6ebfa3c3e731b45a6d17948d2)
---
 .../java/org/apache/pulsar/proxy/server/AdminProxyHandler.java     | 7 +++++--
 .../java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java   | 1 +
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 7d3c658..853eb0b 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -53,6 +53,7 @@ import org.eclipse.jetty.client.ProtocolHandlers;
 import org.eclipse.jetty.client.RedirectProtocolHandler;
 import org.eclipse.jetty.client.api.ContentProvider;
 import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
 import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.proxy.ProxyServlet;
 import org.eclipse.jetty.util.HttpCookieStore;
@@ -209,12 +210,14 @@ class AdminProxyHandler extends ProxyServlet {
     }
 
     private static class JettyHttpClient extends HttpClient {
+        private static final int NUMBER_OF_SELECTOR_THREADS = 1;
+
         public JettyHttpClient() {
-            super();
+            super(new HttpClientTransportOverHTTP(NUMBER_OF_SELECTOR_THREADS), null);
         }
 
         public JettyHttpClient(SslContextFactory sslContextFactory) {
-            super(sslContextFactory);
+            super(new HttpClientTransportOverHTTP(NUMBER_OF_SELECTOR_THREADS), sslContextFactory);
         }
 
         /**
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 235927c..c16844c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -108,6 +108,7 @@ public class ProxyServiceStarter {
                 FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern());
             Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
                 System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
+                exception.printStackTrace(System.out);
             });
 
             JCommander jcommander = new JCommander();
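The one-line handler change is worth seeing in isolation: the handler now emits the full stack trace rather than just `getMessage()`, so a startup failure becomes diagnosable from the log. A standalone sketch of the same behavior (the message format is illustrative):

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Standalone illustration of the uncaught-exception handler fix.
class UncaughtHandlerDemo {
    static String format(Thread thread, Throwable exception) {
        StringWriter out = new StringWriter();
        PrintWriter writer = new PrintWriter(out, true);
        writer.printf("error Uncaught exception in thread %s: %s%n",
                thread.getName(), exception.getMessage());
        exception.printStackTrace(writer);   // the line the patch adds
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread.setDefaultUncaughtExceptionHandler(
                (t, e) -> System.out.print(format(t, e)));
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("boom");
        });
        worker.start();
        worker.join();
    }
}
```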

[pulsar] 12/16: [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2457ae984ed3bd7887c2639fb294df8f2439e7e2
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Dec 9 18:11:50 2021 -0600

    [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)
    
    (cherry picked from commit 02b8de039df38fa95656e7ca9b5c0babd25ff021)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java  | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7f4d291..cec106c7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1825,21 +1825,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
         }
-        if (!ledgers.containsKey(position.getLedgerId())) {
-            log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
-                    + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
-            callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
-                + "the ledgerId does not belong to this topic or has been deleted"), ctx);
-            return;
-        }
         if (position.getLedgerId() == currentLedger.getId()) {
             asyncReadEntry(currentLedger, position, callback, ctx);
-        } else {
+        } else if (ledgers.containsKey(position.getLedgerId())) {
             getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> asyncReadEntry(ledger, position, callback, ctx)).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
                 callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
                 return null;
             });
+        } else {
+            log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+                    + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
+            callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+                    + "the ledgerId does not belong to this topic or has been deleted"), ctx);
         }
 
     }
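The reordering above is a plain guard-clause optimization: the common case (the position lives in the current ledger) is tested first and skips the `ledgers`-map lookup entirely, while the "unknown ledger" error moves to the final else branch. In miniature, with a plain map standing in for the real ledger structures:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Miniature of the reordered ownership check: hot path first (current
// ledger, no map lookup), then known older ledgers, then the error case.
class LedgerLookup {
    final long currentLedgerId;
    final Map<Long, String> ledgers = new HashMap<>();   // id -> handle (stand-in)

    LedgerLookup(long currentLedgerId) {
        this.currentLedgerId = currentLedgerId;
    }

    Optional<String> resolve(long ledgerId) {
        if (ledgerId == currentLedgerId) {
            return Optional.of("current");               // most reads land here
        } else if (ledgers.containsKey(ledgerId)) {
            return Optional.of(ledgers.get(ledgerId));   // older ledger: open handle
        } else {
            return Optional.empty();                     // not ours, or deleted
        }
    }
}
```

All three outcomes are unchanged by the patch; only the evaluation order differs, which pays off because most reads target the current ledger.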