You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/12 02:40:53 UTC

[pulsar] branch branch-2.10 updated (61f6246 -> c8a0dff)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 61f6246  [tests] Pulsar IO - Elasticsearch - reduce elastic container memory usage (#14580)
     new eb779ff  [pulsar-io] throw exceptions when kafka offset backing store failed to start (#14491)
     new 7c51c72  [Functions] Pass configured metricsPort to k8s runtime (#14502)
     new cafeade  [Flaky-test]: Fix MLTransactionMetadataStoreTest.testInitTransactionReader fails sporadically (#14532)
     new b093839  [Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions (#14367)
     new 5228c90  [Flaky-test]: AdminApiTest.testNamespaceSplitBundleConcurrent (#14565)
     new 22cdf21  fix: NamespacesTest execution order (#14552)
     new d16f049  [C++] Fix wrong unit of Access Token Response's `expires_in` field (#14554)
     new d3af1ae  [OWASP] Update mariadb-jdbc dependency and add suppression rule (#14593)
     new 086d710  [Broker] Fixed wrong behaviour caused by not cleaning up topic policy service state. (#14503)
     new 08a6e9b  [C++] Handle exception in creating socket when fd limit is reached (#14587)
     new e0d2f20  [C++] Fix thread safety issue for multi topic consumer  (#14380)
     new fd5bc9d  Add log to track negtive unacked msg. (#14501)
     new e75f0a8  Fix system topic replicate issue (#14605)
     new c8a0dff  Cancel offload tasks when managed ledger closed. (#14545)

The 14 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/functions_worker.yml                          |   1 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   9 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  40 +++++
 pom.xml                                            |   2 +-
 .../pulsar/broker/admin/impl/BrokersBase.java      | 129 +++++++++++----
 .../org/apache/pulsar/broker/service/Consumer.java |   6 +
 .../SystemTopicBasedTopicPoliciesService.java      |  53 ++++---
 .../broker/admin/AdminApiHealthCheckTest.java      |  97 +++++++++++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   7 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |   2 +-
 .../service/ReplicatorTopicPoliciesTest.java       | 174 ++++++++++++++++++---
 pulsar-client-cpp/lib/ClientConnection.cc          |  28 +++-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 166 +++++++++-----------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   5 +-
 pulsar-client-cpp/lib/SynchronizedHashMap.h        | 127 +++++++++++++++
 pulsar-client-cpp/lib/auth/AuthOauth2.cc           |  15 +-
 pulsar-client-cpp/lib/auth/AuthOauth2.h            |   5 +-
 pulsar-client-cpp/tests/ConsumerTest.cc            |  13 +-
 pulsar-client-cpp/tests/SynchronizedHashMapTest.cc | 125 +++++++++++++++
 .../org/apache/pulsar/common/util/FutureUtil.java  |  14 ++
 .../runtime/kubernetes/KubernetesRuntime.java      |  18 ++-
 .../kubernetes/KubernetesRuntimeFactory.java       |   7 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |  52 ++++++
 .../functions/worker/FunctionsStatsGenerator.java  |   5 +-
 .../io/kafka/connect/PulsarOffsetBackingStore.java |  21 ++-
 .../impl/MLTransactionMetadataStore.java           |   1 +
 src/owasp-dependency-check-suppressions.xml        |  10 ++
 27 files changed, 908 insertions(+), 224 deletions(-)
 create mode 100644 pulsar-client-cpp/lib/SynchronizedHashMap.h
 create mode 100644 pulsar-client-cpp/tests/SynchronizedHashMapTest.cc

[pulsar] 14/14: Cancel offload tasks when managed ledger closed. (#14545)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c8a0dff7e1b51630840a301a35b83c62d188f983
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Mar 11 10:06:33 2022 +0800

    Cancel offload tasks when managed ledger closed. (#14545)
    
    ### Motivation
    When the user config the offloader, as the ledger close, it will trigger the ledger to offload. If there are many ledgers that need to offload, but the topic has been unloaded, the offloader will continue to offload. Because the offloader uses the shared executor pool in ManagedLedgerFactoryImpl and when the managed ledger closes, it doesn't cancel the tasks.
    
    ```
    15:29:59.180 [pulsar-web-41-3] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Unloading topic persistent://public/default/UpdateNodeCharts
    15:29:59.201 [pulsar-web-41-3] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Closing managed ledger
    15:29:59.216 [main-EventThread] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [public/default/persistent/UpdateNodeCharts] [cloud-nodes-service] Updating cursor info ledgerId=-1 mark-delete=789182:82011
    15:29:59.219 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/UpdateNodeCharts][cloud-nodes-service] Closed cursor at md-position=789182:82011
    15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/UpdateNodeCharts] Topic closed
    15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Successfully unloaded topic persistent://public/default/UpdateNodeCharts
    15:31:05.432 [offloader-OrderedScheduler-1-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Preparing metadata to offload ledger 422142 with uuid 030267e2-a2f9-40a3-848b-482f9b007c00
    15:31:05.432 [offloader-OrderedScheduler-1-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Found previous offload attempt for ledger 422142, uuid 030267e2-a2f9-40a3-848b-482f9b007c00, cleaning up
    15:31:05.432 [offloader-OrderedScheduler-1-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Cleanup offload for ledgerId 422142 uuid 3725b3c1-1dbc-481f-a1dd-8aaffb75e603 because of the reason Previous failed offload.
    ```
    
    ### Modifications
    
    - When do `offloadLoop`, check state first. if `Close`, nothing to do.
    
    (cherry picked from commit e0687e37e137f55c6cffa263d8ac8af9169dad92)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 40 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a13cf68..5334544 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2360,13 +2360,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                     + ", total size = {}, already offloaded = {}, to offload = {}",
                             name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
                             sizeSummed, alreadyOffloadedSize, toOffloadSize);
+                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
                 } else {
                     // offloadLoop will complete immediately with an empty list to offload
                     log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
                             name, sizeSummed, alreadyOffloadedSize, threshold);
+                    unlockingPromise.complete(PositionImpl.LATEST);
                 }
-
-                offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
             }
         }
     }
@@ -2929,6 +2929,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
             PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
+        if (getState() == State.Closed) {
+            promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
+                    String.format("managed ledger [%s] has already closed", name)));
+            return;
+        }
         LedgerInfo info = ledgersToOffload.poll();
         if (info == null) {
             if (firstError.isPresent()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 982b914..c6008c76 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -121,6 +122,7 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -3449,4 +3451,42 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         factory.shutdown();
     }
 
+    @Test
+    public void testOffloadTaskCancelled() throws Exception {
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+
+        OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
+        offloadPolicies.setManagedLedgerOffloadDriver("mock");
+        offloadPolicies.setManagedLedgerOffloadThresholdInBytes(0L);
+        LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+        Mockito.when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+        Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(offloadPolicies.getManagedLedgerOffloadDriver());
+        config.setLedgerOffloader(ledgerOffloader);
+
+        CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+        readHandle.complete(mock(ReadHandle.class));
+
+        CompletableFuture<Void> offloadFuture = new CompletableFuture<>();
+        offloadFuture.complete(null);
+        Mockito.when(ledgerOffloader.offload(any(ReadHandle.class), any(UUID.class), any(Map.class))).thenReturn(offloadFuture);
+
+        final ManagedLedgerImpl ledgerInit = (ManagedLedgerImpl) factory.open("test-offload-task-close", config);
+        final ManagedLedgerImpl ledger = spy(ledgerInit);
+        long ledgerId = 3L;
+        doReturn(readHandle).when(ledger).getLedgerHandle(ledgerId);
+        doReturn(ManagedLedgerImpl.State.Closed).when(ledger).getState();
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+        ledger.close();
+
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<LedgerInfo> ledgerInfo = ledger.getLedgerInfo(ledgerId);
+            Assert.assertFalse(ledgerInfo.get(100, TimeUnit.MILLISECONDS).getOffloadContext().getComplete());
+        });
+    }
+
 }

[pulsar] 13/14: Fix system topic replicate issue (#14605)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e75f0a839d6a79aa27187c527b83df568e892853
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Mar 10 20:15:12 2022 +0800

    Fix system topic replicate issue (#14605)
    
    ### Motivation
    PIP 92 has introduced topic policies across clusters. But after https://github.com/apache/pulsar/pull/12517, if the policy is not global, it set the replicate cluster to an empty set.
    ```
    PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder();
     if (policies == null || !policies.isGlobalPolicies()) {
         // we don't need to replicate local policies to remote cluster, so set `replicateTo` to empty.
         builder.replicateTo(new HashSet<>());
    }
    ```
    It should set the `replicateTo` with the local cluster, not an empty set.
    
    Otherwise,  it will cause the system event to be replicated. Details are here :
    https://github.com/apache/pulsar/blob/d4c2e613d305f8f785b5ef357b7cbe2ccc271043/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L319-L328
    
    (cherry picked from commit e470de54314483ccb4f4970e0d772c81c4bdb731)
---
 .../SystemTopicBasedTopicPoliciesService.java      |  14 +-
 .../service/ReplicatorTopicPoliciesTest.java       | 174 ++++++++++++++++++---
 2 files changed, 165 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index bbb0257..e7af027 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +62,8 @@ import org.slf4j.LoggerFactory;
 public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesService {
 
     private final PulsarService pulsarService;
+    private final HashSet localCluster;
+    private final String clusterName;
     private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
 
     @VisibleForTesting
@@ -80,6 +83,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
         this.pulsarService = pulsarService;
+        this.clusterName = pulsarService.getConfiguration().getClusterName();
+        this.localCluster = Sets.newHashSet(clusterName);
     }
 
     @Override
@@ -143,7 +148,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder();
         if (policies == null || !policies.isGlobalPolicies()) {
             // we don't need to replicate local policies to remote cluster, so set `replicateTo` to empty.
-            builder.replicateTo(new HashSet<>());
+            builder.replicateTo(localCluster);
         }
         return builder
                 .actionType(actionType)
@@ -454,9 +459,12 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         }
     }
 
-    private static boolean hasReplicateTo(Message<?> message) {
+    private boolean hasReplicateTo(Message<?> message) {
         if (message instanceof MessageImpl) {
-            return ((MessageImpl<?>) message).hasReplicateTo();
+            return ((MessageImpl<?>) message).hasReplicateTo()
+                    ? (((MessageImpl<?>) message).getReplicateTo().size() == 1
+                        ? !((MessageImpl<?>) message).getReplicateTo().contains(clusterName) : true)
+                    : false;
         }
         if (message instanceof TopicMessageImpl) {
             return hasReplicateTo(((TopicMessageImpl<?>) message).getMessage());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 29f0b8e..8ec3e04 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -85,6 +85,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         BacklogQuotaImpl backlogQuota = new BacklogQuotaImpl();
         backlogQuota.setLimitSize(1);
         backlogQuota.setLimitTime(2);
+        // local
+        admin1.topicPolicies().setBacklogQuota(topic, backlogQuota);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies().getBacklogQuotaMap(topic).size(), 0));
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies().getBacklogQuotaMap(topic).size(), 0));
+        // global
         admin1.topicPolicies(true).setBacklogQuota(topic, backlogQuota);
 
         Awaitility.await().untilAsserted(() ->
@@ -104,7 +111,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, topic);
-        // set message ttl
+        // local
+        admin1.topicPolicies().setMessageTTL(topic, 10);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getMessageTTL(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getMessageTTL(topic)));
+        // global
         admin1.topicPolicies(true).setMessageTTL(topic, 10);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true).getMessageTTL(topic).intValue(), 10));
@@ -125,6 +138,10 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         init(namespace, topic);
         // set global topic policy
         SubscribeRate subscribeRate = new SubscribeRate(100, 10000);
+        // local
+        admin1.topicPolicies().setSubscribeRate(topic, subscribeRate);
+        untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies().getSubscribeRate(topic)));
+        // global
         admin1.topicPolicies(true).setSubscribeRate(topic, subscribeRate);
 
         // get global topic policy
@@ -141,7 +158,10 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, topic);
-        // set global topic policy
+        // local
+        admin1.topicPolicies().setMaxMessageSize(topic, 1000);
+        untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies().getMaxMessageSize(topic)));
+        // global
         admin1.topicPolicies(true).setMaxMessageSize(topic, 1000);
 
         // get global topic policy
@@ -160,6 +180,10 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         init(namespace, topic);
         // set global topic policy
         PublishRate publishRate = new PublishRate(100, 10000);
+        // local
+        admin1.topicPolicies().setPublishRate(topic, publishRate);
+        untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies().getPublishRate(topic)));
+        // global
         admin1.topicPolicies(true).setPublishRate(topic, publishRate);
 
         // get global topic policy
@@ -176,7 +200,11 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, topic);
-        // set global topic policy
+        // local
+        admin1.topicPolicies().setDeduplicationSnapshotInterval(topic, 100);
+        untilRemoteClustersAsserted(
+                admin -> assertNull(admin.topicPolicies().getDeduplicationSnapshotInterval(topic)));
+        // global
         admin1.topicPolicies(true).setDeduplicationSnapshotInterval(topic, 100);
 
         // get global topic policy
@@ -207,6 +235,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         init(namespace, topic);
         // set PersistencePolicies
         PersistencePolicies policies = new PersistencePolicies(5, 3, 2, 1000);
+        // local
+        admin1.topicPolicies().setPersistence(topic, policies);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getPersistence(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getPersistence(topic)));
+        // global
         admin1.topicPolicies(true).setPersistence(topic, policies);
 
         Awaitility.await().untilAsserted(() ->
@@ -226,7 +261,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, topic);
-        // set subscription types policies
+        // local
+        admin1.topicPolicies().setDeduplicationStatus(topic, true);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getDeduplicationStatus(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getDeduplicationStatus(topic)));
+        // global
         admin1.topicPolicies(true).setDeduplicationStatus(topic, true);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 assertTrue(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
@@ -238,7 +279,6 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
                 assertNull(admin2.topicPolicies(true).getDeduplicationStatus(topic)));
         Awaitility.await().untilAsserted(() ->
                 assertNull(admin3.topicPolicies(true).getDeduplicationStatus(topic)));
-
     }
 
     @Test
@@ -246,8 +286,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, topic);
-
-        // set max producer policies
+        // local
+        admin1.topicPolicies().setMaxProducers(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getMaxProducers(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getMaxProducers(topic)));
+        // global
         admin1.topicPolicies(true).setMaxProducers(topic, 100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true).getMaxProducers(topic).intValue(), 100));
@@ -268,7 +313,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
 
         init(namespace, topic);
-        // set max consumer per sub
+        // local
+        admin1.topicPolicies().setMaxConsumersPerSubscription(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getMaxConsumersPerSubscription(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getMaxConsumersPerSubscription(topic)));
+        // global
         admin1.topicPolicies(true).setMaxConsumersPerSubscription(topic, 100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100));
@@ -277,7 +328,6 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
 
         Awaitility.await().untilAsserted(() -> {
             assertEquals(admin1.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100);
-            assertNull(admin1.topicPolicies().getMaxConsumersPerSubscription(topic));
         });
 
         //remove max consumer per sub
@@ -293,7 +343,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, topic);
-        // set max unacked msgs per consumers
+        // local
+        admin1.topicPolicies().setMaxUnackedMessagesOnConsumer(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getMaxUnackedMessagesOnConsumer(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getMaxUnackedMessagesOnConsumer(topic)));
+        // global
         admin1.topicPolicies(true).setMaxUnackedMessagesOnConsumer(topic, 100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100));
@@ -315,6 +371,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         init(namespace, persistentTopicName);
         // set retention
         RetentionPolicies retentionPolicies = new RetentionPolicies(1, 1);
+        // local
+        admin1.topicPolicies().setRetention(persistentTopicName, retentionPolicies);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getRetention(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getRetention(persistentTopicName)));
+        // global
         admin1.topicPolicies(true).setRetention(persistentTopicName, retentionPolicies);
 
         Awaitility.await().untilAsserted(() ->
@@ -324,7 +387,6 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
 
         Awaitility.await().untilAsserted(() -> {
             assertEquals(admin1.topicPolicies(true).getRetention(persistentTopicName), retentionPolicies);
-            assertNull(admin1.topicPolicies().getRetention(persistentTopicName));
         });
 
         //remove retention
@@ -341,7 +403,14 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         init(namespace, topic);
         Set<SubscriptionType> subscriptionTypes = new HashSet<>();
         subscriptionTypes.add(SubscriptionType.Shared);
-        // set subscription types policies
+        // local
+        admin1.topicPolicies().setSubscriptionTypesEnabled(topic, subscriptionTypes);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getSubscriptionTypesEnabled(topic), null));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getSubscriptionTypesEnabled(topic), null));
+
+        // global
         admin1.topicPolicies(true).setSubscriptionTypesEnabled(topic, subscriptionTypes);
         Awaitility.await().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), subscriptionTypes));
@@ -353,7 +422,6 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
                 assertEquals(admin2.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));
         Awaitility.await().untilAsserted(() ->
                 assertEquals(admin3.topicPolicies(true).getSubscriptionTypesEnabled(topic), Collections.emptySet()));
-
     }
 
 
@@ -362,7 +430,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, topic);
-        // set max consumers
+        // local
+        admin1.topicPolicies().setMaxConsumers(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getMaxConsumers(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getMaxConsumers(topic)));
+        // global
         admin1.topicPolicies(true).setMaxConsumers(topic, 100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true).getMaxConsumers(topic).intValue(), 100));
@@ -389,6 +463,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
                 .ratePeriodInSecond(3)
                 .relativeToPublishRate(true)
                 .build();
+        // local
+        admin1.topicPolicies().setDispatchRate(persistentTopicName, dispatchRate);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getDispatchRate(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getDispatchRate(persistentTopicName)));
+        // global
         admin1.topicPolicies(true).setDispatchRate(persistentTopicName, dispatchRate);
 
         // get dispatchRate
@@ -411,7 +492,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, topic);
         DelayedDeliveryPolicies policies = DelayedDeliveryPolicies.builder().active(true).tickTime(10000L).build();
-        // set delayed delivery
+        // local
+        admin1.topicPolicies().setDelayedDeliveryPolicy(topic, policies);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getDelayedDeliveryPolicy(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getDelayedDeliveryPolicy(topic)));
+        // global
         admin1.topicPolicies(true).setDelayedDeliveryPolicy(topic, policies);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic), policies));
@@ -434,6 +521,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         // set InactiveTopicPolicies
         InactiveTopicPolicies inactiveTopicPolicies =
                 new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
+        // local
+        admin1.topicPolicies().setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getInactiveTopicPolicies(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getInactiveTopicPolicies(persistentTopicName)));
+        // global
         admin1.topicPolicies(true).setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies);
         Awaitility.await().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true)
@@ -462,6 +556,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
                 .dispatchThrottlingRateInByte(1)
                 .relativeToPublishRate(true)
                 .build();
+        // local
+        admin1.topicPolicies().setSubscriptionDispatchRate(persistentTopicName, dispatchRate);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getSubscriptionDispatchRate(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getSubscriptionDispatchRate(persistentTopicName)));
+        // global
         admin1.topicPolicies(true).setSubscriptionDispatchRate(persistentTopicName, dispatchRate);
         // get subscription dispatch rate
         Awaitility.await().untilAsserted(() ->
@@ -492,6 +593,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
                 .dispatchThrottlingRateInByte(1)
                 .relativeToPublishRate(true)
                 .build();
+        // local
+        admin1.topicPolicies().setReplicatorDispatchRate(persistentTopicName, dispatchRate);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getReplicatorDispatchRate(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getReplicatorDispatchRate(persistentTopicName)));
+        // global
         admin1.topicPolicies(true).setReplicatorDispatchRate(persistentTopicName, dispatchRate);
         // get replicator dispatch rate
         Awaitility.await().untilAsserted(() ->
@@ -514,7 +622,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, topic);
-        // set max unacked msgs per sub
+        // local
+        admin1.topicPolicies().setMaxUnackedMessagesOnSubscription(topic, 100);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getMaxUnackedMessagesOnSubscription(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getMaxUnackedMessagesOnSubscription(topic)));
+        // global
         admin1.topicPolicies(true).setMaxUnackedMessagesOnSubscription(topic, 100);
         Awaitility.await().ignoreExceptions().untilAsserted(() ->
                 assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100));
@@ -534,7 +648,13 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
 
         init(namespace, persistentTopicName);
-        // set compaction threshold
+        // local
+        admin1.topicPolicies().setCompactionThreshold(persistentTopicName, 1);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getCompactionThreshold(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getCompactionThreshold(persistentTopicName)));
+        // global
         admin1.topicPolicies(true).setCompactionThreshold(persistentTopicName, 1);
         // get compaction threshold
         Awaitility.await().untilAsserted(() ->
@@ -557,8 +677,12 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
         init(namespace, persistentTopicName);
+        // local
+        admin1.topicPolicies().setMaxSubscriptionsPerTopic(persistentTopicName, 1024);
+        untilRemoteClustersAsserted(
+                admin -> assertNull(admin.topicPolicies().getMaxSubscriptionsPerTopic(persistentTopicName)));
 
-        //set max subscriptions per topic
+        // global
         admin1.topicPolicies(true).setMaxSubscriptionsPerTopic(persistentTopicName, 1024);
 
         //get max subscriptions per topic
@@ -581,8 +705,18 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
         OffloadPoliciesImpl offloadPolicies =
                 OffloadPoliciesImpl.create("s3", "region", "bucket", "endpoint", null, null, null, null,
                 8, 9, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST);
-
-        // set offload policies
+        // local
+        try {
+            admin1.topicPolicies().setOffloadPolicies(persistentTopicName, offloadPolicies);
+        } catch (Exception exception){
+            // driver not found exception.
+            assertTrue(exception instanceof PulsarAdminException.ServerSideErrorException);
+        }
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies().getOffloadPolicies(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies().getOffloadPolicies(persistentTopicName)));
+        // global
         try{
             admin1.topicPolicies(true).setOffloadPolicies(persistentTopicName, offloadPolicies);
         }catch (Exception exception){

[pulsar] 07/14: [C++] Fix wrong unit of Access Token Response's `expires_in` field (#14554)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d16f049c17485f2b8dfed95d8a48e69ab1a3a072
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Mar 8 12:53:29 2022 +0800

    [C++] Fix wrong unit of Access Token Response's `expires_in` field (#14554)
    
    ### Motivation
    
    The `expires_in` field of Access Token Response is in seconds. See
    https://datatracker.ietf.org/doc/html/rfc6749#section-4.2.2. However,
    C++ client treats it as milliseconds currently. It will leads to an
    earlier expiration of the token.
    
    ### Modifications
    
    Record the time point via the `std::time_point` class, which supports
    add operations with a `std::duration` object. Then converts the
    `expires_in` field via `std::chrono::second` function and calculate the
    expired time point.
    
    It also removes the usage of Boost time functions and makes code more clear.
    
    (cherry picked from commit 95c1581d494d59c4d93782eb18547cec5427b503)
---
 pulsar-client-cpp/lib/auth/AuthOauth2.cc | 15 ++-------------
 pulsar-client-cpp/lib/auth/AuthOauth2.h  |  5 ++++-
 2 files changed, 6 insertions(+), 14 deletions(-)

diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.cc b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
index 0fc935a..334289d 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.cc
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.cc
@@ -23,7 +23,6 @@
 #include <stdexcept>
 #include <boost/property_tree/json_parser.hpp>
 #include <boost/property_tree/ptree.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
 
 #include <lib/LogUtils.h>
 DECLARE_LOG_OBJECT()
@@ -86,22 +85,12 @@ CachedToken::~CachedToken() {}
 
 // Oauth2CachedToken
 
-static int64_t currentTimeMillis() {
-    using namespace boost::posix_time;
-    using boost::posix_time::milliseconds;
-    using boost::posix_time::seconds;
-    static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1));
-
-    time_duration diff = microsec_clock::universal_time() - time_t_epoch;
-    return diff.total_milliseconds();
-}
-
 Oauth2CachedToken::Oauth2CachedToken(Oauth2TokenResultPtr token) {
     latest_ = token;
 
     int64_t expiredIn = token->getExpiresIn();
     if (expiredIn > 0) {
-        expiresAt_ = expiredIn + currentTimeMillis();
+        expiresAt_ = Clock::now() + std::chrono::seconds(expiredIn);
     } else {
         throw std::runtime_error("ExpiresIn in Oauth2TokenResult invalid value: " +
                                  std::to_string(expiredIn));
@@ -113,7 +102,7 @@ AuthenticationDataPtr Oauth2CachedToken::getAuthData() { return authData_; }
 
 Oauth2CachedToken::~Oauth2CachedToken() {}
 
-bool Oauth2CachedToken::isExpired() { return expiresAt_ < currentTimeMillis(); }
+bool Oauth2CachedToken::isExpired() { return expiresAt_ < Clock::now(); }
 
 // OauthFlow
 
diff --git a/pulsar-client-cpp/lib/auth/AuthOauth2.h b/pulsar-client-cpp/lib/auth/AuthOauth2.h
index b3cc952..59e8ad9 100644
--- a/pulsar-client-cpp/lib/auth/AuthOauth2.h
+++ b/pulsar-client-cpp/lib/auth/AuthOauth2.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <pulsar/Authentication.h>
+#include <chrono>
 #include <mutex>
 #include <string>
 
@@ -69,13 +70,15 @@ class ClientCredentialFlow : public Oauth2Flow {
 
 class Oauth2CachedToken : public CachedToken {
    public:
+    using Clock = std::chrono::high_resolution_clock;
+
     Oauth2CachedToken(Oauth2TokenResultPtr token);
     ~Oauth2CachedToken();
     bool isExpired();
     AuthenticationDataPtr getAuthData();
 
    private:
-    int64_t expiresAt_;
+    std::chrono::time_point<Clock> expiresAt_;
     Oauth2TokenResultPtr latest_;
     AuthenticationDataPtr authData_;
 };

[pulsar] 04/14: [Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions (#14367)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0938399d5d8d7f752a93916fc804b59af1c6b32
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Fri Mar 4 10:29:34 2022 +0800

    [Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions (#14367)
    
    (cherry picked from commit 4f1e39b6921ea401b8c27f17a041d06d85f8abf8)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 129 +++++++++++++++------
 .../broker/admin/AdminApiHealthCheckTest.java      |  97 +++++++++++++++-
 .../org/apache/pulsar/common/util/FutureUtil.java  |  14 +++
 3 files changed, 203 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 4530d8c..eda186a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -54,6 +53,7 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
@@ -338,43 +338,104 @@ public class BrokersBase extends AdminResource {
         NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
                 ? NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), pulsar().getConfiguration())
                 : NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
-        String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
+        final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
         LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
-        String messageStr = UUID.randomUUID().toString();
+        final String messageStr = UUID.randomUUID().toString();
+        final String subscriptionName = "healthCheck-" + messageStr;
         // create non-partitioned topic manually and close the previous reader if present.
         return pulsar().getBrokerService().getTopic(topicName, true)
-                // check and clean all subscriptions
-                .thenCompose(topicOptional -> {
-                    if (!topicOptional.isPresent()) {
-                        LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
-                                clientAppId(), topicName);
-                        throw new RestException(Status.NOT_FOUND, "Topic [{}] not found after create.");
-                    }
-                    Topic topic = topicOptional.get();
-                    // clean all subscriptions
-                    return FutureUtil.waitForAll(topic.getSubscriptions().values()
-                            .stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
-                            .thenApply(__ -> topic);
-                }).thenCompose(topic -> {
-                    try {
-                        PulsarClient client = pulsar().getClient();
-                        return client.newProducer(Schema.STRING).topic(topicName).createAsync()
-                                        .thenCombine(client.newReader(Schema.STRING).topic(topicName)
-                                        .startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
-                                                        producer.sendAsync(messageStr).thenCompose(__ ->
-                                                                healthCheckRecursiveReadNext(reader, messageStr))
-                                                        .thenCompose(__ -> {
-                                                            List<CompletableFuture<Void>> closeFutures =
-                                                                    new ArrayList<>();
-                                                            closeFutures.add(producer.closeAsync());
-                                                            closeFutures.add(reader.closeAsync());
-                                                            return FutureUtil.waitForAll(closeFutures);
-                                                        })
-                                        ).thenAccept(ignore -> {});
-                    } catch (PulsarServerException e) {
-                        LOG.error("[{}] Fail to run health check while get client.", clientAppId());
-                        throw new RestException(e);
+            .thenCompose(topicOptional -> {
+                if (!topicOptional.isPresent()) {
+                    LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
+                            clientAppId(), topicName);
+                    throw new RestException(Status.NOT_FOUND,
+                            String.format("Topic [%s] not found after create.", topicName));
+                }
+                PulsarClient client;
+                try {
+                    client = pulsar().getClient();
+                } catch (PulsarServerException e) {
+                    LOG.error("[{}] Fail to run health check while get client.", clientAppId());
+                    throw new RestException(e);
+                }
+                CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+                client.newProducer(Schema.STRING).topic(topicName).createAsync()
+                        .thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName)
+                                .subscriptionName(subscriptionName)
+                                .startMessageId(MessageId.latest)
+                                .createAsync().exceptionally(createException -> {
+                                    producer.closeAsync().exceptionally(ex -> {
+                                        LOG.error("[{}] Close producer fail while heath check.", clientAppId());
+                                        return null;
+                                    });
+                                    throw FutureUtil.wrapToCompletionException(createException);
+                                }).thenCompose(reader -> producer.sendAsync(messageStr)
+                                        .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
+                                        .whenComplete((__, ex) -> {
+                                            closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
+                                                    .whenComplete((unused, innerEx) -> {
+                                                        if (ex != null) {
+                                                            resultFuture.completeExceptionally(ex);
+                                                        } else {
+                                                            resultFuture.complete(null);
+                                                        }
+                                                    });
+                                        }
+                                ))
+                        ).exceptionally(ex -> {
+                            resultFuture.completeExceptionally(ex);
+                            return null;
+                        });
+                return resultFuture;
+            });
+    }
+
+    /**
+     * Close producer and reader and then to re-check if this operation is success.
+     *
+     * Re-check
+     * - Producer: If close fails we will print error log to notify user.
+     * - Consumer: If close fails we will force delete subscription.
+     *
+     * @param producer Producer
+     * @param reader Reader
+     * @param topic  Topic
+     * @param subscriptionName  Subscription name
+     */
+    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
+                                                    Topic topic, String subscriptionName) {
+        // no matter exception or success, we still need to
+        // close producer/reader
+        CompletableFuture<Void> producerFuture = producer.closeAsync();
+        CompletableFuture<Void> readerFuture = reader.closeAsync();
+        List<CompletableFuture<Void>> futures = new ArrayList<>(2);
+        futures.add(producerFuture);
+        futures.add(readerFuture);
+        return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
+                .exceptionally(closeException -> {
+                    if (readerFuture.isCompletedExceptionally()) {
+                        LOG.error("[{}] Close reader fail while heath check.", clientAppId());
+                        Subscription subscription =
+                                topic.getSubscription(subscriptionName);
+                        // re-check subscription after reader close
+                        if (subscription != null) {
+                            LOG.warn("[{}] Force delete subscription {} "
+                                            + "when it still exists after the"
+                                            + " reader is closed.",
+                                    clientAppId(), subscription);
+                            subscription.deleteForcefully()
+                                    .exceptionally(ex -> {
+                                        LOG.error("[{}] Force delete subscription fail"
+                                                        + " while health check",
+                                                clientAppId(), ex);
+                                        return null;
+                                    });
+                        }
+                    } else {
+                        // producer future fail.
+                        LOG.error("[{}] Close producer fail while heath check.", clientAppId());
                     }
+                    return null;
                 });
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index 4f01cb1..b9886b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -21,12 +21,19 @@ package org.apache.pulsar.broker.admin;
 import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.springframework.util.CollectionUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 @Test(groups = "broker-admin")
 @Slf4j
@@ -55,16 +62,100 @@ public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testHealthCheckup() throws Exception {
-        admin.brokers().healthcheck();
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck();
+                }
+                future.complete(null);
+            }catch (PulsarAdminException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck();
+        }
+        // To ensure we don't have any subscription
+        final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertFalse(future.isCompletedExceptionally());
+        });
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics are using compaction, even though is not explicitly set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
     }
 
     @Test
     public void testHealthCheckupV1() throws Exception {
-        admin.brokers().healthcheck(TopicVersion.V1);
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck(TopicVersion.V1);
+                }
+                future.complete(null);
+            }catch (PulsarAdminException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck(TopicVersion.V1);
+        }
+        final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertFalse(future.isCompletedExceptionally());
+        });
+        // To ensure we don't have any subscription
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics are using compaction, even though is not explicitly set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
     }
 
     @Test
     public void testHealthCheckupV2() throws Exception {
-        admin.brokers().healthcheck(TopicVersion.V2);
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck(TopicVersion.V2);
+                }
+                future.complete(null);
+            }catch (PulsarAdminException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck(TopicVersion.V2);
+        }
+        final String testHealthCheckTopic = String.format("persistent://pulsar/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertFalse(future.isCompletedExceptionally());
+        });
+        // To ensure we don't have any subscription
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics are using compaction, even though is not explicitly set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 687cbd2..a29ac8c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -188,4 +188,18 @@ public class FutureUtil {
         }
         return Optional.empty();
     }
+
+    /**
+     * Wrap throwable exception to CompletionException if that exception is not an instance of CompletionException.
+     *
+     * @param throwable Exception
+     * @return CompletionException
+     */
+    public static CompletionException wrapToCompletionException(Throwable throwable) {
+        if (throwable instanceof CompletionException) {
+            return (CompletionException) throwable;
+        } else {
+            return new CompletionException(throwable);
+        }
+    }
 }

[pulsar] 01/14: [pulsar-io] throw exceptions when kafka offset backing store failed to start (#14491)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb779ff8ccbe3eae80148f29715aa5713e330d91
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Wed Mar 2 16:35:58 2022 +0800

    [pulsar-io] throw exceptions when kafka offset backing store failed to start (#14491)
    
    (cherry picked from commit e6656e1407be80fdf4b6aaf424a57068687840cc)
---
 .../io/kafka/connect/PulsarOffsetBackingStore.java  | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index 495c8b9..86905ad 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -30,6 +30,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -49,7 +51,7 @@ import org.apache.pulsar.client.api.Schema;
 @Slf4j
 public class PulsarOffsetBackingStore implements OffsetBackingStore {
 
-    private Map<ByteBuffer, ByteBuffer> data;
+    private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();
     private PulsarClient client;
     private String topic;
     private Producer<byte[]> producer;
@@ -65,7 +67,6 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
     public void configure(WorkerConfig workerConfig) {
         this.topic = workerConfig.getString(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG);
         checkArgument(!isBlank(topic), "Offset storage topic must be specified");
-        this.data = new HashMap<>();
 
         log.info("Configure offset backing store on pulsar topic {}", topic);
     }
@@ -126,10 +127,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
     }
 
     void processMessage(Message<byte[]> message) {
-        synchronized (data) {
+        if (message.getKey() != null) {
             data.put(
                 ByteBuffer.wrap(message.getKey().getBytes(UTF_8)),
                 ByteBuffer.wrap(message.getValue()));
+        } else {
+            log.debug("Got message without key from the offset storage topic, skip it. message value: {}",
+                    message.getValue());
         }
     }
 
@@ -149,10 +153,13 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
 
             CompletableFuture<Void> endFuture = new CompletableFuture<>();
             readToEnd(endFuture);
-            endFuture.join();
+            endFuture.get();
         } catch (PulsarClientException e) {
             log.error("Failed to setup pulsar producer/reader to cluster", e);
             throw new RuntimeException("Failed to setup pulsar producer/reader to cluster ",  e);
+        } catch (ExecutionException | InterruptedException e) {
+            log.error("Failed to start PulsarOffsetBackingStore", e);
+            throw new RuntimeException("Failed to start PulsarOffsetBackingStore",  e);
         }
     }
 
@@ -180,6 +187,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
             }
             reader = null;
         }
+        data.clear();
 
         // do not close the client, it is provided by the sink context
     }
@@ -191,10 +199,7 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
         return endFuture.thenApply(ignored -> {
             Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
             for (ByteBuffer key : keys) {
-                ByteBuffer value;
-                synchronized (data) {
-                    value = data.get(key);
-                }
+                ByteBuffer value = data.get(key);
                 if (null != value) {
                     values.put(key, value);
                 }

[pulsar] 02/14: [Functions] Pass configured metricsPort to k8s runtime (#14502)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7c51c7200c2b86e9e5d58616ddaef6eda8e99b16
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Mar 2 15:00:42 2022 +0200

    [Functions] Pass configured metricsPort to k8s runtime (#14502)
    
    (cherry picked from commit daed6a0b3ac94e77a3d7d4212ee297b9046317a2)
---
 conf/functions_worker.yml                          |  1 +
 .../runtime/kubernetes/KubernetesRuntime.java      | 18 +++++---
 .../kubernetes/KubernetesRuntimeFactory.java       |  7 ++-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 52 ++++++++++++++++++++++
 .../functions/worker/FunctionsStatsGenerator.java  |  5 ++-
 5 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 428c089..24d46e1 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -222,6 +222,7 @@ functionRuntimeFactoryConfigs:
 #    # The port inside the function pod which is used by the worker to communicate with the pod
 #    grpcPort: 9093
 #    # The port inside the function pod on which prometheus metrics are exposed
+#    # An empty value disables prometheus metrics.
 #    metricsPort: 9094
 #    # The directory inside the function pod where nar packages will be extracted
 #    narExtractionDirectory:
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index a04ce72..59d688a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -426,7 +426,11 @@ public class KubernetesRuntime implements Runtime {
 
     @Override
     public String getPrometheusMetrics() throws IOException {
-        return RuntimeUtils.getPrometheusMetrics(metricsPort);
+        if (metricsPort != null) {
+            return RuntimeUtils.getPrometheusMetrics(metricsPort);
+        } else {
+            return null;
+        }
     }
 
     @Override
@@ -974,10 +978,14 @@ public class KubernetesRuntime implements Runtime {
     }
 
     private Map<String, String> getPrometheusAnnotations() {
-        final Map<String, String> annotations = new HashMap<>();
-        annotations.put("prometheus.io/scrape", "true");
-        annotations.put("prometheus.io/port", String.valueOf(metricsPort));
-        return annotations;
+        if (metricsPort != null) {
+            final Map<String, String> annotations = new HashMap<>();
+            annotations.put("prometheus.io/scrape", "true");
+            annotations.put("prometheus.io/port", String.valueOf(metricsPort));
+            return annotations;
+        } else {
+            return Collections.emptyMap();
+        }
     }
 
     private Map<String, String> getLabels(Function.FunctionDetails functionDetails) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index d98a161..4b2c7e7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -290,6 +290,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         String overriddenNamespace = manifestCustomizer.map((customizer) -> customizer.customizeNamespace(instanceConfig.getFunctionDetails(), jobNamespace)).orElse(jobNamespace);
         String overriddenName = manifestCustomizer.map((customizer) -> customizer.customizeName(instanceConfig.getFunctionDetails(), jobName)).orElse(jobName);
 
+        // pass metricsPort configured in functionRuntimeFactoryConfigs.metricsPort in functions_worker.yml
+        if (metricsPort != null) {
+            instanceConfig.setMetricsPort(metricsPort);
+        }
+
         return new KubernetesRuntime(
             appsClient,
             coreClient,
@@ -351,7 +356,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             if (k8Uri == null) {
                 log.info("k8Uri is null thus going by defaults");
                 ApiClient cli;
-                if (submittingInsidePod) {
+                if (submittingInsidePod != null && submittingInsidePod) {
                     log.info("Looks like we are inside a k8 pod ourselves. Initializing as cluster");
                     cli = Config.fromCluster();
                 } else {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index ef45d7f..995546c 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -29,12 +29,14 @@ import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.custom.Quantity;
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
 import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1Service;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1Toleration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
@@ -42,6 +44,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.ConnectorsManager;
@@ -66,6 +69,7 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
@@ -1145,4 +1149,52 @@ public class KubernetesRuntimeTest {
         String containerCommand = spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
         assertTrue(containerCommand.contains(expectedDownloadCommand));
     }
+
+    @Test
+    public void shouldUseConfiguredMetricsPort() throws Exception {
+        assertMetricsPortConfigured(Collections.singletonMap("metricsPort", 12345), 12345);
+    }
+
+    @Test
+    public void shouldUseDefaultMetricsPortWhenMetricsPortIsntSet() throws Exception {
+        assertMetricsPortConfigured(Collections.emptyMap(), 9094);
+    }
+
+    @Test
+    public void shouldNotAddPrometheusAnnotationIfMetricsPortIsSetToEmpty() throws Exception {
+        assertMetricsPortConfigured(Collections.singletonMap("metricsPort", ""), -1);
+    }
+
+    private void assertMetricsPortConfigured(Map<String, Object> functionRuntimeFactoryConfigs,
+                                             int expectedPort) throws Exception {
+        KubernetesRuntimeFactory kubernetesRuntimeFactory = new KubernetesRuntimeFactory();
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(functionRuntimeFactoryConfigs);
+        AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
+        kubernetesRuntimeFactory.initialize(workerConfig, authenticationConfig, new DefaultSecretsProviderConfigurator(), Mockito.mock(ConnectorsManager.class), Optional.empty(), Optional.empty());
+        InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);
+        KubernetesRuntime container = kubernetesRuntimeFactory.createContainer(config, userJarFile, userJarFile, 30l);
+        V1PodTemplateSpec template = container.createStatefulSet().getSpec().getTemplate();
+        Map<String, String> annotations =
+                template.getMetadata().getAnnotations();
+        if (expectedPort != -1) {
+            // metrics port should be passed to k8s annotation for prometheus scraping
+            assertEquals(annotations.get("prometheus.io/port"), String.valueOf(expectedPort));
+            // scraping annotation should exist
+            assertEquals(annotations.get("prometheus.io/scrape"), "true");
+
+            // metrics port should be passed to JavaInstanceStarter with --metrics_port argument
+            assertTrue(container.getProcessArgs().stream().collect(Collectors.joining(" "))
+                    .contains("--metrics_port " + expectedPort));
+        } else {
+            // No prometheus annotations should exist
+            assertFalse(annotations.containsKey("prometheus.io/scrape"));
+            assertFalse(annotations.containsKey("prometheus.io/port"));
+            // metrics will be started on random port when the port isn't specified
+            // check that "--metrics_port 0" argument is passed
+            assertTrue(container.getProcessArgs().stream().collect(Collectors.joining(" "))
+                    .contains("--metrics_port 0"));
+        }
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 7dd1e74..d94f54c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -68,7 +68,10 @@ public class FunctionsStatsGenerator {
                     if (functionRuntime != null) {
                         try {
 
-                            out.write(functionRuntime.getPrometheusMetrics());
+                            String prometheusMetrics = functionRuntime.getPrometheusMetrics();
+                            if (prometheusMetrics != null) {
+                                out.write(prometheusMetrics);
+                            }
 
                         } catch (IOException e) {
                             log.warn("Failed to collect metrics for function instance {}",

[pulsar] 09/14: [Broker] Fixed wrong behaviour caused by not cleaning up topic policy service state. (#14503)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 086d7109a213ca6d1ea362b28a9e24a62f890141
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Mar 9 09:18:39 2022 +0800

    [Broker] Fixed wrong behaviour caused by not cleaning up topic policy service state. (#14503)
    
    (cherry picked from commit f32154c06c6475fac1cd89d105d3c31d5d8713dc)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 39 +++++++++++-----------
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 9b8c69e..bbb0257 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -255,8 +256,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 if (ex != null) {
                     log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
                     result.completeExceptionally(ex);
-                    readerCaches.remove(namespace);
-                    reader.closeAsync();
+                    cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                 } else {
                     initPolicesCache(reader, result);
                     result.thenRun(() -> readMorePolicies(reader));
@@ -290,14 +290,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         }
         AtomicInteger bundlesCount = ownedBundlesCountPerNamespace.get(namespace);
         if (bundlesCount == null || bundlesCount.decrementAndGet() <= 0) {
-            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
-                    readerCaches.remove(namespace);
-            if (readerCompletableFuture != null) {
-                readerCompletableFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
-                ownedBundlesCountPerNamespace.remove(namespace);
-                policyCacheInitMap.remove(namespace);
-                policiesCache.entrySet().removeIf(entry -> entry.getKey().getNamespaceObject().equals(namespace));
-            }
+            cleanCacheAndCloseReader(namespace, true);
         }
         return CompletableFuture.completedFuture(null);
     }
@@ -331,9 +324,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 log.error("[{}] Failed to check the move events for the system topic",
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
-                readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                reader.closeAsync();
+                cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                 return;
             }
             if (hasMore) {
@@ -342,9 +333,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         log.error("[{}] Failed to read event from the system topic.",
                                 reader.getSystemTopic().getTopicName(), e);
                         future.completeExceptionally(e);
-                        readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                        policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
-                        reader.closeAsync();
+                        cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                         return;
                     }
                     refreshTopicPoliciesCache(msg);
@@ -373,6 +362,18 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         });
     }
 
+    private void cleanCacheAndCloseReader(NamespaceName namespace, boolean cleanOwnedBundlesCount) {
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerFuture = readerCaches.remove(namespace);
+        policiesCache.entrySet().removeIf(entry -> Objects.equals(entry.getKey().getNamespaceObject(), namespace));
+        if (cleanOwnedBundlesCount) {
+            ownedBundlesCountPerNamespace.remove(namespace);
+        }
+        if (readerFuture != null && !readerFuture.isCompletedExceptionally()) {
+            readerFuture.thenAccept(SystemTopicClient.Reader::closeAsync);
+        }
+        policyCacheInitMap.remove(namespace);
+    }
+
     private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
         reader.readNextAsync().whenComplete((msg, ex) -> {
             if (ex == null) {
@@ -382,10 +383,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             } else {
                 if (ex instanceof PulsarClientException.AlreadyClosedException) {
                     log.error("Read more topic policies exception, close the read now!", ex);
-                    NamespaceName namespace = reader.getSystemTopic().getTopicName().getNamespaceObject();
-                    ownedBundlesCountPerNamespace.remove(namespace);
-                    readerCaches.remove(namespace);
+                    cleanCacheAndCloseReader(
+                            reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
                 } else {
+                    log.warn("Read more topic polices exception, read again.", ex);
                     readMorePolicies(reader);
                 }
             }

[pulsar] 08/14: [OWASP] Update mariadb-jdbc dependency and add suppression rule (#14593)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d3af1aebf8872858dc5cd3ba5535cb35c9b6dddf
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Tue Mar 8 15:50:58 2022 +0800

    [OWASP] Update mariadb-jdbc dependency and add suppression rule (#14593)
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
    (cherry picked from commit 3c5698ac11d91e11aed5bf356e239218acf8313d)
---
 pom.xml                                     |  2 +-
 src/owasp-dependency-check-suppressions.xml | 10 ++++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index f7e1348..3d3bfab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,7 +152,7 @@ flexible messaging model and an intuitive client API.</description>
     <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
     <postgresql-jdbc.version>42.2.25</postgresql-jdbc.version>
     <clickhouse-jdbc.version>0.3.2</clickhouse-jdbc.version>
-    <mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
+    <mariadb-jdbc.version>2.7.5</mariadb-jdbc.version>
     <hdfs-offload-version3>3.3.1</hdfs-offload-version3>
     <json-smart.version>2.4.7</json-smart.version>
     <opensearch.version>1.2.4</opensearch.version>
diff --git a/src/owasp-dependency-check-suppressions.xml b/src/owasp-dependency-check-suppressions.xml
index 89cc001..08edea5 100644
--- a/src/owasp-dependency-check-suppressions.xml
+++ b/src/owasp-dependency-check-suppressions.xml
@@ -444,5 +444,15 @@
         <cve>CVE-2019-10174</cve>
         <cve>CVE-2020-25711</cve>
     </suppress>
+    <suppress>
+        <notes><![CDATA[
+       file name: mariadb-java-client-2.7.5.jar
+       ]]></notes>
+        <sha1>9dd29797ecabe7d2e7fa892ec6713a5552cfcc59</sha1>
+        <cve>CVE-2020-28912</cve>
+        <cve>CVE-2021-46669</cve>
+        <cve>CVE-2021-46666</cve>
+        <cve>CVE-2021-46667</cve>
+    </suppress>
 
 </suppressions>

[pulsar] 05/14: [Flaky-test]: AdminApiTest.testNamespaceSplitBundleConcurrent (#14565)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5228c908d7d05ed0ea49c23a74ea3a64b63e7222
Author: Aloys <lo...@gmail.com>
AuthorDate: Sat Mar 5 18:29:52 2022 +0800

    [Flaky-test]: AdminApiTest.testNamespaceSplitBundleConcurrent (#14565)
    
    (cherry picked from commit 01b55678321b6cf254f5d289f7eb177a6aea9be9)
---
 .../src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 672bf96..c10f342 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -1533,10 +1533,11 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
             fail("split bundle shouldn't have thrown exception");
         }
 
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(bundleFactory.getBundles(NamespaceName.get(namespace)).getBundles().size(), 4));
         String[] splitRange4 = { namespace + "/0x00000000_0x3fffffff", namespace + "/0x3fffffff_0x7fffffff",
                 namespace + "/0x7fffffff_0xbfffffff", namespace + "/0xbfffffff_0xffffffff" };
         bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
-        assertEquals(bundles.getBundles().size(), 4);
         for (int i = 0; i < bundles.getBundles().size(); i++) {
             assertEquals(bundles.getBundles().get(i).toString(), splitRange4[i]);
         }
@@ -1566,13 +1567,13 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         } catch (Exception e) {
             fail("split bundle shouldn't have thrown exception");
         }
-
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(bundleFactory.getBundles(NamespaceName.get(namespace)).getBundles().size(), 8));
         String[] splitRange8 = { namespace + "/0x00000000_0x1fffffff", namespace + "/0x1fffffff_0x3fffffff",
                 namespace + "/0x3fffffff_0x5fffffff", namespace + "/0x5fffffff_0x7fffffff",
                 namespace + "/0x7fffffff_0x9fffffff", namespace + "/0x9fffffff_0xbfffffff",
                 namespace + "/0xbfffffff_0xdfffffff", namespace + "/0xdfffffff_0xffffffff" };
         bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
-        assertEquals(bundles.getBundles().size(), 8);
         for (int i = 0; i < bundles.getBundles().size(); i++) {
             assertEquals(bundles.getBundles().get(i).toString(), splitRange8[i]);
         }

[pulsar] 03/14: [Flaky-test]: Fix MLTransactionMetadataStoreTest.testInitTransactionReader fails sporadically (#14532)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cafeadedf078d8cc412b5631c9f0c81e352d813f
Author: 包子 <wu...@gmail.com>
AuthorDate: Thu Mar 3 16:36:54 2022 +0800

    [Flaky-test]: Fix MLTransactionMetadataStoreTest.testInitTransactionReader fails sporadically (#14532)
    
    ### Motivation
    
    #14525
    
    When update states is `TxnStatus.COMMITTED`,  Not correctly `completableFuture.complete`.
    
    ### Modifications
    - When update states is `TxnStatus.COMMITTED`, Add return to ending. Avoid direct calls `completableFuture.complete` from other logic.
    
    ### Documentation
    - [x ] `no-need-doc`
    
    (cherry picked from commit c15d0ef24e37df6b86bc8f4c27d43af27f8cc663)
---
 .../pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java  | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index b4100de..19d651c 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -393,6 +393,7 @@ public class MLTransactionMetadataStore
                                 txnMetaMap.remove(txnID.getLeastSigBits());
                                 completableFuture.complete(null);
                             });
+                            return;
                         }
                         completableFuture.complete(null);
                     } catch (InvalidTxnStatusException e) {

[pulsar] 12/14: Add log to track negtive unacked msg. (#14501)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fd5bc9d688fa1c9f3f918626b3e12f896de511be
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Mar 10 21:46:35 2022 +0800

    Add log to track negtive unacked msg. (#14501)
    
    (cherry picked from commit 82bbb2c03a4ae14c2bccab183d665ad3248d4184)
---
 .../src/main/java/org/apache/pulsar/broker/service/Consumer.java    | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 3cf4cac..c30b991 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -138,6 +138,8 @@ public class Consumer {
     @Setter
     private volatile long consumerEpoch;
 
+    private long negtiveUnackedMsgsTimestamp;
+
     public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     boolean isDurable, TransportCnx cnx, String appId,
@@ -947,6 +949,10 @@ public class Consumer {
             subscription.addUnAckedMessages(ackedMessages);
             unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
         }
+        if (unackedMsgs < 0 && System.currentTimeMillis() - negtiveUnackedMsgsTimestamp >= 10_000) {
+            negtiveUnackedMsgsTimestamp = System.currentTimeMillis();
+            log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", unackedMsgs, ackedMessages, consumer);
+        }
         return unackedMsgs;
     }
 

[pulsar] 11/14: [C++] Fix thread safety issue for multi topic consumer (#14380)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e0d2f2072447892615f08f30aaf76ed1210c8a4a
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Mar 10 16:03:02 2022 +0800

    [C++] Fix thread safety issue for multi topic consumer  (#14380)
    
    * [C++] Fix thread safety issue for multi topic consumer
    
    **Motivation**
    
    In C++ client, if a consumer subscribes multiple topics, a
    `MultiTopicsConsumerImpl` object, which manages a vector of
    `ConsumerImpl`s (`consumers_` field), will be created. However,
    `consumers_` could be accessed by multiple threads, while no
    mutex is locked to protect the access to make it thread safe.
    
    **Modifications**
    
    - Add a `SynchronizedHashMap` class, which implements some thread safe
      methods of traverse, remove, find, clear operations. Since the
      `forEach` methods could call other methods, use the recursive mutex
      instead of the default mutex.
    - Add a related test `SynchronizedHashMapTest` to test the methods and
      the thread safety of `SynchronizedHashMap`.
    - Use `SynchronizedHashMap` as the type of
      `MultiTopicsConsumerImpl::consumers_`.
    
    * Add findFirstValueIf method
    
    * Remove unnecessary return value of forEach
    
    * Fix incorrect calls of forEachValue
    
    * Add missed header
    
    (cherry picked from commit f94eba942b9fb3d2c25b6f7a9e2c0885a194efa0)
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 166 +++++++++------------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   5 +-
 pulsar-client-cpp/lib/SynchronizedHashMap.h        | 127 ++++++++++++++++
 pulsar-client-cpp/tests/ConsumerTest.cc            |  13 +-
 pulsar-client-cpp/tests/SynchronizedHashMapTest.cc | 125 ++++++++++++++++
 5 files changed, 335 insertions(+), 101 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 4e31e64..0ae86d5 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -171,7 +171,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
         consumer->getConsumerCreatedFuture().addListener(std::bind(
             &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), std::placeholders::_1,
             std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
-        consumers_.insert(std::make_pair(topicName->toString(), consumer));
+        consumers_.emplace(topicName->toString(), consumer);
         LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
         consumer->start();
 
@@ -184,7 +184,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
                 &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
                 std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
             consumer->setPartitionIndex(i);
-            consumers_.insert(std::make_pair(topicPartitionName, consumer));
+            consumers_.emplace(topicPartitionName, consumer);
             LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
             consumer->start();
         }
@@ -232,20 +232,19 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
     state_ = Closing;
     lock.unlock();
 
-    if (consumers_.empty()) {
+    std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
+    auto self = shared_from_this();
+    int numConsumers = 0;
+    consumers_.forEachValue(
+        [&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) {
+            numConsumers++;
+            consumer->unsubscribeAsync([self, consumerUnsubed, callback](Result result) {
+                self->handleUnsubscribedAsync(result, consumerUnsubed, callback);
+            });
+        });
+    if (numConsumers == 0) {
         // No need to unsubscribe, since the list matching the regex was empty
         callback(ResultOk);
-        return;
-    }
-
-    std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
-
-    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        (consumer->second)
-            ->unsubscribeAsync(std::bind(&MultiTopicsConsumerImpl::handleUnsubscribedAsync,
-                                         shared_from_this(), std::placeholders::_1, consumerUnsubed,
-                                         callback));
     }
 }
 
@@ -299,17 +298,17 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
 
     for (int i = 0; i < numberPartitions; i++) {
         std::string topicPartitionName = topicName->getTopicPartitionName(i);
-        std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
-
-        if (consumers_.end() == iterator) {
+        auto optConsumer = consumers_.find(topicPartitionName);
+        if (optConsumer.is_empty()) {
             LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
             callback(ResultUnknownError);
+            continue;
         }
 
-        (iterator->second)
-            ->unsubscribeAsync(std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync,
-                                         shared_from_this(), std::placeholders::_1, consumerUnsubed,
-                                         numberPartitions, topicName, topicPartitionName, callback));
+        optConsumer.value()->unsubscribeAsync(
+            std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, shared_from_this(),
+                      std::placeholders::_1, consumerUnsubed, numberPartitions, topicName, topicPartitionName,
+                      callback));
     }
 }
 
@@ -326,10 +325,9 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
 
     LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);
 
-    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
-    if (consumers_.end() != iterator) {
-        iterator->second->pauseMessageListener();
-        consumers_.erase(iterator);
+    auto optConsumer = consumers_.remove(topicPartitionName);
+    if (optConsumer.is_present()) {
+        optConsumer.value()->pauseMessageListener();
     }
 
     if (consumerUnsubed->load() == numberPartitions) {
@@ -363,7 +361,16 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
 
     setState(Closing);
 
-    if (consumers_.empty()) {
+    auto self = shared_from_this();
+    int numConsumers = 0;
+    consumers_.forEach(
+        [&numConsumers, &self, callback](const std::string& name, const ConsumerImplPtr& consumer) {
+            numConsumers++;
+            consumer->closeAsync([self, name, callback](Result result) {
+                self->handleSingleConsumerClose(result, name, callback);
+            });
+        });
+    if (numConsumers == 0) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << subscriptionName_);
         setState(Closed);
@@ -373,27 +380,13 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    // close successfully subscribed consumers
-    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        std::string topicPartitionName = consumer->first;
-        ConsumerImplPtr consumerPtr = consumer->second;
-
-        consumerPtr->closeAsync(std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerClose,
-                                          shared_from_this(), std::placeholders::_1, topicPartitionName,
-                                          callback));
-    }
-
     // fail pending recieve
     failPendingReceiveCallback();
 }
 
-void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string& topicPartitionName,
+void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string topicPartitionName,
                                                         CloseCallback callback) {
-    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
-    if (consumers_.end() != iterator) {
-        consumers_.erase(iterator);
-    }
+    consumers_.remove(topicPartitionName);
 
     LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - "
                                                       << numberTopicPartitions_->load());
@@ -543,15 +536,14 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal
     }
 
     const std::string& topicPartitionName = msgId.getTopicName();
-    std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName);
+    auto optConsumer = consumers_.find(topicPartitionName);
 
-    if (consumers_.end() != iterator) {
+    if (optConsumer.is_present()) {
         unAckedMessageTrackerPtr_->remove(msgId);
-        iterator->second->acknowledgeAsync(msgId, callback);
+        optConsumer.value()->acknowledgeAsync(msgId, callback);
     } else {
         LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker");
         callback(ResultUnknownError);
-        return;
     }
 }
 
@@ -560,11 +552,11 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
 }
 
 void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
-    auto iterator = consumers_.find(msgId.getTopicName());
+    auto optConsumer = consumers_.find(msgId.getTopicName());
 
-    if (consumers_.end() != iterator) {
+    if (optConsumer.is_present()) {
         unAckedMessageTrackerPtr_->remove(msgId);
-        iterator->second->negativeAcknowledge(msgId);
+        optConsumer.value()->negativeAcknowledge(msgId);
     }
 }
 
@@ -605,22 +597,18 @@ bool MultiTopicsConsumerImpl::isOpen() {
 }
 
 void MultiTopicsConsumerImpl::receiveMessages() {
-    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        ConsumerImplPtr consumerPtr = consumer->second;
-        consumerPtr->sendFlowPermitsToBroker(consumerPtr->getCnx().lock(), conf_.getReceiverQueueSize());
-        LOG_DEBUG("Sending FLOW command for consumer - " << consumerPtr->getConsumerId());
-    }
+    const auto receiverQueueSize = conf_.getReceiverQueueSize();
+    consumers_.forEachValue([receiverQueueSize](const ConsumerImplPtr& consumer) {
+        consumer->sendFlowPermitsToBroker(consumer->getCnx().lock(), receiverQueueSize);
+        LOG_DEBUG("Sending FLOW command for consumer - " << consumer->getConsumerId());
+    });
 }
 
 Result MultiTopicsConsumerImpl::pauseMessageListener() {
     if (!messageListener_) {
         return ResultInvalidConfiguration;
     }
-    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        (consumer->second)->pauseMessageListener();
-    }
+    consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); });
     return ResultOk;
 }
 
@@ -628,19 +616,14 @@ Result MultiTopicsConsumerImpl::resumeMessageListener() {
     if (!messageListener_) {
         return ResultInvalidConfiguration;
     }
-    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        (consumer->second)->resumeMessageListener();
-    }
+    consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
     return ResultOk;
 }
 
 void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
     LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
-    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        (consumer->second)->redeliverUnacknowledgedMessages();
-    }
+    consumers_.forEachValue(
+        [](const ConsumerImplPtr& consumer) { consumer->redeliverUnacknowledgedMessages(); });
     unAckedMessageTrackerPtr_->clear();
 }
 
@@ -653,10 +636,9 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
         return;
     }
     LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
-    for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        (consumer->second)->redeliverUnacknowledgedMessages(messageIds);
-    }
+    consumers_.forEachValue([&messageIds](const ConsumerImplPtr& consumer) {
+        consumer->redeliverUnacknowledgedMessages(messageIds);
+    });
 }
 
 int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
@@ -671,15 +653,17 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
     MultiTopicsBrokerConsumerStatsPtr statsPtr =
         std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
     LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
-    int size = consumers_.size();
     lock.unlock();
 
-    ConsumerMap::const_iterator consumer = consumers_.begin();
-    for (int i = 0; i < size; i++, consumer++) {
-        consumer->second->getBrokerConsumerStatsAsync(
-            std::bind(&MultiTopicsConsumerImpl::handleGetConsumerStats, shared_from_this(),
-                      std::placeholders::_1, std::placeholders::_2, latchPtr, statsPtr, i, callback));
-    }
+    auto self = shared_from_this();
+    size_t i = 0;
+    consumers_.forEachValue([&self, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
+        size_t index = i++;
+        consumer->getBrokerConsumerStatsAsync(
+            [self, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
+                self->handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
+            });
+    });
 }
 
 void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
@@ -725,10 +709,9 @@ void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callb
 }
 
 void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
-    Lock lock(mutex_);
-    for (auto&& c : consumers_) {
-        c.second->setNegativeAcknowledgeEnabledForTesting(enabled);
-    }
+    consumers_.forEachValue([enabled](const ConsumerImplPtr& consumer) {
+        consumer->setNegativeAcknowledgeEnabledForTesting(enabled);
+    });
 }
 
 bool MultiTopicsConsumerImpl::isConnected() const {
@@ -736,24 +719,19 @@ bool MultiTopicsConsumerImpl::isConnected() const {
     if (state_ != Ready) {
         return false;
     }
+    lock.unlock();
 
-    for (const auto& topicAndConsumer : consumers_) {
-        if (!topicAndConsumer.second->isConnected()) {
-            return false;
-        }
-    }
-    return true;
+    return consumers_
+        .findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
+        .is_empty();
 }
 
 uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
-    Lock lock(mutex_);
     uint64_t numberOfConnectedConsumer = 0;
-    const auto consumers = consumers_;
-    lock.unlock();
-    for (const auto& topicAndConsumer : consumers) {
-        if (topicAndConsumer.second->isConnected()) {
+    consumers_.forEachValue([&numberOfConnectedConsumer](const ConsumerImplPtr& consumer) {
+        if (consumer->isConnected()) {
             numberOfConnectedConsumer++;
         }
-    }
+    });
     return numberOfConnectedConsumer;
 }
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index aa6b261..98b2f31 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -32,6 +32,7 @@
 #include <lib/MultiTopicsBrokerConsumerStatsImpl.h>
 #include <lib/TopicName.h>
 #include <lib/NamespaceName.h>
+#include <lib/SynchronizedHashMap.h>
 
 namespace pulsar {
 typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
@@ -93,7 +94,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::string consumerStr_;
     std::string topic_;
     const ConsumerConfiguration conf_;
-    typedef std::map<std::string, ConsumerImplPtr> ConsumerMap;
+    typedef SynchronizedHashMap<std::string, ConsumerImplPtr> ConsumerMap;
     ConsumerMap consumers_;
     std::map<std::string, int> topicsPartitions_;
     mutable std::mutex mutex_;
@@ -115,7 +116,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
 
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
-    void handleSingleConsumerClose(Result result, std::string& topicPartitionName, CloseCallback callback);
+    void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
     void notifyResult(CloseCallback closeCallback);
     void messageReceived(Consumer consumer, const Message& msg);
     void internalListener(Consumer consumer);
diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h
new file mode 100644
index 0000000..3a78467
--- /dev/null
+++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <functional>
+#include <mutex>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+#include "Utils.h"
+
+namespace pulsar {
+
+// V must be default constructible and copyable
+template <typename K, typename V>
+class SynchronizedHashMap {
+    using MutexType = std::recursive_mutex;
+    using Lock = std::lock_guard<MutexType>;
+
+   public:
+    using OptValue = Optional<V>;
+    using PairVector = std::vector<std::pair<K, V>>;
+
+    SynchronizedHashMap() = default;
+
+    SynchronizedHashMap(const PairVector& pairs) {
+        for (auto&& kv : pairs) {
+            data_.emplace(kv.first, kv.second);
+        }
+    }
+
+    template <typename... Args>
+    void emplace(Args&&... args) {
+        Lock lock(mutex_);
+        data_.emplace(std::forward<Args>(args)...);
+    }
+
+    void forEach(std::function<void(const K&, const V&)> f) const {
+        Lock lock(mutex_);
+        for (const auto& kv : data_) {
+            f(kv.first, kv.second);
+        }
+    }
+
+    void forEachValue(std::function<void(const V&)> f) const {
+        Lock lock(mutex_);
+        for (const auto& kv : data_) {
+            f(kv.second);
+        }
+    }
+
+    void clear() {
+        Lock lock(mutex_);
+        data_.clear();
+    }
+
+    OptValue find(const K& key) const {
+        Lock lock(mutex_);
+        auto it = data_.find(key);
+        if (it != data_.end()) {
+            return OptValue::of(it->second);
+        } else {
+            return OptValue::empty();
+        }
+    }
+
+    OptValue findFirstValueIf(std::function<bool(const V&)> f) const {
+        Lock lock(mutex_);
+        for (const auto& kv : data_) {
+            if (f(kv.second)) {
+                return OptValue::of(kv.second);
+            }
+        }
+        return OptValue::empty();
+    }
+
+    OptValue remove(const K& key) {
+        Lock lock(mutex_);
+        auto it = data_.find(key);
+        if (it != data_.end()) {
+            auto result = OptValue::of(it->second);
+            data_.erase(it);
+            return result;
+        } else {
+            return OptValue::empty();
+        }
+    }
+
+    // This method is only used for test
+    PairVector toPairVector() const {
+        Lock lock(mutex_);
+        PairVector pairs;
+        for (auto&& kv : data_) {
+            pairs.emplace_back(kv);
+        }
+        return pairs;
+    }
+
+    // This method is only used for test
+    size_t size() const noexcept {
+        Lock lock(mutex_);
+        return data_.size();
+    }
+
+   private:
+    std::unordered_map<K, V> data_;
+    // Use recursive_mutex to allow methods being called in `forEach`
+    mutable MutexType mutex_;
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index 100086e..b61c15a 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -530,11 +530,14 @@ TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
         multiTopicsConsumerImplPtr->unAckedMessageTrackerPtr_.get());
     ASSERT_EQ(numOfMessages * 3, multiTopicsTracker->size());
     ASSERT_FALSE(multiTopicsTracker->isEmpty());
-    for (auto iter = multiTopicsConsumerImplPtr->consumers_.begin();
-         iter != multiTopicsConsumerImplPtr->consumers_.end(); ++iter) {
-        auto subConsumerPtr = iter->second;
-        auto tracker =
-            static_cast<UnAckedMessageTrackerEnabled*>(subConsumerPtr->unAckedMessageTrackerPtr_.get());
+
+    std::vector<UnAckedMessageTrackerEnabled*> trackers;
+    multiTopicsConsumerImplPtr->consumers_.forEach(
+        [&trackers](const std::string& name, const ConsumerImplPtr& consumer) {
+            trackers.emplace_back(
+                static_cast<UnAckedMessageTrackerEnabled*>(consumer->unAckedMessageTrackerPtr_.get()));
+        });
+    for (const auto& tracker : trackers) {
         ASSERT_EQ(0, tracker->size());
         ASSERT_TRUE(tracker->isEmpty());
     }
diff --git a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
new file mode 100644
index 0000000..62c55c4
--- /dev/null
+++ b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <thread>
+#include <vector>
+#include "lib/Latch.h"
+#include "lib/SynchronizedHashMap.h"
+
+using namespace pulsar;
+using SyncMapType = SynchronizedHashMap<int, int>;
+using OptValue = typename SyncMapType::OptValue;
+using PairVector = typename SyncMapType::PairVector;
+
+inline void sleepMs(long millis) { std::this_thread::sleep_for(std::chrono::milliseconds(millis)); }
+
+inline PairVector sort(PairVector pairs) {
+    std::sort(pairs.begin(), pairs.end(), [](const std::pair<int, int>& lhs, const std::pair<int, int>& rhs) {
+        return lhs.first < rhs.first;
+    });
+    return pairs;
+}
+
+TEST(SynchronizedHashMap, testClear) {
+    SynchronizedHashMap<int, int> m({{1, 100}, {2, 200}});
+    m.clear();
+    ASSERT_EQ(m.toPairVector(), PairVector{});
+}
+
+TEST(SynchronizedHashMap, testRemoveAndFind) {
+    SyncMapType m({{1, 100}, {2, 200}, {3, 300}});
+
+    OptValue optValue;
+    optValue = m.findFirstValueIf([](const int& x) { return x == 200; });
+    ASSERT_TRUE(optValue.is_present());
+    ASSERT_EQ(optValue.value(), 200);
+
+    optValue = m.findFirstValueIf([](const int& x) { return x >= 301; });
+    ASSERT_FALSE(optValue.is_present());
+
+    optValue = m.find(1);
+    ASSERT_TRUE(optValue.is_present());
+    ASSERT_EQ(optValue.value(), 100);
+
+    ASSERT_FALSE(m.find(0).is_present());
+    ASSERT_FALSE(m.remove(0).is_present());
+
+    optValue = m.remove(1);
+    ASSERT_TRUE(optValue.is_present());
+    ASSERT_EQ(optValue.value(), 100);
+
+    ASSERT_FALSE(m.remove(1).is_present());
+    ASSERT_FALSE(m.find(1).is_present());
+}
+
+TEST(SynchronizedHashMapTest, testForEach) {
+    SyncMapType m({{1, 100}, {2, 200}, {3, 300}});
+    std::vector<int> values;
+    m.forEachValue([&values](const int& value) { values.emplace_back(value); });
+    std::sort(values.begin(), values.end());
+    ASSERT_EQ(values, std::vector<int>({100, 200, 300}));
+
+    PairVector pairs;
+    m.forEach([&pairs](const int& key, const int& value) { pairs.emplace_back(key, value); });
+    PairVector expectedPairs({{1, 100}, {2, 200}, {3, 300}});
+    ASSERT_EQ(sort(pairs), expectedPairs);
+}
+
+TEST(SynchronizedHashMap, testRecursiveMutex) {
+    SyncMapType m({{1, 100}});
+    OptValue optValue;
+    m.forEach([&m, &optValue](const int& key, const int& value) {
+        optValue = m.find(key);  // the internal mutex was locked again
+    });
+    ASSERT_TRUE(optValue.is_present());
+    ASSERT_EQ(optValue.value(), 100);
+}
+
+TEST(SynchronizedHashMapTest, testThreadSafeForEach) {
+    SyncMapType m({{1, 100}, {2, 200}, {3, 300}});
+
+    Latch latch(1);
+    std::thread t{[&m, &latch] {
+        latch.wait();  // this thread must start after `m.forEach` started
+        m.remove(2);
+    }};
+
+    std::atomic_bool firstElementDone{false};
+    PairVector pairs;
+    m.forEach([&latch, &firstElementDone, &pairs](const int& key, const int& value) {
+        pairs.emplace_back(key, value);
+        if (!firstElementDone) {
+            latch.countdown();
+            firstElementDone = true;
+        }
+        sleepMs(200);
+    });
+    {
+        PairVector expectedPairs({{1, 100}, {2, 200}, {3, 300}});
+        ASSERT_EQ(sort(pairs), expectedPairs);
+    }
+    t.join();
+    {
+        PairVector expectedPairs({{1, 100}, {3, 300}});
+        ASSERT_EQ(sort(m.toPairVector()), expectedPairs);
+    }
+}

[pulsar] 10/14: [C++] Handle exception in creating socket when fd limit is reached (#14587)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 08a6e9b621311cee0ceedf638d12ac499c870d6f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Mar 8 21:33:27 2022 -0800

    [C++] Handle exception in creating socket when fd limit is reached (#14587)
    
    (cherry picked from commit babae8e98a172302aee0bb3790b0f4e4128a7c35)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 28 ++++++++++++++++++++--------
 1 file changed, 20 insertions(+), 8 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index d246bf8..cf12f29 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -161,7 +161,6 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       serverProtocolVersion_(ProtocolVersion_MIN),
       executor_(executor),
       resolver_(executor_->createTcpResolver()),
-      socket_(executor_->createSocket()),
 #if BOOST_VERSION >= 107000
       strand_(boost::asio::make_strand(executor_->getIOService().get_executor())),
 #elif BOOST_VERSION >= 106600
@@ -173,12 +172,20 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       physicalAddress_(physicalAddress),
       cnxString_("[<none> -> " + physicalAddress + "] "),
       incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
-      connectTimeoutTask_(std::make_shared<PeriodicTask>(executor_->getIOService(),
-                                                         clientConfiguration.getConnectionTimeout())),
       outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
-      consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
       maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()) {
 
+    try {
+        socket_ = executor_->createSocket();
+        connectTimeoutTask_ = std::make_shared<PeriodicTask>(executor_->getIOService(),
+                                                             clientConfiguration.getConnectionTimeout());
+        consumerStatsRequestTimer_ = executor_->createDeadlineTimer();
+    } catch (const boost::system::system_error& e) {
+        LOG_ERROR("Failed to initialize connection: " << e.what());
+        close();
+        return;
+    }
+
     LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
     if (clientConfiguration.isUseTls()) {
 #if BOOST_VERSION >= 105400
@@ -1505,9 +1512,11 @@ void ClientConnection::close(Result result) {
     }
     state_ = Disconnected;
     boost::system::error_code err;
-    socket_->close(err);
-    if (err) {
-        LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+    if (socket_) {
+        socket_->close(err);
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+        }
     }
 
     if (tlsSocket_) {
@@ -1542,7 +1551,10 @@ void ClientConnection::close(Result result) {
         consumerStatsRequestTimer_.reset();
     }
 
-    connectTimeoutTask_->stop();
+    if (connectTimeoutTask_) {
+        connectTimeoutTask_->stop();
+        connectTimeoutTask_.reset();
+    }
 
     lock.unlock();
     LOG_INFO(cnxString_ << "Connection closed");

[pulsar] 06/14: fix: NamespacesTest execution order (#14552)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22cdf21a5c106a651b52e7c0d2b96ca2f32a7e22
Author: Baozi <wu...@gmail.com>
AuthorDate: Tue Mar 8 00:07:00 2022 +0800

    fix: NamespacesTest execution order (#14552)
    
    (cherry picked from commit f80e2ccf20e5af41c36983f2a93be95b57be841c)
---
 .../src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 7bd2607..a6436f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -844,11 +844,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
         response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
         for (NamespaceBundle bundle : nsBundles.getBundles()) {
             doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
         }
+        namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
         ArgumentCaptor<Response> captor2 = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(captor2.capture());
         assertEquals(captor2.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());