Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:05 UTC

[pulsar] branch branch-2.10 updated (4bdaa32bc7c -> 3048c876c2a)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 4bdaa32bc7c [improve][broker] Reduce the consumers list sort by priority level (#16243)
     new 4a02d47e269 [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)
     new 13a87905ce5 [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
     new 2251e6ebd7a [fix][client] Remove consumer when close consumer command is received (#15761)
     new 41f83ccc09e [fix][admin] Fix typo in validation message (#16021)
     new eb60a55edf0 [fix][client] Remove producer when close producer command is received (#16028)
     new 68a8a31c30c rename pulsar_producer_configuration_set_crypto_failure_action to pulsar_producer_configuration_get_crypto_failure_action (#16031)
     new cc9ff5965a1 [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
     new cec950eb3e6 [ML] Fix thread safety issues in ManagedCursorContainer related to "heap" field access (#16049)
     new 5f7a6afa2e3 [improve][tests] improved flaky test runs (#16011)
     new 3c0063b4800 [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. (#13957)
     new 51c1985356f [fix][broker]Fix topic policies update not check message expiry (#15941)
     new 718904dcef8 [fix][broker] Fix create client with TLS config (#16014)
     new 028a42ca389 [Flakey-test] fix flaky-test RackAwareTest.testRackUpdate (#16071)
     new c5a6a0b78b3 [fix][txn] Fix NPE when ack message with transaction at cnx = null  (#16142)
     new 4ce967ea672 [fix][broker][monitoring] fix message ack rate (#16108)
     new b68fa32ea85 Fix `messageQueue` release message issue. (#16155)
     new d7f996f8ae4 [fix][client] Fix the startMessageId can't be respected as the ChunkMessageID (#16154)
     new 5eefdf10e56 [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)
     new 6ed4ed05878 [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests (#16160)
     new 6b3ad13986d [fix][broker] Fix NPE when get /admin/v2/namespaces/public/default/maxTopicsPerNamespace (#16076)
     new ea20a896b40 [fix][client] Add classLoader field for `SchemaDefinition` (#15915)
     new 2198c337a2b [fix][tests] TieredStorageConfigurationTests - clear system properties (#15957)
     new 8856606d8f4 [fix][broker] Fix compaction subscription acknowledge Marker msg issue. (#16205)
     new b71f6113121  [fix][broker]Fix subscribe dispathcer limiter not be initialized (#16175)
     new b75af1772c1 [improve][broker] Avoid go through all the consumers to get the message ack owner (#16245)
     new 63f5289865a [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption (#16236)
     new e83c26efcfc [improve][broker] Reduce the re-schedule message read operation for PersistentDispatcherMultipleConsumers (#16241)
     new a51196ecf4d [fix][broker] Fix NPE when drop backlog for time limit. (#16235)
     new 3048c876c2a [fix][broker]fix npe when invoke replaceBookie. (#16239)

The 29 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mledger/impl/ManagedCursorContainer.java       |  16 +-
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |   6 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  10 +-
 .../bookkeeper/mledger/impl/OpReadEntry.java       |   4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  28 ++++
 .../IsolatedBookieEnsemblePlacementPolicy.java     |   4 +
 .../IsolatedBookieEnsemblePlacementPolicyTest.java |  33 ++++
 .../org/apache/pulsar/broker/PulsarService.java    |  11 +-
 .../apache/pulsar/broker/admin/AdminResource.java  |   2 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   3 +-
 .../broker/service/AbstractBaseDispatcher.java     |  17 +-
 .../pulsar/broker/service/BacklogQuotaManager.java |   4 +
 .../pulsar/broker/service/BrokerService.java       |  17 +-
 .../org/apache/pulsar/broker/service/Consumer.java |  42 +++--
 .../PersistentDispatcherMultipleConsumers.java     |  16 +-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../transaction/pendingack/PendingAckStore.java    |   4 +-
 .../pendingack/impl/InMemoryPendingAckStore.java   |   4 +-
 .../pendingack/impl/MLPendingAckStore.java         |   4 +-
 .../pendingack/impl/PendingAckHandleImpl.java      |   4 +-
 .../pulsar/broker/web/PulsarWebResource.java       |  82 +++++-----
 .../apache/pulsar/broker/admin/NamespacesTest.java |   2 +
 .../pulsar/broker/service/MessageTTLTest.java      |  34 +++-
 .../pulsar/broker/service/RackAwareTest.java       |  14 ++
 .../service/SubscribeDispatchLimiterTest.java      | 108 +++++++++++++
 .../persistent/PersistentSubscriptionTest.java     |   4 +-
 .../pulsar/broker/stats/ConsumerStatsTest.java     |  97 ++++++++----
 .../pulsar/broker/transaction/TransactionTest.java |  54 ++++++-
 .../client/api/ConsumerBatchReceiveTest.java       |  47 ++++++
 .../pulsar/client/api/MultiTopicsConsumerTest.java |   2 +-
 .../pulsar/client/impl/MessageChunkingTest.java    |  10 ++
 .../apache/pulsar/compaction/CompactionTest.java   |  70 +++++++++
 .../functions/worker/PulsarFunctionTlsTest.java    |   7 +-
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |   4 +-
 .../pulsar/client/api/schema/SchemaDefinition.java |   7 +
 .../client/api/schema/SchemaDefinitionBuilder.java |   9 ++
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc |   2 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  11 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  29 ++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  32 +++-
 .../client/impl/MultiTopicsConsumerImpl.java       |  24 +--
 .../PersistentAcknowledgmentsGroupingTracker.java  | 141 +++++++++--------
 .../pulsar/client/impl/PulsarClientImpl.java       |  12 +-
 .../pulsar/client/impl/schema/AvroSchema.java      |   5 +-
 .../impl/schema/SchemaDefinitionBuilderImpl.java   |  15 +-
 .../client/impl/schema/SchemaDefinitionImpl.java   |  11 +-
 .../pulsar/client/util/ExecutorProvider.java       |  10 +-
 .../client/util/ScheduledExecutorProvider.java     |  17 +-
 .../apache/pulsar/client/impl/ClientCnxTest.java   |  38 +++++
 .../pulsar/client/impl/LastCumulativeAckTest.java  |  86 ++++++++++
 .../client/impl/MultiTopicsConsumerImplTest.java   |   2 +-
 .../pulsar/client/impl/schema/AvroSchemaTest.java  |  77 ++++++++-
 .../client/impl/schema/SchemaBuilderTest.java      |   8 +-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  14 +-
 .../pulsar/sql/presto/TestPulsarConnector.java     |   4 +-
 .../impl/MLTransactionMetadataStore.java           | 174 +++++++++++----------
 .../impl/MLTransactionMetadataStoreProvider.java   |   4 +-
 .../MLTransactionMetadataStoreTest.java            |  38 ++---
 .../provider/TieredStorageConfigurationTests.java  |  42 ++---
 59 files changed, 1178 insertions(+), 400 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeDispatchLimiterTest.java
 copy pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkContext.java => pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java (66%)
 create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java


[pulsar] 01/29: [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4a02d47e269e9d005db1197cb707704fb0ea80db
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Jun 6 16:48:40 2022 +0800

    [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)
    
    (cherry picked from commit 0a6c6b6576bbae104e5b464b9a4898fc991569b1)
---
 .../pulsar/broker/web/PulsarWebResource.java       | 82 ++++++++++++----------
 1 file changed, 45 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index d810de85bf4..0c07ba7a091 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -830,19 +830,26 @@ public abstract class PulsarWebResource {
                     log.warn(msg);
                     validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
                 } else if (!policies.replication_clusters.contains(localCluster)) {
-                    ClusterDataImpl ownerPeerCluster = getOwnerFromPeerClusterList(pulsarService,
-                            policies.replication_clusters);
-                    if (ownerPeerCluster != null) {
-                        // found a peer that own this namespace
-                        validationFuture.complete(ownerPeerCluster);
-                        return;
-                    }
-                    String msg = String.format(
-                            "Namespace missing local cluster name in clusters list: local_cluster=%s ns=%s clusters=%s",
-                            localCluster, namespace.toString(), policies.replication_clusters);
-
-                    log.warn(msg);
-                    validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg));
+                    getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters)
+                            .thenAccept(ownerPeerCluster -> {
+                                if (ownerPeerCluster != null) {
+                                    // found a peer that own this namespace
+                                    validationFuture.complete(ownerPeerCluster);
+                                } else {
+                                    String msg = String.format(
+                                            "Namespace missing local cluster name in clusters list: local_cluster=%s"
+                                                    + " ns=%s clusters=%s",
+                                            localCluster, namespace.toString(), policies.replication_clusters);
+                                    log.warn(msg);
+                                    validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED,
+                                            msg));
+                                }
+                            })
+                            .exceptionally(ex -> {
+                                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                                validationFuture.completeExceptionally(new RestException(cause));
+                                return null;
+                            });
                 } else {
                     validationFuture.complete(null);
                 }
@@ -861,34 +868,35 @@ public abstract class PulsarWebResource {
         return validationFuture;
     }
 
-    private static ClusterDataImpl getOwnerFromPeerClusterList(PulsarService pulsar, Set<String> replicationClusters) {
+    private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsync(PulsarService pulsar,
+            Set<String> replicationClusters) {
         String currentCluster = pulsar.getConfiguration().getClusterName();
         if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) {
-            return null;
+            return CompletableFuture.completedFuture(null);
         }
 
-        try {
-            Optional<ClusterData> cluster =
-                    pulsar.getPulsarResources().getClusterResources().getCluster(currentCluster);
-            if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) {
-                return null;
-            }
-            for (String peerCluster : cluster.get().getPeerClusterNames()) {
-                if (replicationClusters.contains(peerCluster)) {
-                    return (ClusterDataImpl) pulsar.getPulsarResources().getClusterResources().getCluster(peerCluster)
-                            .orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                                    "Peer cluster " + peerCluster + " data not found"));
-                }
-            }
-        } catch (Exception e) {
-            log.error("Failed to get peer-cluster {}-{}", currentCluster, e.getMessage());
-            if (e instanceof RestException) {
-                throw (RestException) e;
-            } else {
-                throw new RestException(e);
-            }
-        }
-        return null;
+        return pulsar.getPulsarResources().getClusterResources().getClusterAsync(currentCluster)
+                .thenCompose(cluster -> {
+                    if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    for (String peerCluster : cluster.get().getPeerClusterNames()) {
+                        if (replicationClusters.contains(peerCluster)) {
+                            return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
+                                    .thenApply(ret -> {
+                                        if (!ret.isPresent()) {
+                                            throw new RestException(Status.NOT_FOUND,
+                                                    "Peer cluster " + peerCluster + " data not found");
+                                        }
+                                        return (ClusterDataImpl) ret.get();
+                                    });
+                        }
+                    }
+                    return CompletableFuture.completedFuture(null);
+                }).exceptionally(ex -> {
+                    log.error("Failed to get peer-cluster {}-{}", currentCluster, ex.getMessage());
+                    throw FutureUtil.wrapToCompletionException(ex);
+                });
     }
 
     protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,
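
The pattern applied above is the usual blocking-to-asynchronous rewrite: return an already-completed future for the trivial cases, chain the dependent metadata lookup with thenCompose, and let failures propagate to the caller instead of catching them locally. A minimal standalone sketch of that pattern, using a hypothetical ClusterStore interface rather than the real Pulsar resources API:

    import java.util.Optional;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical async metadata client; stands in for the Pulsar cluster resources API.
    interface ClusterStore {
        CompletableFuture<Optional<Set<String>>> getPeerClusterNamesAsync(String cluster);
        CompletableFuture<Optional<String>> getClusterDataAsync(String cluster);
    }

    class PeerOwnerLookup {
        // Short-circuit with a completed future, compose the dependent lookup,
        // and surface a missing peer as an exceptional completion.
        static CompletableFuture<String> findPeerOwner(ClusterStore store,
                                                       String currentCluster,
                                                       Set<String> replicationClusters) {
            if (replicationClusters == null || replicationClusters.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            return store.getPeerClusterNamesAsync(currentCluster)
                    .thenCompose(peers -> {
                        for (String peer : peers.orElse(Set.of())) {
                            if (replicationClusters.contains(peer)) {
                                // The dependent async call is composed, not blocked on.
                                return store.getClusterDataAsync(peer)
                                        .thenApply(data -> data.orElseThrow(() ->
                                                new IllegalStateException("peer " + peer + " not found")));
                            }
                        }
                        return CompletableFuture.<String>completedFuture(null);
                    });
        }
    }

A caller then attaches .thenAccept(...) and .exceptionally(...) handlers, as the validation code in the patch above does.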


[pulsar] 23/29: [fix][broker] Fix compaction subscription acknowledge Marker msg issue. (#16205)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8856606d8f46082773799770e201731b8207d658
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Jun 26 16:28:24 2022 +0800

    [fix][broker] Fix compaction subscription acknowledge Marker msg issue. (#16205)
    
    (cherry picked from commit 8e0cd9c954a7d3bf00fc1bc790e811443c411c32)
---
 .../broker/service/AbstractBaseDispatcher.java     | 17 ++++--
 .../apache/pulsar/compaction/CompactionTest.java   | 70 ++++++++++++++++++++++
 2 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 8e7dfc14aa7..c9ea4a56d6c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
@@ -36,6 +37,7 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
@@ -170,15 +172,13 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 if (Markers.isTxnMarker(msgMetadata)) {
                     // because consumer can receive message is smaller than maxReadPosition,
                     // so this marker is useless for this subscription
-                    subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
-                            Collections.emptyMap());
+                    individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
                 } else if (((PersistentTopic) subscription.getTopic())
                         .isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()))) {
-                    subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
-                            Collections.emptyMap());
+                    individualAcknowledgeMessageIfNeeded(entry.getPosition(), Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
@@ -193,8 +193,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
 
                 entries.set(i, null);
                 entry.release();
-                subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual,
-                        Collections.emptyMap());
+                individualAcknowledgeMessageIfNeeded(pos, Collections.emptyMap());
                 continue;
             } else if (msgMetadata.hasDeliverAtTime()
                     && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
@@ -252,6 +251,12 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         return totalEntries;
     }
 
+    private void individualAcknowledgeMessageIfNeeded(Position position, Map<String, Long> properties) {
+        if (!(subscription instanceof CompactorSubscription)) {
+            subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Individual, properties);
+        }
+    }
+
     private static EntryFilter.FilterResult getFilterResult(FilterContext filterContext, Entry entry,
                                                             ImmutableList<EntryFilterWithClassLoader> entryFilters) {
         for (EntryFilter entryFilter : entryFilters) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index 8c9df66dac4..ddad53fbc82 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertFalse;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -44,11 +45,17 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
+import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.Topic;
@@ -66,13 +73,17 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1678,4 +1689,63 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         producer1.close();
         producer2.close();
     }
+
+    @Test(timeOut = 60000)
+    public void testCompactionWithMarker() throws Exception {
+        String namespace = "my-property/use/my-ns";
+        final TopicName dest = TopicName.get(
+                BrokerTestUtil.newUniqueName("persistent://" + namespace + "/testWriteMarker"));
+        admin.topics().createNonPartitionedTopic(dest.toString());
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(dest.toString())
+                .subscriptionName("test-compaction-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
+                .subscribe();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(dest.toString())
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        producer.send("msg-1".getBytes(StandardCharsets.UTF_8));
+        Optional<Topic> topic = pulsar.getBrokerService().getTopic(dest.toString(), true).join();
+        Assert.assertTrue(topic.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) topic.get();
+        Random random = new Random();
+        for (int i = 0; i < 100; i++) {
+            int rad = random.nextInt(3);
+            ByteBuf marker;
+            if (rad == 0) {
+                marker = Markers.newTxnCommitMarker(-1L, 0, i);
+            } else if (rad == 1) {
+                marker = Markers.newTxnAbortMarker(-1L, 0, i);
+            } else {
+                marker = Markers.newReplicatedSubscriptionsSnapshotRequest(UUID.randomUUID().toString(), "r1");
+            }
+            persistentTopic.getManagedLedger().asyncAddEntry(marker, new AsyncCallbacks.AddEntryCallback() {
+                @Override
+                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
+                    //
+                }
+
+                @Override
+                public void addFailed(ManagedLedgerException exception, Object ctx) {
+                    //
+                }
+            }, null);
+            marker.release();
+        }
+        producer.send("msg-2".getBytes(StandardCharsets.UTF_8));
+        admin.topics().triggerCompaction(dest.toString());
+        Awaitility.await()
+                .atMost(50, TimeUnit.SECONDS)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .untilAsserted(() -> {
+                    long ledgerId = admin.topics().getInternalStats(dest.toString()).compactedLedger.ledgerId;
+                    Assert.assertNotEquals(ledgerId, -1L);
+                });
+    }
 }
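
To observe the result from the client side, the topic can be re-read with a compaction-aware subscription once compaction has run. A minimal sketch assuming the pulsarClient, admin client, and topic created in the test above (variable names are illustrative):

    Consumer<byte[]> compactedReader = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName("verify-compacted")
            .readCompacted(true)                      // serve data from the compacted ledger
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();

    Message<byte[]> msg;
    while ((msg = compactedReader.receive(3, TimeUnit.SECONDS)) != null) {
        // Only payload messages arrive here; marker entries are filtered out by the broker dispatcher.
        System.out.println(new String(msg.getValue(), StandardCharsets.UTF_8));
        compactedReader.acknowledge(msg);
    }
    compactedReader.close();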


[pulsar] 13/29: [Flakey-test] fix flaky-test RackAwareTest.testRackUpdate (#16071)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 028a42ca3891686a5d2cb30e278c6d56abf092c4
Author: Aloys <lo...@gmail.com>
AuthorDate: Thu Jun 16 16:01:28 2022 +0800

    [Flakey-test] fix flaky-test RackAwareTest.testRackUpdate (#16071)
    
    (cherry picked from commit 8914b84115bacaf38fe892d66533c1b70431acbf)
---
 .../org/apache/pulsar/broker/service/RackAwareTest.java    | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
index 949cc11c6cb..618ae28e6b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.gson.Gson;
 import java.io.File;
+import java.lang.reflect.Field;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.List;
@@ -34,10 +35,12 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.net.NetworkTopologyImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping;
@@ -241,6 +244,13 @@ public class RackAwareTest extends BkEnsemblesTestBase {
         });
 
         BookKeeper bkc = this.pulsar.getBookKeeperClient();
+        Field field = bkc.getClass().getDeclaredField("placementPolicy");
+        field.setAccessible(true);
+        RackawareEnsemblePlacementPolicy ensemblePlacementPolicy = (RackawareEnsemblePlacementPolicy) field.get(bkc);
+        Field topoField =
+                ensemblePlacementPolicy.getClass().getSuperclass().getSuperclass().getDeclaredField("topology");
+        topoField.setAccessible(true);
+        NetworkTopologyImpl networkTopology = (NetworkTopologyImpl) topoField.get(ensemblePlacementPolicy);
 
         // 3. test create ledger
         try {
@@ -279,6 +289,10 @@ public class RackAwareTest extends BkEnsemblesTestBase {
             assertTrue(racks.containsAll(Lists.newArrayList("rack-0", "rack-1")));
         });
 
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(networkTopology.getNumOfRacks(), 2);
+        });
+
         // 5. create ledger required for 2 racks
         for (int i = 0; i < 2; i++) {
             LedgerHandle lh = bkc.createLedger(2, 2, DigestType.DUMMY, new byte[0]);
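
The test stabilization reads BookKeeper's private placement-policy state via reflection and then waits on it with Awaitility. A minimal standalone sketch of that reflection pattern, using a hypothetical Holder class rather than a BookKeeper type:

    import java.lang.reflect.Field;

    class ReflectionPeek {
        // Hypothetical class with private state, used only to illustrate the pattern.
        static class Holder {
            private final int racks = 2;
        }

        // Read a private field declared on the target's own class; for fields declared
        // on an ancestor, walk getSuperclass() as the test above does.
        static Object readPrivateField(Object target, String name) throws Exception {
            Field field = target.getClass().getDeclaredField(name);
            field.setAccessible(true);
            return field.get(target);
        }

        public static void main(String[] args) throws Exception {
            int racks = (int) readPrivateField(new Holder(), "racks");
            System.out.println(racks);   // 2
        }
    }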


[pulsar] 06/29: rename pulsar_producer_configuration_set_crypto_failure_action to pulsar_producer_configuration_get_crypto_failure_action (#16031)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 68a8a31c30cbf9a98a02b58c22dd10b1f824c31c
Author: Frank Xiong <Fr...@outlook.com>
AuthorDate: Tue Jun 14 02:20:37 2022 +0800

    rename pulsar_producer_configuration_set_crypto_failure_action to pulsar_producer_configuration_get_crypto_failure_action (#16031)
    
    Fixes #16030
    
    ### Motivation
    Fix the link error (missing symbol) for function pulsar_producer_configuration_get_crypto_failure_action
    
    ### Modifications
    
    Rename function name `pulsar_producer_configuration_set_crypto_failure_action` to `pulsar_producer_configuration_get_crypto_failure_action`
    
    (cherry picked from commit bff34000385f6faf6dbff4385d0dc562602ac623)
---
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index f26f63a593b..906a4d8230c 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -209,7 +209,7 @@ void pulsar_producer_configuration_set_default_crypto_key_reader(pulsar_producer
     conf->conf.setCryptoKeyReader(keyReader);
 }
 
-pulsar_producer_crypto_failure_action pulsar_producer_configuration_set_crypto_failure_action(
+pulsar_producer_crypto_failure_action pulsar_producer_configuration_get_crypto_failure_action(
     pulsar_producer_configuration_t *conf) {
     return (pulsar_producer_crypto_failure_action)conf->conf.getCryptoFailureAction();
 }


[pulsar] 16/29: Fix `messageQueue` release message issue. (#16155)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b68fa32ea853048c0e0d163f5d8d6efc5f5ed539
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jun 21 17:13:00 2022 +0800

    Fix `messageQueue` release message issue. (#16155)
    
    (cherry picked from commit 141c44022a27be2fc07eab9827cfdb168e448953)
---
 .../org/apache/pulsar/sql/presto/PulsarRecordCursor.java   | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 1ea232203d3..558f6b47e9d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -726,19 +726,17 @@ public class PulsarRecordCursor implements RecordCursor {
     public void close() {
         log.info("Closing cursor record");
 
-        if (currentMessage != null) {
-            currentMessage.release();
-        }
-
-        if (messageQueue != null) {
-            messageQueue.drain(RawMessage::release);
-        }
-
         if (deserializeEntries != null) {
             deserializeEntries.close().whenComplete((r, t) -> {
                 if (entryQueue != null) {
                     entryQueue.drain(Entry::release);
                 }
+                if (messageQueue != null) {
+                    messageQueue.drain(RawMessage::release);
+                }
+                if (currentMessage != null) {
+                    currentMessage.release();
+                }
             });
         }
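
The fix above is an ordering change: the buffers are drained only inside the completion callback of the asynchronous close, so the deserialization worker can no longer touch a message that has already been released. A minimal sketch of that ordering with hypothetical Worker and Buffer types, not the Presto connector classes:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class CloseOrderingSketch {
        interface Buffer { void release(); }
        interface Worker { CompletableFuture<Void> close(); }

        static void close(Worker worker, ConcurrentLinkedQueue<Buffer> queue) {
            // Drain only after the worker has fully stopped; draining first would
            // let an in-flight worker thread use a buffer that was already released.
            worker.close().whenComplete((ignored, error) -> {
                Buffer b;
                while ((b = queue.poll()) != null) {
                    b.release();
                }
            });
        }
    }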
 


[pulsar] 28/29: [fix][broker] Fix NPE when drop backlog for time limit. (#16235)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a51196ecf4d4d626487f43b9caf4c04e363a0d56
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Jun 28 14:39:00 2022 +0800

    [fix][broker] Fix NPE when drop backlog for time limit. (#16235)
    
    (cherry picked from commit d24d82780fd27a98c6cdbee28d756ee7d419495f)
---
 .../java/org/apache/pulsar/broker/service/BacklogQuotaManager.java    | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index c41901222f3..805d00adca6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -214,6 +214,10 @@ public class BacklogQuotaManager {
                     ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
                     Position oldestPosition = slowestConsumer.getMarkDeletedPosition();
                     ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
+                    if (ledgerInfo == null) {
+                        slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl) oldestPosition));
+                        continue;
+                    }
                     // Timestamp only > 0 if ledger has been closed
                     if (ledgerInfo.getTimestamp() > 0
                             && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) {
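
The NPE arises because the slowest cursor can still point at a ledger whose metadata has already been removed, so the lookup returns nothing; the guard skips that ledger and advances the cursor instead of dereferencing null. A minimal sketch of the guard, with a hypothetical map standing in for the managed ledger's metadata:

    import java.util.Map;

    class TimeLimitGuardSketch {
        // Hypothetical lookup; stands in for mLedger.getLedgerInfo(ledgerId).
        static Long closeTimestamp(Map<Long, Long> ledgerCloseTimes, long ledgerId) {
            return ledgerCloseTimes.get(ledgerId);   // null once the ledger is gone
        }

        static boolean shouldDropLedger(Map<Long, Long> ledgerCloseTimes, long ledgerId,
                                        long nowMillis, long limitMillis) {
            Long closedAt = closeTimestamp(ledgerCloseTimes, ledgerId);
            if (closedAt == null) {
                // Metadata missing: the caller advances the cursor and re-evaluates,
                // mirroring the resetCursor(...) branch added above.
                return false;
            }
            // A close timestamp > 0 means the ledger has been closed.
            return closedAt > 0 && nowMillis - closedAt > limitMillis;
        }
    }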


[pulsar] 21/29: [fix][client] Add classLoader field for `SchemaDefinition` (#15915)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea20a896b408d052fe0ef366c4a052b6ccfb6c72
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Fri Jun 24 00:33:59 2022 +0800

    [fix][client] Add classLoader field for `SchemaDefinition` (#15915)
    
    Fixes #15899
    
    ### Motivation
    
    Currently, logical type conversions are not registered when using `SchemaDefinition.<T>builder().withJsonDef()` because there is no classLoader parameter.
    
    See:
    https://github.com/apache/pulsar/blob/04aa9e8e51869d1621a7e25402a656084eebfc09/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java#L58-L68
    
    We can add a classLoader field to `SchemaDefinition` so that users can manually pass a classLoader to register logical type conversions
    
    ### Modifications
    
    Add classLoader field for `SchemaDefinition`
    
    (cherry picked from commit 8434500b6879abc9ab74de6e5b75883e8053fd9c)
---
 .../pulsar/client/api/schema/SchemaDefinition.java |  7 ++
 .../client/api/schema/SchemaDefinitionBuilder.java |  9 +++
 .../pulsar/client/impl/schema/AvroSchema.java      |  5 +-
 .../impl/schema/SchemaDefinitionBuilderImpl.java   | 15 ++++-
 .../client/impl/schema/SchemaDefinitionImpl.java   | 11 +++-
 .../pulsar/client/impl/schema/AvroSchemaTest.java  | 77 ++++++++++++++++++++--
 .../client/impl/schema/SchemaBuilderTest.java      |  8 ++-
 7 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
index a6777c5c2fa..88dd3670608 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
@@ -75,6 +75,13 @@ public interface SchemaDefinition<T> {
      */
     Class<T> getPojo();
 
+    /**
+     * Get pojo classLoader.
+     *
+     * @return pojo schema
+     */
+    ClassLoader getClassLoader();
+
     /**
      * Get supportSchemaVersioning schema definition.
      *
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
index 61d246674a8..97d822b927d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
@@ -80,6 +80,15 @@ public interface SchemaDefinitionBuilder<T> {
      */
     SchemaDefinitionBuilder<T> withPojo(Class pojo);
 
+    /**
+     * Set schema of pojo classLoader.
+     *
+     * @param classLoader pojo classLoader
+     *
+     * @return schema definition builder
+     */
+    SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader);
+
     /**
      * Set schema of json definition.
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 3d0bf157cb3..d2ea9cd4a9f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -89,9 +89,12 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
                     schemaDefinition.getSchemaWriterOpt().get(), parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
         }
         ClassLoader pojoClassLoader = null;
-        if (schemaDefinition.getPojo() != null) {
+        if (schemaDefinition.getClassLoader() != null) {
+            pojoClassLoader = schemaDefinition.getClassLoader();
+        } else if (schemaDefinition.getPojo() != null) {
             pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
         }
+
         return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
index fe85a55e117..06a2f50abd6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
@@ -40,6 +40,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
      */
     private  Class<T> clazz;
 
+    /**
+     * the classLoader definition class.
+     */
+    private ClassLoader classLoader;
+
     /**
      * The flag of schema type always allow null.
      *
@@ -100,6 +105,12 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
         return this;
     }
 
+    @Override
+    public SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader) {
+        this.classLoader = classLoader;
+        return this;
+    }
+
     @Override
     public SchemaDefinitionBuilder<T> withJsonDef(String jsonDef) {
         this.jsonDef = jsonDef;
@@ -149,8 +160,8 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
 
         properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull));
         properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled));
-        return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning,
-                jsr310ConversionEnabled, reader, writer);
+        return new SchemaDefinitionImpl(clazz, jsonDef, classLoader,
+                alwaysAllowNull, properties, supportSchemaVersioning, jsr310ConversionEnabled, reader, writer);
 
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
index d0db78963db..090211a63fe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
@@ -50,6 +50,8 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
 
     private final String jsonDef;
 
+    private final ClassLoader classLoader;
+
     private final boolean supportSchemaVersioning;
 
     private final boolean jsr310ConversionEnabled;
@@ -58,13 +60,15 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
 
     private final SchemaWriter<T> writer;
 
-    public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String, String> properties,
+    public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, ClassLoader classLoader,
+                                boolean alwaysAllowNull, Map<String, String> properties,
                                 boolean supportSchemaVersioning, boolean jsr310ConversionEnabled,
                                 SchemaReader<T> reader, SchemaWriter<T> writer) {
         this.alwaysAllowNull = alwaysAllowNull;
         this.properties = properties;
         this.jsonDef = jsonDef;
         this.pojo = pojo;
+        this.classLoader = classLoader;
         this.supportSchemaVersioning = supportSchemaVersioning;
         this.jsr310ConversionEnabled = jsr310ConversionEnabled;
         this.reader = reader;
@@ -104,6 +108,11 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> {
         return pojo;
     }
 
+    @Override
+    public ClassLoader getClassLoader() {
+        return this.classLoader;
+    }
+
     @Override
     public boolean getSupportSchemaVersioning() {
         return supportSchemaVersioning;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index d69f8bf66ba..2a5040d7815 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -36,7 +36,9 @@ import java.time.LocalTime;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.UUID;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaValidationException;
@@ -463,21 +465,88 @@ public class AvroSchemaTest {
 
 
     @Data
-    private static class TimestampStruct {
+    @AllArgsConstructor
+    @NoArgsConstructor
+    private static class TimestampPojo {
         Instant value;
     }
 
     @Test
     public void testTimestampWithJsr310Conversion() {
-        AvroSchema<TimestampStruct> schema = AvroSchema.of(TimestampStruct.class);
+        AvroSchema<TimestampPojo> schema = AvroSchema.of(TimestampPojo.class);
         Assert.assertEquals(
                 schema.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
                 new TimeConversions.TimestampMicrosConversion().getLogicalTypeName());
 
-        AvroSchema<TimestampStruct> schema2 = AvroSchema.of(SchemaDefinition.<TimestampStruct>builder()
-                .withPojo(TimestampStruct.class).withJSR310ConversionEnabled(true).build());
+        AvroSchema<TimestampPojo> schema2 = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withPojo(TimestampPojo.class).withJSR310ConversionEnabled(true).build());
         Assert.assertEquals(
                 schema2.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(),
                 new TimeConversions.TimestampMillisConversion().getLogicalTypeName());
     }
+
+    @Test
+    public void testTimestampWithJsonDef(){
+        AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withPojo(TimestampPojo.class)
+                .withJSR310ConversionEnabled(false).build());
+
+        TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z"));
+        byte[] encode = schemaWithPojo.encode(timestampPojo);
+        TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode);
+
+        Assert.assertEquals(decodeWithPojo, timestampPojo);
+
+        String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema());
+        AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withJsonDef(schemaDefinition)
+                .withClassLoader(TimestampPojo.class.getClassLoader())
+                .withJSR310ConversionEnabled(false).build());
+
+        TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode);
+
+        Assert.assertEquals(decodeWithJson, decodeWithPojo);
+        Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass());
+
+        AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withJsonDef(schemaDefinition)
+                .withJSR310ConversionEnabled(false).build());
+
+        TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode);
+        Assert.assertNotEquals(decodeWithJsonNoClassLoader, decodeWithPojo);
+        Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass());
+    }
+
+    @Test
+    public void testTimestampWithJsonDefAndJSR310ConversionEnabled(){
+        AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withPojo(TimestampPojo.class)
+                .withJSR310ConversionEnabled(true).build());
+
+        TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z"));
+        byte[] encode = schemaWithPojo.encode(timestampPojo);
+        TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode);
+
+        Assert.assertNotEquals(decodeWithPojo, timestampPojo);
+
+        String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema());
+        AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withJsonDef(schemaDefinition)
+                .withClassLoader(TimestampPojo.class.getClassLoader())
+                .withJSR310ConversionEnabled(true).build());
+
+        TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode);
+
+        Assert.assertEquals(decodeWithJson, decodeWithPojo);
+        Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass());
+
+        AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
+                .withJsonDef(schemaDefinition)
+                .withJSR310ConversionEnabled(true).build());
+
+        TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode);
+        Assert.assertNotEquals(decodeWithJsonNoClassLoader, decodeWithPojo);
+        Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass());
+    }
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index fa88e144a31..a1530864c92 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -20,11 +20,15 @@ package org.apache.pulsar.client.impl.schema;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
-
 import lombok.Data;
 import org.apache.avro.reflect.Nullable;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.*;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
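
A short usage sketch of the new builder option, assuming the TimestampPojo class and the JSON schema string from the tests above:

    // Passing the POJO's classLoader lets Avro register logical-type conversions
    // (e.g. java.time.Instant) even though the definition is built from a JSON
    // schema string and carries no POJO class.
    AvroSchema<TimestampPojo> schema = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder()
            .withJsonDef(jsonSchemaDefinition)
            .withClassLoader(TimestampPojo.class.getClassLoader())
            .withJSR310ConversionEnabled(true)
            .build());

    TimestampPojo decoded = schema.decode(encodedBytes);   // field decodes as Instant

Without withClassLoader(...), the same decode falls back to the raw Avro representation of the timestamp field, which is what the new testTimestampWithJsonDef cases assert.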


[pulsar] 08/29: [ML] Fix thread safety issues in ManagedCursorContainer related to "heap" field access (#16049)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cec950eb3e60c5a78834c1277d8b340b9ecddb05
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue Jun 14 15:21:30 2022 +0300

    [ML] Fix thread safety issues in ManagedCursorContainer related to "heap" field access (#16049)
    
    (cherry picked from commit ec9676f6431aa5ebd7df2235f6ebd49ddbf92288)
---
 .../bookkeeper/mledger/impl/ManagedCursorContainer.java  | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
index 65d254112d1..f9591d9ee6a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java
@@ -105,7 +105,21 @@ public class ManagedCursorContainer implements Iterable<ManagedCursor> {
     }
 
     public PositionImpl getSlowestReadPositionForActiveCursors() {
-        return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
+        long stamp = rwLock.readLock();
+        try {
+            return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
+        } finally {
+            rwLock.unlockRead(stamp);
+        }
+    }
+
+    public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
+        long stamp = rwLock.readLock();
+        try {
+            return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
+        } finally {
+            rwLock.unlockRead(stamp);
+        }
     }
 
     public ManagedCursor get(String name) {
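
The change takes the container's existing StampedLock around the heap read so that a reader never observes the heap while a writer is re-ordering it. A minimal standalone sketch of that read-lock pattern, using a hypothetical GuardedHeap class rather than the Pulsar container:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.StampedLock;

    class GuardedHeap<T> {
        private final StampedLock rwLock = new StampedLock();
        private final List<T> heap = new ArrayList<>();

        // Readers acquire a read stamp and must release it in finally.
        T peekFirst() {
            long stamp = rwLock.readLock();
            try {
                return heap.isEmpty() ? null : heap.get(0);
            } finally {
                rwLock.unlockRead(stamp);
            }
        }

        // Writers take the exclusive stamp while mutating the heap.
        void add(T item) {
            long stamp = rwLock.writeLock();
            try {
                heap.add(item);   // real code would sift the entry into heap order
            } finally {
                rwLock.unlockWrite(stamp);
            }
        }
    }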


[pulsar] 02/29: [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 13a87905ce5008063b395ac297e4bbeafd8fb1b9
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Tue Jun 7 21:52:05 2022 +0800

    [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837)
    
    (cherry picked from commit 7a3ad611f51511afca4bcaa1de299517a1907e8e)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 ++------
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  4 ++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 28 ++++++++++++++++++++++
 3 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 87f97ca8329..d1267fed27a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2215,15 +2215,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
+    PositionImpl startReadOperationOnLedger(PositionImpl position) {
         Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
-        if (null == ledgerId) {
-            opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key"
-                    + ") method is used to return the least key greater than or equal to the given key, "
-                    + "or null if there is no such key"), null);
-        }
-
-        if (ledgerId != position.getLedgerId()) {
+        if (ledgerId != null && ledgerId != position.getLedgerId()) {
             // The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need
             // to skip on the next available ledger
             position = new PositionImpl(ledgerId, 0);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 006accaf252..a805802e633 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -48,7 +48,7 @@ class OpReadEntry implements ReadEntriesCallback {
     public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
             ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
         OpReadEntry op = RECYCLER.get();
-        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
+        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
         op.cursor = cursor;
         op.count = count;
         op.callback = callback;
@@ -140,7 +140,7 @@ class OpReadEntry implements ReadEntriesCallback {
 
             // We still have more entries to read from the next ledger, schedule a new async operation
             cursor.ledger.getExecutor().execute(safeRun(() -> {
-                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
+                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
                 cursor.ledger.asyncReadEntries(OpReadEntry.this);
             }));
         } else {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8e7a77c7a50..3e90ef5ec6b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -52,6 +52,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
@@ -408,6 +409,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testStartReadOperationOnLedgerWithEmptyLedgers() throws ManagedLedgerException, InterruptedException {
+        ManagedLedger ledger = factory.open("my_test_ledger_1");
+        ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
+        NavigableMap<Long, LedgerInfo> ledgers = ledgerImpl.getLedgersInfo();
+        LedgerInfo ledgerInfo = ledgers.firstEntry().getValue();
+        ledgers.clear();
+        ManagedCursor c1 = ledger.openCursor("c1");
+        PositionImpl position = new PositionImpl(ledgerInfo.getLedgerId(), 0);
+        PositionImpl maxPosition = new PositionImpl(ledgerInfo.getLedgerId(), 99);
+        OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl) c1, position, 20,
+                new ReadEntriesCallback() {
+
+                    @Override
+                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
+
+                    }
+
+                    @Override
+                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+
+                    }
+                }, null, maxPosition);
+        Assert.assertEquals(opReadEntry.readPosition, position);
+    }
+
+
     @Test(timeOut = 20000)
     public void spanningMultipleLedgersWithSize() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);


[pulsar] 05/29: [fix][client] Remove producer when close producer command is received (#16028)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb60a55edf0cea48074a3a9a30510dbbb2cda240
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jun 13 15:16:39 2022 +0800

    [fix][client] Remove producer when close producer command is received (#16028)
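
    A minimal, self-contained sketch of the idea behind this fix (the `ProducerHandle`
    type and registry map below are illustrative stand-ins, not the actual `ClientCnx`
    code): when the broker sends a close-producer command, the connection should remove
    its reference to the producer rather than only look it up, so the registry does not
    keep a stale entry for the closed producer.

    ```java
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class CloseProducerHandlingSketch {

        // Hypothetical stand-in for the per-connection producer registry.
        interface ProducerHandle {
            void connectionClosed();
        }

        private final ConcurrentMap<Long, ProducerHandle> producers = new ConcurrentHashMap<>();

        void registerProducer(long producerId, ProducerHandle producer) {
            producers.put(producerId, producer);
        }

        // On a close-producer command, remove the entry (not just get it) so the
        // connection no longer tracks the closed producer, then notify it.
        void handleCloseProducer(long producerId) {
            ProducerHandle producer = producers.remove(producerId);
            if (producer != null) {
                producer.connectionClosed();
            }
        }
    }
    ```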
    
    (cherry picked from commit 5ef895af7d8dec851167e56cdf3e8bec11080f8d)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  5 +++--
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 24 +++++++++++++++++++---
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 38a508bb716..322138699d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -111,7 +111,8 @@ public class ClientCnx extends PulsarHandler {
     // LookupRequests that waiting in client side.
     private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
 
-    private final ConcurrentLongHashMap<ProducerImpl<?>> producers =
+    @VisibleForTesting
+    final ConcurrentLongHashMap<ProducerImpl<?>> producers =
             ConcurrentLongHashMap.<ProducerImpl<?>>newBuilder()
                     .expectedItems(16)
                     .concurrencyLevel(1)
@@ -721,7 +722,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
         log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
         final long producerId = closeProducer.getProducerId();
-        ProducerImpl<?> producer = producers.get(producerId);
+        ProducerImpl<?> producer = producers.remove(producerId);
         if (producer != null) {
             producer.connectionClosed(this);
         } else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index a3a00b1b70e..6ce4afecd02 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
+import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
@@ -156,7 +157,7 @@ public class ClientCnxTest {
 
     @Test
     public void testHandleCloseConsumer() {
-        ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");
+        ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer");
         EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
         ClientConfigurationData conf = new ClientConfigurationData();
         ClientCnx cnx = new ClientCnx(conf, eventLoop);
@@ -165,11 +166,28 @@ public class ClientCnxTest {
         cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
         assertEquals(cnx.consumers.size(), 1);
 
-        CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
-                .setConsumerId(1);
+        CommandCloseConsumer closeConsumer = new CommandCloseConsumer().setConsumerId(consumerId);
         cnx.handleCloseConsumer(closeConsumer);
         assertEquals(cnx.consumers.size(), 0);
 
         eventLoop.shutdownGracefully();
     }
+
+    @Test
+    public void testHandleCloseProducer() {
+        ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseProducer");
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        long producerId = 1;
+        cnx.registerProducer(producerId, mock(ProducerImpl.class));
+        assertEquals(cnx.producers.size(), 1);
+
+        CommandCloseProducer closeProducerCmd = new CommandCloseProducer().setProducerId(producerId);
+        cnx.handleCloseProducer(closeProducerCmd);
+        assertEquals(cnx.producers.size(), 0);
+
+        eventLoop.shutdownGracefully();
+    }
 }


[pulsar] 09/29: [improve][tests] improved flaky test runs (#16011)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5f7a6afa2e3b166faf9902fe97a82c7e6902ddaf
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Tue Jun 14 13:31:24 2022 -0700

    [improve][tests] improved flaky test runs (#16011)
    
    * [improve][tests] improved flaky test runs
    - improved PulsarFunctionTlsTests by reordering tearDown() logic
    - improved ManagedLedgerFactoryImpl.shutdown() by closing cacheEviction threads early
    - improved TestPulsarConnector memory consumption by removing unnecessary spy()
    - improved PulsarFunctionsTest run by using receive() instead of receive(30, TimeUnit.SECONDS);
    
    * Reverted PulsarFunctionsTest consumer.receive() change
    
    (cherry picked from commit b1b25ef15be4595e2284cd4bbd4b8cfe39ed0743)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java   | 6 +++---
 .../org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java  | 7 ++++---
 .../src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java  | 4 ++--
 .../java/org/apache/pulsar/sql/presto/TestPulsarConnector.java     | 4 ++--
 4 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 5ba1dc14c44..d829efad87f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -257,7 +257,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         double evictionFrequency = Math.max(Math.min(config.getCacheEvictionFrequency(), 1000.0), 0.001);
         long waitTimeMillis = (long) (1000 / evictionFrequency);
 
-        while (true) {
+        while (!closed) {
             try {
                 doCacheEviction();
 
@@ -509,6 +509,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         statsTask.cancel(true);
         flushCursorsTask.cancel(true);
+        cacheEvictionExecutor.shutdownNow();
 
         List<String> ledgerNames = new ArrayList<>(this.ledgers.keySet());
         List<CompletableFuture<Void>> futures = new ArrayList<>(ledgerNames.size());
@@ -589,7 +590,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 }));
             }
         }));
-        cacheEvictionExecutor.shutdownNow();
         entryCacheManager.clear();
         return FutureUtil.waitForAll(futures);
     }
@@ -603,6 +603,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         statsTask.cancel(true);
         flushCursorsTask.cancel(true);
+        cacheEvictionExecutor.shutdownNow();
 
         // take a snapshot of ledgers currently in the map to prevent race conditions
         List<CompletableFuture<ManagedLedgerImpl>> ledgers = new ArrayList<>(this.ledgers.values());
@@ -646,7 +647,6 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         }
 
         scheduledExecutor.shutdownNow();
-        cacheEvictionExecutor.shutdownNow();
 
         entryCacheManager.clear();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 64e3b588236..844596743d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -177,12 +177,13 @@ public class PulsarFunctionTlsTest {
     void tearDown() throws Exception {
         try {
             for (int i = 0; i < BROKER_COUNT; i++) {
-                if (pulsarServices[i] != null) {
-                    pulsarServices[i].close();
-                }
                 if (pulsarAdmins[i] != null) {
                     pulsarAdmins[i].close();
                 }
+                if (pulsarServices[i] != null) {
+                    pulsarServices[i].close();
+                }
+
             }
             bkEnsemble.stop();
         } finally {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 15ee27dc3a5..9f6d9f65dbb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -171,9 +171,9 @@ public class PulsarFunctionTlsTest {
         log.info("--- Shutting down ---");
         try {
             functionAdmin.close();
-            bkEnsemble.stop();
-            workerServer.stop();
             functionsWorkerService.stop();
+            workerServer.stop();
+            bkEnsemble.stop();
         } finally {
             if (tempDirectory != null) {
                 tempDirectory.delete();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 7db32f59148..e7b19c8e5f5 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -699,10 +699,10 @@ public abstract class TestPulsarConnector {
         when(PulsarConnectorCache.instance.getManagedLedgerFactory()).thenReturn(managedLedgerFactory);
 
         for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {
-            PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
+            PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor(
                     topicsToColumnHandles.get(split.getKey()), split.getValue(),
                     pulsarConnectorConfig, managedLedgerFactory, new ManagedLedgerConfig(),
-                    new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory));
+                    new PulsarConnectorMetricsTracker(new NullStatsProvider()),dispatchingRowDecoderFactory);
             this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor);
         }
     }


[pulsar] 07/29: [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cc9ff5965a191d4fa727de7bab925d0c522b415f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Jun 14 17:03:37 2022 +0800

    [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
    
    * [improve][broker] Avoid reconnection when a partitioned topic was created concurrently
    
    ### Motivation
    
    Unnecessary reconnections can happen when a partitioned topic is created
    concurrently, especially when it is automatically created by many producers.
    This case can be reproduced easily by configuring
    `allowAutoTopicCreationType=non-partitioned`, starting a Pulsar standalone,
    and then running the following code:
    
    ```java
    try (PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650").build()) {
        for (int i = 0; i < 10; i++) {
            client.newProducer().topic("topic").createAsync();
        }
        Thread.sleep(1000);
    }
    ```
    
    We can see a lot of "Could not get connection while
    getPartitionedTopicMetadata" warning logs on the client side, while there
    are even more warning logs with full stack traces on the broker side:
    
    ```
    2022-06-14T02:04:20,522+0800 [metadata-store-22-1] WARN  org.apache.pulsar.broker.service.ServerCnx - Failed to get Partitioned Metadata [/127.0.0.1:64846] persistent://public/default/topic: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    ```
    
    This happens because when the broker handles the partitioned metadata
    command, it calls `fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync`
    and tries to create the partitioned topic if it doesn't exist. It's a race
    condition: if many connections are established within a short time interval
    and one of them creates the topic successfully, the subsequent attempts fail
    with `AlreadyExistsException`.
    
    ### Modifications
    
    Handles the `MetadataStoreException.AlreadyExistsException` in
    `unsafeGetPartitionedTopicMetadataAsync`. In this case, invoke
    `fetchPartitionedTopicMetadataAsync` to get the partitioned metadata
    again.
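
    A simplified, standalone sketch of this handling pattern, assuming a generic
    metadata layer; `createDefaultPartitionedTopic`, `fetchPartitionedTopicMetadata`,
    and `AlreadyExistsException` below are illustrative stand-ins rather than the
    broker's actual signatures:

    ```java
    import java.util.concurrent.CompletableFuture;

    class ConcurrentTopicCreateSketch {

        // Illustrative stand-ins for the broker's metadata operations.
        static class AlreadyExistsException extends RuntimeException { }

        static class PartitionedTopicMetadata {
            final int partitions;
            PartitionedTopicMetadata(int partitions) { this.partitions = partitions; }
        }

        CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopic(String topic) {
            return CompletableFuture.completedFuture(new PartitionedTopicMetadata(1));
        }

        CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata(String topic) {
            return CompletableFuture.completedFuture(new PartitionedTopicMetadata(1));
        }

        CompletableFuture<PartitionedTopicMetadata> getOrCreate(String topic) {
            CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
            createDefaultPartitionedTopic(topic)
                    .thenAccept(future::complete)
                    .exceptionally(ex -> {
                        Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                        if (cause instanceof AlreadyExistsException) {
                            // The topic was created concurrently by another connection:
                            // fetch the existing metadata instead of failing the request.
                            fetchPartitionedTopicMetadata(topic).whenComplete((metadata, ex2) -> {
                                if (ex2 == null) {
                                    future.complete(metadata);
                                } else {
                                    future.completeExceptionally(ex2);
                                }
                            });
                        } else {
                            future.completeExceptionally(ex);
                        }
                        return null;
                    });
            return future;
        }
    }
    ```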
    
    ### Verifying this change
    
    Even without this patch, producer creation could still succeed, because the
    clients reconnect to the broker after 100 ms when the broker returns a
    `ServiceNotReady` error in this case. The only way to verify this fix is to
    reproduce the bug again with the patch applied and confirm from the logs
    that no reconnection happens.
    
    * Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently"
    
    This reverts commit c259c0fdcfb299e6ed861796f7e2ab50632f9087.
    
    * Handle AlreadyExistsException in fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync
    
    (cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
---
 .../org/apache/pulsar/broker/service/BrokerService.java   | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a4840d9f52f..1287076c10f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2540,7 +2540,20 @@ public class BrokerService implements Closeable {
                                         pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
                                                 .thenAccept(md -> future.complete(md))
                                                 .exceptionally(ex -> {
-                                                    future.completeExceptionally(ex);
+                                                    if (ex.getCause()
+                                                            instanceof MetadataStoreException.AlreadyExistsException) {
+                                                        // The partitioned topic might be created concurrently
+                                                        fetchPartitionedTopicMetadataAsync(topicName)
+                                                                .whenComplete((metadata2, ex2) -> {
+                                                                    if (ex2 == null) {
+                                                                        future.complete(metadata2);
+                                                                    } else {
+                                                                        future.completeExceptionally(ex2);
+                                                                    }
+                                                                });
+                                                    } else {
+                                                        future.completeExceptionally(ex);
+                                                    }
                                                     return null;
                                                 });
                                     } else {


[pulsar] 25/29: [improve][broker] Avoid go through all the consumers to get the message ack owner (#16245)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b75af1772c1c0c38447169f7cff02b089a0ac11d
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 28 09:19:18 2022 +0800

    [improve][broker] Avoid go through all the consumers to get the message ack owner (#16245)
    
    ### Motivation
    
    The broker doesn't need to go through all the consumers to get the ack owner consumer.
    Instead, it should check the current consumer first. Only if the pending acks of the
    current consumer don't contain the ack position does it go through all the consumers
    to find the owner consumer.
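
    A simplified sketch of that lookup order (the interfaces below are illustrative
    stand-ins, not the broker's `Consumer` class):

    ```java
    import java.util.List;

    class AckOwnerLookupSketch {

        interface PendingAcks {
            boolean containsKey(long ledgerId, long entryId);
        }

        interface ConsumerLike {
            PendingAcks getPendingAcks();
        }

        // Check the current consumer's pending acks first; only scan the other
        // consumers when the position is not owned by the current consumer.
        static ConsumerLike getAckOwner(ConsumerLike current, List<ConsumerLike> all,
                                        long ledgerId, long entryId) {
            if (current.getPendingAcks().containsKey(ledgerId, entryId)) {
                return current;
            }
            for (ConsumerLike consumer : all) {
                if (consumer != current && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
                    return consumer;
                }
            }
            return current;
        }
    }
    ```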
    
    (cherry picked from commit 68484f9162bc768816cfd039140fb78196485d19)
---
 .../main/java/org/apache/pulsar/broker/service/Consumer.java   | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 9b0d678ffc9..627c06b3976 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -627,10 +627,12 @@ public class Consumer {
     private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
         Consumer ackOwnerConsumer = this;
         if (Subscription.isIndividualAckMode(subType)) {
-            for (Consumer consumer : subscription.getConsumers()) {
-                if (consumer != this && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
-                    ackOwnerConsumer = consumer;
-                    break;
+            if (!getPendingAcks().containsKey(ledgerId, entryId)) {
+                for (Consumer consumer : subscription.getConsumers()) {
+                    if (consumer != this && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
+                        ackOwnerConsumer = consumer;
+                        break;
+                    }
                 }
             }
         }


[pulsar] 19/29: [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests (#16160)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6ed4ed058789bf41995a9f364b48500f401bbc26
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 23 09:04:25 2022 +0800

    [improve][java-client] Only trigger the batch receive timeout when having pending batch receives requests (#16160)
    
    The consumer applies the default batch receive policy even if the user never uses the batch receive API.
    
    https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61
    
    This will consume lots of CPU if the client has many consumers (100k consumers).

    The Pulsar perf tool can also reproduce the problem when the test is run with many consumers.
    
    If there is no pending batch receive operation for a consumer, there is no need to trigger
    the batch timeout task periodically. The timeout check only needs to start after a batch
    receive request has been added to the pending request queue, as the sketch below illustrates.
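
    A standalone sketch of this lazy scheduling, using a plain
    `ScheduledExecutorService` instead of the client's internal timer (all names
    below are illustrative, not the real `ConsumerBase` fields):

    ```java
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    class LazyBatchReceiveTimeoutSketch {

        private final Queue<Runnable> pendingBatchReceives = new ConcurrentLinkedQueue<>();
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        private final long timeoutMs = 100;
        private ScheduledFuture<?> batchReceiveTimeout;

        // Called only when a batch receive request cannot be completed immediately.
        synchronized void addPendingBatchReceive(Runnable completeOnTimeout) {
            pendingBatchReceives.add(completeOnTimeout);
            // Arm the timeout task only when there is actually something pending.
            if (batchReceiveTimeout == null) {
                batchReceiveTimeout = timer.schedule(this::runTimeoutTask, timeoutMs, TimeUnit.MILLISECONDS);
            }
        }

        private synchronized void runTimeoutTask() {
            Runnable pending;
            while ((pending = pendingBatchReceives.poll()) != null) {
                pending.run(); // complete the expired batch receive request
            }
            // Nothing left to wait for: do not reschedule. The next
            // addPendingBatchReceive() call will arm the timer again.
            batchReceiveTimeout = null;
        }
    }
    ```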
    
    Remove the lock in MultiTopicsConsumerImpl as #10352 does
    
    Added a new test to verify that the batch receive timeout task will not start if there is
    no batch receive request.
    
    (cherry picked from commit a0ccdc96bb05d19651f3778c23b89425d516d77a)
---
 .../client/api/ConsumerBatchReceiveTest.java       | 47 ++++++++++++++++++++++
 .../pulsar/client/impl/MessageChunkingTest.java    |  1 +
 .../apache/pulsar/client/impl/ConsumerBase.java    | 20 +++++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  1 +
 .../client/impl/MultiTopicsConsumerImpl.java       | 20 +++------
 .../client/impl/MultiTopicsConsumerImplTest.java   |  2 +-
 6 files changed, 71 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 8f3d6423afb..8109e8ce8eb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.api;
 
 import lombok.Cleanup;
+import org.apache.pulsar.client.impl.ConsumerBase;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -48,6 +50,14 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @DataProvider(name = "partitioned")
+    public Object[][] partitionedTopicProvider() {
+        return new Object[][] {
+            { true },
+            { false }
+        };
+    }
+
     @DataProvider(name = "batchReceivePolicy")
     public Object[][] batchReceivePolicyProvider() {
         return new Object[][] {
@@ -425,6 +435,43 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         latch.await();
     }
 
+    @Test(dataProvider = "partitioned")
+    public void testBatchReceiveTimeoutTask(boolean partitioned) throws Exception {
+        final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID();
+
+        if (partitioned) {
+            admin.topics().createPartitionedTopic(topic, 3);
+        }
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .receiverQueueSize(1)
+                .batchReceivePolicy(BatchReceivePolicy.builder()
+                        .maxNumBytes(1024 * 1024)
+                        .maxNumMessages(1)
+                        .timeout(5, TimeUnit.SECONDS)
+                        .build())
+                .subscribe();
+        Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+        final int messagesToSend = 500;
+        sendMessagesAsyncAndWait(producer, messagesToSend);
+        for (int i = 0; i < 100; i++) {
+            Assert.assertNotNull(consumer.receive());
+        }
+        Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout());
+        for (int i = 0; i < 400; i++) {
+            Messages<String> batchReceived = consumer.batchReceive();
+            Assert.assertEquals(batchReceived.size(), 1);
+        }
+        Awaitility.await().untilAsserted(() -> Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+        Assert.assertEquals(consumer.batchReceive().size(), 0);
+        Awaitility.await().untilAsserted(() -> Assert.assertFalse(((ConsumerBase<?>)consumer).hasBatchReceiveTimeout()));
+    }
+
 
     private void receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> consumer,
                                                                             BatchReceivePolicy batchReceivePolicy,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 3f3557290b5..c223208a8ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 689c4eb7405..ed8fb39a3ae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -149,7 +149,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
         }
 
-        if (batchReceivePolicy.getTimeoutMs() > 0) {
+    }
+
+    protected void triggerBatchReceiveTimeoutTask() {
+        if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) {
             batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
                     batchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS);
         }
@@ -905,7 +908,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
 
         long timeToWaitMs;
-
+        boolean hasPendingReceives = false;
         synchronized (this) {
             // If it's closing/closed we need to ignore this timeout and not schedule next timeout.
             if (getState() == State.Closing || getState() == State.Closed) {
@@ -942,13 +945,18 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                 } else {
                     // The diff is greater than zero, set the timeout to the diff value
                     timeToWaitMs = diff;
+                    hasPendingReceives = true;
                     break;
                 }
 
                 opBatchReceive = pendingBatchReceives.peek();
             }
-            batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
-                    timeToWaitMs, TimeUnit.MILLISECONDS);
+            if (hasPendingReceives) {
+                batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
+                        timeToWaitMs, TimeUnit.MILLISECONDS);
+            } else {
+                batchReceiveTimeout = null;
+            }
         }
     }
 
@@ -1091,5 +1099,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return true;
     }
 
+    public boolean hasBatchReceiveTimeout() {
+        return batchReceiveTimeout != null;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 40fc120464e..064d347fce2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -540,6 +540,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             } else {
                 OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
                 pendingBatchReceives.add(opBatchReceive);
+                triggerBatchReceiveTimeoutTask();
                 cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
             }
         });
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index abfa5f72155..3353dea8733 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -44,8 +44,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -98,8 +96,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private volatile Timeout partitionsAutoUpdateTimeout = null;
     TopicsPartitionChangedListener topicsPartitionChangedListener;
     CompletableFuture<Void> partitionsAutoUpdateFuture = null;
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
     private final ConsumerStatsRecorder stats;
     private UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
@@ -407,8 +403,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
-        try {
-            lock.writeLock().lock();
+        internalPinnedExecutor.execute(() -> {
             if (hasEnoughMessagesForBatchReceive()) {
                 MessagesImpl<T> messages = getNewMessagesImpl();
                 Message<T> msgPeeked = incomingMessages.peek();
@@ -429,13 +424,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             } else {
                 OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
                 pendingBatchReceives.add(opBatchReceive);
+                triggerBatchReceiveTimeoutTask();
                 cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
             }
             resumeReceivingFromPausedConsumersIfNeeded();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
+        });
         return result;
     }
 
@@ -677,8 +670,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        lock.writeLock().lock();
-        try {
+        internalPinnedExecutor.execute(() -> {
             CONSUMER_EPOCH.incrementAndGet(this);
             consumers.values().stream().forEach(consumer -> {
                 consumer.redeliverUnacknowledgedMessages();
@@ -686,9 +678,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             });
             clearIncomingMessages();
             unAckedMessageTracker.clear();
-        } finally {
-            lock.writeLock().unlock();
-        }
+        });
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 95dc07c2625..297059b5fee 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -176,7 +176,7 @@ public class MultiTopicsConsumerImplTest {
         // given
         MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
         CompletableFuture<Messages<byte[]>> future = consumer.batchReceiveAsync();
-        assertTrue(consumer.hasPendingBatchReceive());
+        Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasPendingBatchReceive()));
         // when
         future.cancel(true);
         // then


[pulsar] 22/29: [fix][tests] TieredStorageConfigurationTests - clear system properties (#15957)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2198c337a2b8c606e696210c69c577db706498dd
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Fri Jun 24 15:45:26 2022 +0200

    [fix][tests] TieredStorageConfigurationTests - clear system properties (#15957)
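
    The pattern applied by this fix, shown as a generic sketch (the property name is
    just an example): wrap assertions that depend on temporarily-set system properties
    in try/finally so the properties are always cleared, even when an assertion fails
    and would otherwise leak state into later tests.

    ```java
    import static org.testng.Assert.assertEquals;

    import org.testng.annotations.Test;

    public class SystemPropertyHygieneExample {

        @Test
        public void testWithTemporaryProperty() {
            System.setProperty("example.property", "value");
            try {
                assertEquals(System.getProperty("example.property"), "value");
            } finally {
                // Always clear the property so other tests in the same JVM
                // are not affected, even if the assertion above fails.
                System.clearProperty("example.property");
            }
        }
    }
    ```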
    
    (cherry picked from commit bacc9d69c66777879a6418e3d61c546150a5e753)
---
 .../provider/TieredStorageConfigurationTests.java  | 42 ++++++++++++----------
 1 file changed, 24 insertions(+), 18 deletions(-)

diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
index bf5e046bf70..8370fb95804 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/provider/TieredStorageConfigurationTests.java
@@ -129,19 +129,21 @@ public class TieredStorageConfigurationTests {
         // set the aws properties with fake creds so the defaultProviderChain works
         System.setProperty("aws.accessKeyId", "fakeid1");
         System.setProperty("aws.secretKey", "fakekey1");
-        Credentials creds1 = config.getProviderCredentials().get();
-        assertEquals(creds1.identity, "fakeid1");
-        assertEquals(creds1.credential, "fakekey1");
+        try {
+            Credentials creds1 = config.getProviderCredentials().get();
+            assertEquals(creds1.identity, "fakeid1");
+            assertEquals(creds1.credential, "fakekey1");
 
-        // reset the properties and ensure we get different values by re-evaluating the chain
-        System.setProperty("aws.accessKeyId", "fakeid2");
-        System.setProperty("aws.secretKey", "fakekey2");
-        Credentials creds2 = config.getProviderCredentials().get();
-        assertEquals(creds2.identity, "fakeid2");
-        assertEquals(creds2.credential, "fakekey2");
-
-        System.clearProperty("aws.accessKeyId");
-        System.clearProperty("aws.secretKey");
+            // reset the properties and ensure we get different values by re-evaluating the chain
+            System.setProperty("aws.accessKeyId", "fakeid2");
+            System.setProperty("aws.secretKey", "fakekey2");
+            Credentials creds2 = config.getProviderCredentials().get();
+            assertEquals(creds2.identity, "fakeid2");
+            assertEquals(creds2.credential, "fakekey2");
+        } finally {
+            System.clearProperty("aws.accessKeyId");
+            System.clearProperty("aws.secretKey");
+        }
     }
 
     /**
@@ -215,11 +217,15 @@ public class TieredStorageConfigurationTests {
         map.put("s3ManagedLedgerOffloadRegion", "my-region");
         System.setProperty("jclouds.SystemPropertyA", "A");
         System.setProperty("jclouds.region", "jclouds-region");
-        TieredStorageConfiguration config = new TieredStorageConfiguration(map);
-        Properties properties = config.getOverrides();
-        System.out.println(properties.toString());
-        assertEquals(properties.get("jclouds.region"), "jclouds-region");
-        assertEquals(config.getServiceEndpoint(), "http://localhost");
-        assertEquals(properties.get("jclouds.SystemPropertyA"), "A");
+        try {
+            TieredStorageConfiguration config = new TieredStorageConfiguration(map);
+            Properties properties = config.getOverrides();
+            assertEquals(properties.get("jclouds.region"), "jclouds-region");
+            assertEquals(config.getServiceEndpoint(), "http://localhost");
+            assertEquals(properties.get("jclouds.SystemPropertyA"), "A");
+        } finally {
+            System.clearProperty("jclouds.SystemPropertyA");
+            System.clearProperty("jclouds.region");
+        }
     }
 }


[pulsar] 17/29: [fix][client] Fix the startMessageId can't be respected as the ChunkMessageID (#16154)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d7f996f8ae4fb0c665b43df4fa9047dfa3912088
Author: Zike Yang <zi...@apache.org>
AuthorDate: Wed Jun 22 09:46:55 2022 +0800

    [fix][client] Fix the startMessageId can't be respected as the ChunkMessageID (#16154)
    
    ### Motivation
    
    This is the same problem as when the consumer inclusively seeks a chunked message.
    
    See more detail in [PIP-107](https://github.com/apache/pulsar/issues/12402)
    
    ### Modifications
    
    * Use the first chunk message id as the startMessageId when creating the consumer/reader.
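
    A condensed sketch of that normalization, extracted here as a standalone helper
    for illustration (it assumes the `pulsar-client` implementation classes shown in
    the diff below are on the classpath):

    ```java
    import org.apache.pulsar.client.impl.BatchMessageIdImpl;
    import org.apache.pulsar.client.impl.ChunkMessageIdImpl;
    import org.apache.pulsar.client.impl.MessageIdImpl;

    final class StartMessageIdSketch {

        private StartMessageIdSketch() { }

        // Normalize the user-supplied start position: for a chunked message the
        // consumer must start from the first chunk, otherwise earlier chunks are skipped.
        static BatchMessageIdImpl effectiveStartMessageId(MessageIdImpl startMessageId) {
            if (startMessageId == null) {
                return null;
            }
            if (startMessageId instanceof ChunkMessageIdImpl) {
                return new BatchMessageIdImpl(((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId());
            }
            return new BatchMessageIdImpl(startMessageId);
        }
    }
    ```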
    
    (cherry picked from commit 33cf2d09502cec160dcf637786dd5b8fb5669343)
---
 .../java/org/apache/pulsar/client/impl/MessageChunkingTest.java  | 9 +++++++++
 .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java    | 9 ++++++++-
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 3504e263d51..3f3557290b5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -488,6 +488,15 @@ public class MessageChunkingTest extends ProducerConsumerBase {
             assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
         }
 
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topicName)
+                .startMessageIdInclusive()
+                .startMessageId(msgIds.get(1))
+                .create();
+
+        Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
+        assertEquals(msgIds.get(1), readMsg.getMessageId());
+
         consumer1.close();
         consumer2.close();
         producer.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 9422877ef63..40fc120464e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -251,7 +251,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 interceptors);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = conf.getSubscriptionMode();
-        this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
+        if (startMessageId != null) {
+            if (startMessageId instanceof ChunkMessageIdImpl) {
+                this.startMessageId = new BatchMessageIdImpl(
+                        ((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId());
+            } else {
+                this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId);
+            }
+        }
         this.initialStartMessageId = this.startMessageId;
         this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
         AVAILABLE_PERMITS_UPDATER.set(this, 0);


[pulsar] 18/29: [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5eefdf10e563c32552772a3d50127c55ff18d557
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Jun 22 23:34:49 2022 +0800

    [fix][Java Client] Fix thread safety issue of `LastCumulativeAck` (#16072)
    
    ### Motivation
    
    There were several issues caused by the thread safety problem of
    `LastCumulativeAck`, see:
    - https://github.com/apache/pulsar/pull/10586
    - https://github.com/apache/pulsar/pull/12343
    
    The root cause is that `LastCumulativeAck` can be accessed by different
    threads, especially in the `flushAsync` method, but its fields are accessed
    directly and no thread safety can be guaranteed.
    
    In addition, the current `LastCumulativeAck` class was added in
    https://github.com/apache/pulsar/pull/8996 to hold two object
    references, but this modification is wrong.
    
    Before #8996, there were two CAS operations in the `doCumulativeAck` method
    in case it was called concurrently, though the composite CAS operation was
    not atomic.
    
    However, after #8996, only one CAS operation was performed, and it compared
    a whole `LastCumulativeAck` object rather than the two fields (`messageId`
    and `bitSetRecyclable`).
    
    There is another issue: a flag `cumulativeAckFlushRequired` is used to mark
    whether `lastCumulativeAck` should be flushed. However, if `flushAsync` was
    called concurrently, both calls would send ACK commands to the broker.
    
    ### Modifications
    
    To solve the thread safety issue, this PR moves `LastCumulativeAck` out of
    `PersistentAcknowledgmentsGroupingTracker` to prevent direct access to the
    internal fields. Then, the following synchronized methods were added to
    guarantee thread safety:
    - `update`: Guarantees safe write operations. It also recycles the previous
      `BitSetRecyclable` object before assigning new values and marks that a
      flush is required.
    - `flush`: If a flush is required, returns a thread-local
      `LastCumulativeAck` instance that contains the message ID and the bit
      set. The bit set is deep copied to avoid the original reference being
      recycled by another `update` call.
    
    In addition, since the `messageId` field is volatile, the `getMessageId`
    method can always retrieve the latest reference.
    
    `LastCumulativeAckTest` is added to verify the semantics above.
    
    Based on the new design, we only need to maintain a single `LastCumulativeAck`
    field in `PersistentAcknowledgmentsGroupingTracker` and call the related
    methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
    that two concurrent `flushAsync` calls might send the same ACK command
    twice.
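
    A condensed, standalone sketch of the update/flush contract described above,
    using a plain `long` position instead of Pulsar's `MessageIdImpl` and
    `BitSetRecyclable` types:

    ```java
    class LastAckSketch {

        private volatile long position = Long.MIN_VALUE; // latest cumulatively acked position
        private boolean flushRequired = false;

        // Only accept a newer position and remember that it still has to be flushed.
        synchronized void update(long newPosition) {
            if (newPosition > position) {
                position = newPosition;
                flushRequired = true;
            }
        }

        // Return the position to flush exactly once, or null if nothing changed since
        // the last flush. Two concurrent flush callers cannot both observe the same
        // update, so the same ACK command is never sent twice.
        synchronized Long flush() {
            if (!flushRequired) {
                return null;
            }
            flushRequired = false;
            return position;
        }

        // Reads never block: the field is volatile, so the latest value is visible.
        long current() {
            return position;
        }
    }
    ```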
    
    (cherry picked from commit 936d6fdc780ea454e72e82b6c7a1885799158d02)
---
 .../PersistentAcknowledgmentsGroupingTracker.java  | 141 +++++++++++----------
 .../pulsar/client/impl/LastCumulativeAckTest.java  |  86 +++++++++++++
 2 files changed, 159 insertions(+), 68 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index f0f0cfd7548..9829babece7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
-import io.netty.util.Recycler;
+import io.netty.util.concurrent.FastThreadLocal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -34,9 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import lombok.NonNull;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.MessageId;
@@ -68,18 +67,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
     private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
 
-    private volatile LastCumulativeAck lastCumulativeAck =
-            LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
-
-    private volatile boolean cumulativeAckFlushRequired = false;
+    private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
 
     // When we flush the command, we should ensure current ack request will send correct
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, LastCumulativeAck>
-            LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
-                    PersistentAcknowledgmentsGroupingTracker.class, LastCumulativeAck.class, "lastCumulativeAck");
-
     /**
      * This is a set of all the individual acks that the application has issued and that were not already sent to
      * broker.
@@ -116,13 +108,13 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * resent after a disconnection and for which the user has already sent an acknowledgement.
      */
     @Override
-    public boolean isDuplicate(@NonNull MessageId messageId) {
-        final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
+    public boolean isDuplicate(MessageId messageId) {
+        final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
         if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
             // Already included in a cumulative ack
             return true;
         } else {
-            return pendingIndividualAcks.contains(messageId);
+            return pendingIndividualAcks.contains((MessageIdImpl) messageId);
         }
     }
 
@@ -370,30 +362,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 
     private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
         // Handle concurrent updates from different threads
-        LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
-        while (true) {
-            LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
-            if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
-                    if (lastCumulativeAck.bitSetRecyclable != null) {
-                        try {
-                            lastCumulativeAck.bitSetRecyclable.recycle();
-                        } catch (Exception ignore) {
-                            // no-op
-                        }
-                        lastCumulativeAck.bitSetRecyclable = null;
-                    }
-                    lastCumulativeAck.recycle();
-                    // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
-                    cumulativeAckFlushRequired = true;
-                    return;
-                }
-            } else {
-                currentCumulativeAck.recycle();
-                // message id acknowledging an before the current last cumulative ack
-                return;
-            }
-        }
+        lastCumulativeAck.update(msgId, bitSet);
     }
 
     private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
@@ -474,15 +443,15 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     }
 
     private void flushAsync(ClientCnx cnx) {
+        final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
         boolean shouldFlush = false;
-        if (cumulativeAckFlushRequired) {
-            newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
-                    lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
-                    AckType.Cumulative, null, Collections.emptyMap(), false,
-                    this.currentCumulativeAckFuture, null);
-            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
+        if (lastCumulativeAckToFlush != null) {
             shouldFlush = true;
-            cumulativeAckFlushRequired = false;
+            final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId();
+            newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(),
+                    lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, null,
+                    Collections.emptyMap(), false, this.currentCumulativeAckFuture, null);
+            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
         }
 
         // Flush all individual acks
@@ -560,7 +529,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
     @Override
     public void flushAndClean() {
         flush();
-        lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
+        lastCumulativeAck.reset();
         pendingIndividualAcks.clear();
     }
 
@@ -664,36 +633,72 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
         return ackReceiptEnabled && cnx != null
                 && Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
     }
+}
 
-    private static class LastCumulativeAck {
-        private MessageIdImpl messageId;
-        private BitSetRecyclable bitSetRecyclable;
+@Getter
+class LastCumulativeAck {
 
-        static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
-            LastCumulativeAck op = RECYCLER.get();
-            op.messageId = messageId;
-            op.bitSetRecyclable = bitSetRecyclable;
-            return op;
-        }
+    // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
+    public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
+            new FastThreadLocal<LastCumulativeAck>() {
 
-        private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
-        }
+                @Override
+                protected LastCumulativeAck initialValue() {
+                    return new LastCumulativeAck();
+                }
+            };
+    public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
 
-        void recycle() {
-            if (bitSetRecyclable != null) {
+    private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
+    private BitSetRecyclable bitSetRecyclable = null;
+    private boolean flushRequired = false;
+
+    public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+        if (messageId.compareTo(this.messageId) > 0) {
+            if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
                 this.bitSetRecyclable.recycle();
             }
-            this.messageId = null;
-            recyclerHandle.recycle(this);
+            set(messageId, bitSetRecyclable);
+            flushRequired = true;
         }
+    }
 
-        private final Recycler.Handle<LastCumulativeAck> recyclerHandle;
-        private static final Recycler<LastCumulativeAck> RECYCLER = new Recycler<LastCumulativeAck>() {
-            @Override
-            protected LastCumulativeAck newObject(Handle<LastCumulativeAck> handle) {
-                return new LastCumulativeAck(handle);
+    public synchronized LastCumulativeAck flush() {
+        if (flushRequired) {
+            final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
+            if (bitSetRecyclable != null) {
+                localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));
+            } else {
+                localLastCumulativeAck.set(this.messageId, null);
             }
-        };
+            flushRequired = false;
+            return localLastCumulativeAck;
+        } else {
+            // Return null to indicate nothing to be flushed
+            return null;
+        }
+    }
+
+    public synchronized void reset() {
+        if (bitSetRecyclable != null) {
+            bitSetRecyclable.recycle();
+        }
+        messageId = DEFAULT_MESSAGE_ID;
+        bitSetRecyclable = null;
+        flushRequired = false;
+    }
+
+    private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
+        this.messageId = messageId;
+        this.bitSetRecyclable = bitSetRecyclable;
+    }
+
+    @Override
+    public String toString() {
+        String s = messageId.toString();
+        if (bitSetRecyclable != null) {
+            s += " (bit set: " + bitSetRecyclable + ")";
+        }
+        return s;
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
new file mode 100644
index 00000000000..102ccfc0e07
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.annotations.Test;
+
+public class LastCumulativeAckTest {
+
+    @Test
+    public void testUpdate() {
+        final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+        assertFalse(lastCumulativeAck.isFlushRequired());
+        assertEquals(lastCumulativeAck.getMessageId(), LastCumulativeAck.DEFAULT_MESSAGE_ID);
+        assertNull(lastCumulativeAck.getBitSetRecyclable());
+
+        final MessageIdImpl messageId1 = new MessageIdImpl(0L, 1L, 10);
+        final BitSetRecyclable bitSetRecyclable1 = BitSetRecyclable.create();
+        bitSetRecyclable1.set(0, 3);
+        lastCumulativeAck.update(messageId1, bitSetRecyclable1);
+        assertTrue(lastCumulativeAck.isFlushRequired());
+        assertSame(lastCumulativeAck.getMessageId(), messageId1);
+        assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);
+
+        final MessageIdImpl messageId2 = new MessageIdImpl(0L, 2L, 8);
+        lastCumulativeAck.update(messageId2, bitSetRecyclable1);
+        // bitSetRecyclable1 is not recycled
+        assertEquals(bitSetRecyclable1.toString(), "{0, 1, 2}");
+
+        final BitSetRecyclable bitSetRecyclable2 = BitSetRecyclable.create();
+        bitSetRecyclable2.set(0, 2);
+
+        // `update()` only accepts a newer message ID, so this call here has no side effect
+        lastCumulativeAck.update(messageId2, bitSetRecyclable2);
+        assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);
+
+        final MessageIdImpl messageId3 = new MessageIdImpl(0L, 3L, 9);
+        lastCumulativeAck.update(messageId3, bitSetRecyclable2);
+        // bitSetRecyclable1 is recycled because it's replaced in `update`
+        assertEquals(bitSetRecyclable1.toString(), "{}");
+        assertSame(lastCumulativeAck.getMessageId(), messageId3);
+        assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable2);
+        bitSetRecyclable2.recycle();
+    }
+
+    @Test
+    public void testFlush() {
+        final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
+        assertNull(lastCumulativeAck.flush());
+
+        final MessageIdImpl messageId = new MessageIdImpl(0L, 1L, 3);
+        final BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
+        bitSetRecyclable.set(0, 3);
+        lastCumulativeAck.update(messageId, bitSetRecyclable);
+        assertTrue(lastCumulativeAck.isFlushRequired());
+
+        final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
+        assertFalse(lastCumulativeAck.isFlushRequired());
+        assertSame(lastCumulativeAckToFlush.getMessageId(), messageId);
+        assertNotSame(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
+        assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
+    }
+
+}


[pulsar] 11/29: [fix][broker]Fix topic policies update not check message expiry (#15941)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 51c1985356fbb50e647f135c8addcebcab962f98
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Jun 15 14:35:03 2022 +0800

    [fix][broker]Fix topic policies update not check message expiry (#15941)
    
    (cherry picked from commit cb0cffd6a03799dbbffa54813ebaddba0535787e)
---
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../pulsar/broker/service/MessageTTLTest.java      | 34 ++++++++++++++++++++--
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a82bdb8d286..ad7903ba8d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3070,7 +3070,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
             }
             replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
-
+            checkMessageExpiry();
             if (policies.getReplicationClusters() != null) {
                 checkReplicationAndRetryOnFailure();
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
index e05ec328b41..31556197486 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageTTLTest.java
@@ -18,19 +18,26 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import com.google.common.collect.Lists;
-
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.testng.Assert.assertEquals;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -102,4 +109,27 @@ public class MessageTTLTest extends BrokerTestBase {
 
     }
 
+    @Test
+    public void testTTLPoliciesUpdate() throws Exception {
+        final String namespace = "prop/ns-abc";
+        final String topicName = "persistent://" + namespace + "/testTTLPoliciesUpdate";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
+        assertNotNull(topicRef);
+
+        PersistentTopic topicRefMock = spy(topicRef);
+
+        // Namespace polices must be initiated from admin, which contains `replication_clusters`
+        Policies policies = admin.namespaces().getPolicies(namespace);
+        policies.message_ttl_in_seconds = 10;
+        topicRefMock.onPoliciesUpdate(policies);
+        verify(topicRefMock, times(1)).checkMessageExpiry();
+
+        TopicPolicies topicPolicies = new TopicPolicies();
+        topicPolicies.setMessageTTLInSeconds(5);
+        topicRefMock.onUpdate(topicPolicies);
+        verify(topicRefMock, times(2)).checkMessageExpiry();
+    }
 }


[pulsar] 20/29: [fix][broker] Fix NPE when get /admin/v2/namespaces/public/default/maxTopicsPerNamespace (#16076)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6b3ad13986d7f104c3d4ed35f29d82766a1356b1
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Jun 23 12:13:05 2022 +0800

    [fix][broker] Fix NPE when get /admin/v2/namespaces/public/default/maxTopicsPerNamespace (#16076)
    
    (cherry picked from commit c7d74f39757371dd1b2864602534539f7d8cd4cf)
---
 .../main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java  | 3 ++-
 .../src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java   | 2 ++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4f589a5684e..5cdbdc9930a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2564,7 +2564,8 @@ public abstract class NamespacesBase extends AdminResource {
 
     protected int internalGetMaxTopicsPerNamespace() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.READ);
-        return getNamespacePolicies(namespaceName).max_topics_per_namespace;
+        return getNamespacePolicies(namespaceName).max_topics_per_namespace != null
+                ? getNamespacePolicies(namespaceName).max_topics_per_namespace : 0;
     }
 
    protected void internalRemoveMaxTopicsPerNamespace() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 0b62f6ea90e..cd9fef4b34c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1500,6 +1500,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         admin.tenants().createTenant("testTenant", tenantInfo);
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("use"));
 
+        assertEquals(0, admin.namespaces().getMaxTopicsPerNamespace(namespace));
+
         admin.namespaces().setMaxTopicsPerNamespace(namespace, 10);
         assertEquals(10, admin.namespaces().getMaxTopicsPerNamespace(namespace));
 


[pulsar] 10/29: [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. (#13957)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c0063b4800c322d3ad12bc1a5d174c59b15746f
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Jun 15 14:17:44 2022 +0800

    [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. (#13957)
    
    [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely.
    ### Motivation
    The MLTransactionMetadataStore constructor and the openTransactionMetadataStore method are asynchronous, so there were situations where a store that was still in the Initializing state could be put into `stores`.
    ### Modification
    Have init() return a future so callers can wait for the MLTransactionMetadataStore initialization to complete before the store is exposed.
    
    (cherry picked from commit 0fe8ac0d2c1bb909729580e5456b4d57c2a00346)
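
For readers skimming the diff below, the call-site change boils down to the pattern sketched here. This is a minimal illustration assuming the log, trackers, and sequence-id generator are created the same way MLTransactionMetadataStoreProvider creates them; the helper name openStore() is made up for the example.

```java
// Sketch only: init() completes the returned future only after the transaction
// log has been replayed and the state has moved to Ready, so callers can no
// longer observe a store that is still Initializing.
CompletableFuture<TransactionMetadataStore> openStore(
        TransactionCoordinatorID tcId,
        MLTransactionLogImpl txnLog,
        TransactionTimeoutTracker timeoutTracker,
        TransactionRecoverTracker recoverTracker,
        MLTransactionSequenceIdGenerator sequenceIdGenerator) {
    return txnLog.initialize()
            .thenCompose(ignore -> new MLTransactionMetadataStore(
                            tcId, txnLog, timeoutTracker, sequenceIdGenerator)
                    .init(recoverTracker));
}
```
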
---
 .../pulsar/broker/transaction/TransactionTest.java |  13 +-
 .../impl/MLTransactionMetadataStore.java           | 174 +++++++++++----------
 .../impl/MLTransactionMetadataStoreProvider.java   |   4 +-
 .../MLTransactionMetadataStoreTest.java            |  38 ++---
 4 files changed, 121 insertions(+), 108 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 0b5cab28540..3f6ea313652 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -691,9 +691,8 @@ public class TransactionTest extends TransactionTestBase {
         doNothing().when(timeoutTracker).start();
         MLTransactionMetadataStore metadataStore1 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionSequenceIdGenerator);
-
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+        metadataStore1.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
 
@@ -705,8 +704,8 @@ public class TransactionTest extends TransactionTestBase {
 
         MLTransactionMetadataStore metadataStore2 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionSequenceIdGenerator);
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+        metadataStore2.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
 
@@ -718,8 +717,8 @@ public class TransactionTest extends TransactionTestBase {
 
         MLTransactionMetadataStore metadataStore3 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionSequenceIdGenerator);
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+        metadataStore3.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 685d57e664e..6c88d27cc22 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
@@ -79,7 +80,6 @@ public class MLTransactionMetadataStore
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
                                       TransactionTimeoutTracker timeoutTracker,
-                                      TransactionRecoverTracker recoverTracker,
                                       MLTransactionSequenceIdGenerator sequenceIdGenerator) {
         super(State.None);
         this.sequenceIdGenerator = sequenceIdGenerator;
@@ -96,96 +96,108 @@ public class MLTransactionMetadataStore
         DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_"
                 + tcID.toString() + "thread_factory");
         this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    }
 
+    public CompletableFuture<TransactionMetadataStore> init(TransactionRecoverTracker recoverTracker) {
+        CompletableFuture<TransactionMetadataStore> completableFuture = new CompletableFuture<>();
         if (!changeToInitializingState()) {
             log.error("Managed ledger transaction metadata store change state error when init it");
-            return;
-        }
-
-        internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
-
-            @Override
-            public void replayComplete() {
-                recoverTracker.appendOpenTransactionToTimeoutTracker();
-                if (!changeToReadyState()) {
-                    log.error("Managed ledger transaction metadata store change state error when replay complete");
-                } else {
-                    recoverTracker.handleCommittingAndAbortingTransaction();
-                    timeoutTracker.start();
+            completableFuture
+                    .completeExceptionally(new TransactionCoordinatorClientException
+                    .CoordinatorNotFoundException("transaction metadata store with tcId "
+                            + tcID.toString() + " change state to Initializing error when init it"));
+        } else {
+            internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
+
+                @Override
+                public void replayComplete() {
+                    recoverTracker.appendOpenTransactionToTimeoutTracker();
+                    if (!changeToReadyState()) {
+                        log.error("Managed ledger transaction metadata store change state error when replay complete");
+                        completableFuture
+                                .completeExceptionally(new TransactionCoordinatorClientException
+                                        .CoordinatorNotFoundException("transaction metadata store with tcId "
+                                        + tcID.toString() + " change state to Ready error when init it"));
+
+                    } else {
+                        recoverTracker.handleCommittingAndAbortingTransaction();
+                        timeoutTracker.start();
+                        completableFuture.complete(MLTransactionMetadataStore.this);
+                    }
                 }
-            }
-
-            @Override
-            public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) {
 
-                try {
+                @Override
+                public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) {
 
-                    TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(),
+                    try {
+                        TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(),
                             transactionMetadataEntry.getTxnidLeastBits());
-                    long transactionId = transactionMetadataEntry.getTxnidLeastBits();
-                    switch (transactionMetadataEntry.getMetadataOp()) {
-                        case NEW:
-                            long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits();
-                            if (txnMetaMap.containsKey(transactionId)) {
-                                txnMetaMap.get(transactionId).getRight().add(position);
-                            } else {
-                                List<Position> positions = new ArrayList<>();
-                                positions.add(position);
-                                long openTimestamp = transactionMetadataEntry.getStartTime();
-                                long timeoutAt = transactionMetadataEntry.getTimeoutMs();
-                                txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
-                                        openTimestamp, timeoutAt), positions));
-                                recoverTracker.handleOpenStatusTransaction(txnSequenceId,
-                                        timeoutAt + openTimestamp);
-                            }
-                            break;
-                        case ADD_PARTITION:
-                            if (!txnMetaMap.containsKey(transactionId)) {
-                                transactionLog.deletePosition(Collections.singletonList(position));
-                            } else {
-                                txnMetaMap.get(transactionId).getLeft()
-                                        .addProducedPartitions(transactionMetadataEntry.getPartitionsList());
-                                txnMetaMap.get(transactionId).getRight().add(position);
-                            }
-                            break;
-                        case ADD_SUBSCRIPTION:
-                            if (!txnMetaMap.containsKey(transactionId)) {
-                                transactionLog.deletePosition(Collections.singletonList(position));
-                            } else {
-                                txnMetaMap.get(transactionId).getLeft()
-                                        .addAckedPartitions(subscriptionToTxnSubscription(
-                                                transactionMetadataEntry.getSubscriptionsList()));
-                                txnMetaMap.get(transactionId).getRight().add(position);
-                            }
-                            break;
-                        case UPDATE:
-                            if (!txnMetaMap.containsKey(transactionId)) {
-                                transactionLog.deletePosition(Collections.singletonList(position));
-                            } else {
-                                TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
-                                txnMetaMap.get(transactionId).getLeft()
-                                        .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
-                                                transactionMetadataEntry.getExpectedStatus());
-                                txnMetaMap.get(transactionId).getRight().add(position);
-                                recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
-                                if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
-                                    transactionLog.deletePosition(txnMetaMap
-                                            .get(transactionId).getRight()).thenAccept(v ->
-                                            txnMetaMap.remove(transactionId).getLeft());
+                        long transactionId = transactionMetadataEntry.getTxnidLeastBits();
+                        switch (transactionMetadataEntry.getMetadataOp()) {
+                            case NEW:
+                                long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits();
+                                if (txnMetaMap.containsKey(transactionId)) {
+                                    txnMetaMap.get(transactionId).getRight().add(position);
+                                } else {
+                                    List<Position> positions = new ArrayList<>();
+                                    positions.add(position);
+                                    long openTimestamp = transactionMetadataEntry.getStartTime();
+                                    long timeoutAt = transactionMetadataEntry.getTimeoutMs();
+                                    txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
+                                            openTimestamp, timeoutAt), positions));
+                                    recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+                                            timeoutAt + openTimestamp);
                                 }
-                            }
-                            break;
-                        default:
-                            throw new InvalidTxnStatusException("Transaction `"
-                                    + txnID + "` load replay metadata operation "
-                                    + "from transaction log with unknown operation");
+                                break;
+                            case ADD_PARTITION:
+                                if (!txnMetaMap.containsKey(transactionId)) {
+                                    transactionLog.deletePosition(Collections.singletonList(position));
+                                } else {
+                                    txnMetaMap.get(transactionId).getLeft()
+                                            .addProducedPartitions(transactionMetadataEntry.getPartitionsList());
+                                    txnMetaMap.get(transactionId).getRight().add(position);
+                                }
+                                break;
+                            case ADD_SUBSCRIPTION:
+                                if (!txnMetaMap.containsKey(transactionId)) {
+                                    transactionLog.deletePosition(Collections.singletonList(position));
+                                } else {
+                                    txnMetaMap.get(transactionId).getLeft()
+                                            .addAckedPartitions(subscriptionToTxnSubscription(
+                                                    transactionMetadataEntry.getSubscriptionsList()));
+                                    txnMetaMap.get(transactionId).getRight().add(position);
+                                }
+                                break;
+                            case UPDATE:
+                                if (!txnMetaMap.containsKey(transactionId)) {
+                                    transactionLog.deletePosition(Collections.singletonList(position));
+                                } else {
+                                    TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
+                                    txnMetaMap.get(transactionId).getLeft()
+                                            .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
+                                                    transactionMetadataEntry.getExpectedStatus());
+                                    txnMetaMap.get(transactionId).getRight().add(position);
+                                    recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
+                                    if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+                                        transactionLog.deletePosition(txnMetaMap
+                                                .get(transactionId).getRight()).thenAccept(v ->
+                                                txnMetaMap.remove(transactionId).getLeft());
+                                    }
+                                }
+                                break;
+                            default:
+                                throw new InvalidTxnStatusException("Transaction `"
+                                        + txnID + "` load replay metadata operation "
+                                        + "from transaction log with unknown operation");
+                        }
+                    } catch (InvalidTxnStatusException  e) {
+                        transactionLog.deletePosition(Collections.singletonList(position));
+                        log.error(e.getMessage(), e);
                     }
-                } catch (InvalidTxnStatusException  e) {
-                    transactionLog.deletePosition(Collections.singletonList(position));
-                    log.error(e.getMessage(), e);
                 }
-            }
-        }));
+            }));
+        }
+        return completableFuture;
     }
 
     @Override
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 0711f00ac70..20df6439827 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -48,8 +48,8 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
                 managedLedgerFactory, managedLedgerConfig);
 
         // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
-        return txnLog.initialize().thenApply(__ ->
+        return txnLog.initialize().thenCompose(__ ->
                 new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
-                        recoverTracker, mlTransactionSequenceIdGenerator));
+                        mlTransactionSequenceIdGenerator).init(recoverTracker));
     }
 }
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index a06bf9e6dea..aafe54e6069 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -74,8 +74,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        new TransactionTimeoutTrackerImpl(),
                         mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
         int checkReplayRetryCount = 0;
         while (true) {
             checkReplayRetryCount++;
@@ -149,8 +150,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
@@ -178,8 +179,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         txnID = transactionMetadataStore.newTransaction(100000).get();
@@ -201,10 +202,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
+
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -244,10 +246,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                         managedLedgerConfig);
                 txnLog2.initialize().join();
+
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
-                                txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                                mlTransactionSequenceIdGenerator);
+                                txnLog2, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                transactionMetadataStoreTest.init(new TransactionRecoverTrackerImpl()).get();
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -315,8 +318,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -382,9 +385,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
-
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
 
@@ -401,8 +403,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
     }
@@ -423,8 +425,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         transactionMetadataStore.newTransaction(5000).get();


[pulsar] 15/29: [fix][broker][monitoring] fix message ack rate (#16108)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4ce967ea6727afa0a36245b5755fa67f43a170da
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Tue Jun 21 15:53:03 2022 +0800

    [fix][broker][monitoring] fix message ack rate (#16108)
    
    (cherry picked from commit 8869d8c18361fcd5fcf731f9edf2d38ae07cc0cf)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 32 ++++---
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 97 +++++++++++++++-------
 2 files changed, 87 insertions(+), 42 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 349fbd860e4..9b0d678ffc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -368,7 +368,7 @@ public class Consumer {
     }
 
     public CompletableFuture<Void> messageAcked(CommandAck ack) {
-        CompletableFuture<Void> future;
+        CompletableFuture<Long> future;
 
         this.lastAckedTimestamp = System.currentTimeMillis();
         Map<String, Long> properties = Collections.emptyMap();
@@ -404,11 +404,12 @@ public class Consumer {
             if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
                 List<PositionImpl> positionsAcked = Collections.singletonList(position);
                 future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
-                        ack.getTxnidLeastBits(), positionsAcked);
+                        ack.getTxnidLeastBits(), positionsAcked)
+                        .thenApply(unused -> 1L);
             } else {
                 List<Position> positionsAcked = Collections.singletonList(position);
                 subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
-                future = CompletableFuture.completedFuture(null);
+                future = CompletableFuture.completedFuture(1L);
             }
         } else {
             if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
@@ -419,16 +420,16 @@ public class Consumer {
         }
 
         return future
-                .whenComplete((__, t) -> {
-                    if (t == null) {
-                        this.messageAckRate.recordEvent(ack.getMessageIdsCount());
-                    }
+                .thenApply(v -> {
+                    this.messageAckRate.recordEvent(v);
+                    return null;
                 });
     }
 
     //this method is for individual ack not carry the transaction
-    private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
+    private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
         List<Position> positionsAcked = new ArrayList<>();
+        long totalAckCount = 0;
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             PositionImpl position;
@@ -461,10 +462,12 @@ public class Consumer {
             checkCanRemovePendingAcksAndHandle(position, msgId);
 
             checkAckValidationError(ack, position);
+
+            totalAckCount += ackedCount;
         }
         subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        completableFuture.complete(null);
+        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
+        completableFuture.complete(totalAckCount);
         if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
             completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
                 //check if the position can remove from the consumer pending acks.
@@ -482,7 +485,7 @@ public class Consumer {
 
 
     //this method is for individual ack carry the transaction
-    private CompletableFuture<Void> individualAckWithTransaction(CommandAck ack) {
+    private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
         // Individual ack
         List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>();
         if (!isTransactionEnabled()) {
@@ -490,6 +493,7 @@ public class Consumer {
                     new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
         }
 
+        LongAdder totalAckCount = new LongAdder();
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             PositionImpl position;
@@ -518,6 +522,8 @@ public class Consumer {
             checkCanRemovePendingAcksAndHandle(position, msgId);
 
             checkAckValidationError(ack, position);
+
+            totalAckCount.add(ackedCount);
         }
 
         CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(),
@@ -533,7 +539,7 @@ public class Consumer {
                         }
                     }));
         }
-        return completableFuture;
+        return completableFuture.thenApply(__ -> totalAckCount.sum());
     }
 
     private long getBatchSize(MessageIdData msgId) {
@@ -756,9 +762,9 @@ public class Consumer {
         messageAckRate.calculateRate();
 
         stats.msgRateOut = msgOut.getRate();
-        stats.messageAckRate = messageAckRate.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
         stats.msgRateRedeliver = msgRedeliver.getRate();
+        stats.messageAckRate = messageAckRate.getValueRate();
         stats.chunkedMessageRate = chunkedMessageRate.getRate();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 412efe69632..e35be235f3c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -29,7 +29,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -53,6 +53,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -249,56 +250,94 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
 
     private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics)
             throws Exception {
-        final int messages = 100;
+        final int messages = 1000;
         String subName = "test_sub";
+        CountDownLatch latch = new CountDownLatch(messages);
 
         @Cleanup
-        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
+                .enableBatching(true).batchingMaxMessages(10).create();
+
+        MessageListener<String> listener = (consumer, msg) -> {
+            try {
+                consumer.acknowledge(msg);
+                latch.countDown();
+            } catch (PulsarClientException e) {
+                //ignore
+            }
+        };
         @Cleanup
-        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
-                .subscriptionName(subName).isAckReceiptEnabled(true).subscribe();
+        Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener(listener)
+                .subscribe();
+        @Cleanup
+        Consumer<String> c2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .messageListener(listener)
+                .subscribe();
 
         String namespace = TopicName.get(topicName).getNamespace();
 
         for (int i = 0; i < messages; i++) {
-            producer.send(UUID.randomUUID().toString());
+            producer.sendAsync(UUID.randomUUID().toString());
         }
+        producer.flush();
 
-        for (int i = 0; i < messages; i++) {
-            Message<String> message = consumer.receive(20, TimeUnit.SECONDS);
-            if (message == null) {
-                break;
-            }
-
-            consumer.acknowledge(message);
-        }
+        latch.await(20, TimeUnit.SECONDS);
+        TimeUnit.SECONDS.sleep(1);
 
         Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
         Subscription subscription = topic.getSubscription(subName);
         List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers();
-        Assert.assertEquals(consumers.size(), 1);
+        Assert.assertEquals(consumers.size(), 2);
         org.apache.pulsar.broker.service.Consumer consumer1 = consumers.get(0);
+        org.apache.pulsar.broker.service.Consumer consumer2 = consumers.get(1);
         consumer1.updateRates();
+        consumer2.updateRates();
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
         String metricStr = output.toString(StandardCharsets.UTF_8.name());
 
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
-        Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate");
-        Assert.assertTrue(metrics.size() > 0);
-
-        int num = 0;
-        for (PrometheusMetricsTest.Metric metric : metrics) {
-            if (exposeTopicLevelMetrics && metric.tags.get("subscription").equals(subName)) {
-                num++;
-                Assert.assertTrue(metric.value > 0);
-            } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
-                num++;
-                Assert.assertTrue(metric.value > 0);
-            }
+        Collection<PrometheusMetricsTest.Metric> ackRateMetric = metricsMap.get("pulsar_consumer_msg_ack_rate");
+
+        String rateOutMetricName = exposeTopicLevelMetrics ? "pulsar_consumer_msg_rate_out" : "pulsar_rate_out";
+        Collection<PrometheusMetricsTest.Metric> rateOutMetric = metricsMap.get(rateOutMetricName);
+        Assert.assertTrue(ackRateMetric.size() > 0);
+        Assert.assertTrue(rateOutMetric.size() > 0);
+
+        if (exposeTopicLevelMetrics) {
+            String consumer1Name = consumer1.consumerName();
+            String consumer2Name = consumer2.consumerName();
+            double totalAckRate = ackRateMetric.stream()
+                    .filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name)
+                            || metric.tags.get("consumer_name").equals(consumer2Name))
+                    .mapToDouble(metric -> metric.value).sum();
+            double totalRateOut = rateOutMetric.stream()
+                    .filter(metric -> metric.tags.get("consumer_name").equals(consumer1Name)
+                            || metric.tags.get("consumer_name").equals(consumer2Name))
+                    .mapToDouble(metric -> metric.value).sum();
+
+            Assert.assertTrue(totalAckRate > 0D);
+            Assert.assertTrue(totalRateOut > 0D);
+            Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
+        } else {
+            double totalAckRate = ackRateMetric.stream()
+                    .filter(metric -> namespace.equals(metric.tags.get("namespace")))
+                    .mapToDouble(metric -> metric.value).sum();
+            double totalRateOut = rateOutMetric.stream()
+                    .filter(metric -> namespace.equals(metric.tags.get("namespace")))
+                    .mapToDouble(metric -> metric.value).sum();
+
+            Assert.assertTrue(totalAckRate > 0D);
+            Assert.assertTrue(totalRateOut > 0D);
+            Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
         }
-
-        Assert.assertTrue(num > 0);
     }
 }


[pulsar] 24/29: [fix][broker]Fix subscribe dispatcher limiter not being initialized (#16175)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b71f6113121940566006ed353ed11fdf456f07b0
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Sun Jun 26 16:30:19 2022 +0800

     [fix][broker]Fix subscribe dispatcher limiter not being initialized (#16175)
    
    (cherry picked from commit 7afc41137b7efa331c66d3a13032f971512f6db6)
---
 .../pulsar/broker/service/BrokerService.java       |   2 +-
 .../service/SubscribeDispatchLimiterTest.java      | 108 +++++++++++++++++++++
 2 files changed, 109 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 1287076c10f..7b442698e92 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2302,7 +2302,7 @@ public class BrokerService implements Closeable {
                 topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
                     Dispatcher dispatcher = persistentSubscription.getDispatcher();
                     if (dispatcher != null) {
-                        dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate);
+                        dispatcher.updateRateLimiter();
                     }
                 });
             });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeDispatchLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeDispatchLimiterTest.java
new file mode 100644
index 00000000000..801e1be6a73
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeDispatchLimiterTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Optional;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SubscribeDispatchLimiterTest extends BrokerTestBase {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setDispatchThrottlingRatePerSubscriptionInMsg(0);
+        conf.setDispatchThrottlingRatePerSubscriptionInByte(0L);
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testDispatchRateLimiterPerSubscriptionInMsgOnlyBrokerLevel() throws Exception {
+        final String topicName = "persistent://" + newTopicName();
+        final String subscribeName = "cg_testDispatchRateLimiterPerSubscriptionInMsgOnlyBrokerLevel";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            .topic(topicName)
+            .subscriptionName(subscribeName)
+            .subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        assertNotNull(topic);
+
+        PersistentSubscription subscription = topic.getSubscriptions().get(subscribeName);
+        assertNotNull(subscription);
+        assertFalse(subscription.getDispatcher().getRateLimiter().isPresent());
+
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg", "100");
+        Awaitility.await().untilAsserted(() ->
+            assertEquals(pulsar.getConfig().getDispatchThrottlingRatePerSubscriptionInMsg(), 100)
+        );
+        Awaitility.await().untilAsserted(() -> {
+            Optional<DispatchRateLimiter> limiterOpt = subscription.getDispatcher().getRateLimiter();
+            assertTrue(limiterOpt.isPresent());
+            assertEquals(limiterOpt.get().getDispatchRateOnMsg(), 100);
+        });
+    }
+
+    @Test
+    public void testDispatchRateLimiterPerSubscriptionInByteOnlyBrokerLevel() throws Exception {
+        final String topicName = "persistent://" + newTopicName();
+        final String subscribeName = "testDispatchRateLimiterPerSubscriptionInByteOnlyBrokerLevel";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            .topic(topicName)
+            .subscriptionName(subscribeName)
+            .subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+        assertNotNull(topic);
+
+        PersistentSubscription subscription = topic.getSubscriptions().get(subscribeName);
+        assertNotNull(subscription);
+        assertFalse(subscription.getDispatcher().getRateLimiter().isPresent());
+
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInByte", "1000");
+        Awaitility.await().untilAsserted(() ->
+            assertEquals(pulsar.getConfig().getDispatchThrottlingRatePerSubscriptionInByte(), 1000)
+        );
+        Awaitility.await().untilAsserted(() -> {
+            Optional<DispatchRateLimiter> limiterOpt = subscription.getDispatcher().getRateLimiter();
+            assertTrue(limiterOpt.isPresent());
+            assertEquals(limiterOpt.get().getDispatchRateOnByte(), 1000);
+        });
+    }
+}


[pulsar] 14/29: [fix][txn] Fix NPE when ack message with transaction at cnx = null (#16142)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c5a6a0b78b3538098eb04466c8242128144011a9
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Jun 21 15:33:23 2022 +0800

    [fix][txn] Fix NPE when ack message with transaction at cnx = null  (#16142)
    
    Fix https://github.com/apache/pulsar/issues/16124
    ## Motivation
    When a channel becomes inactive, the connection handler sets cnx = null and reconnects.
    During that window, consumers that use a transaction to ack messages hit an NPE.
    ## Modification
    Return an exception when cnx is null.
    
    **Why not use a queue to store operations?**
    1. If we queued the operations, we would have to handle each operation's timeout, and a lock would be required.
    2. If reconnection takes a long time, or a buggy client never reconnects, the queued operations would eventually crash the client.
    
    (cherry picked from commit 53cc84a580dd747685905e1d11b8e19c0e59a614)
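    
    As an illustrative sketch only (the service URL, topic, and subscription names below are placeholders), an application could react to the ConnectException that the ack future now fails with while the consumer is reconnecting:
    
    ```
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.transaction.Transaction;
    
    public class TxnAckConnectExceptionExample {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")   // placeholder URL
                    .enableTransaction(true)
                    .build();
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("my-topic")                       // placeholder topic
                    .subscriptionName("my-sub")
                    .subscribe();
    
            Message<byte[]> msg = consumer.receive();
            Transaction txn = client.newTransaction()
                    .withTransactionTimeout(5, TimeUnit.SECONDS)
                    .build().get();
            try {
                // Fails fast with ConnectException (instead of an NPE) while the
                // consumer connection is being re-established.
                consumer.acknowledgeAsync(msg.getMessageId(), txn).get();
                txn.commit().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof PulsarClientException.ConnectException) {
                    txn.abort().get();                       // abort and retry once reconnected
                }
            } finally {
                consumer.close();
                client.close();
            }
        }
    }
    ```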
---
 .../pulsar/broker/transaction/TransactionTest.java | 41 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  9 ++++-
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 3f6ea313652..68e73d895e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.Bytes;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -87,6 +88,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -109,6 +111,7 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
+import org.powermock.reflect.Whitebox;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -992,6 +995,44 @@ public class TransactionTest extends TransactionTestBase {
         transaction.commit().get();
     }
 
+    @Test
+    public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception {
+        String topic = NAMESPACE1 + "/testGetConnectExceptionForAckMsgWhenCnxIsNull";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer(Schema.BYTES)
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().value(Bytes.toBytes(i)).send();
+        }
+        ClientCnx cnx = Whitebox.invokeMethod(consumer, "cnx");
+        Whitebox.invokeMethod(consumer, "connectionClosed", cnx);
+
+        Message<byte[]> message = consumer.receive();
+        Transaction transaction = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+
+        try {
+            consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+            fail();
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof PulsarClientException.ConnectException);
+        }
+    }
+
+
     @Test
     public void testPendingAckBatchMessageCommit() throws Exception {
         String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b4ee5a2e784..9422877ef63 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2644,7 +2644,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         } else {
             unAckedMessageTracker.remove(messageId);
         }
-        return cnx().newAckForReceipt(cmd, requestId);
+        ClientCnx cnx = cnx();
+        if (cnx == null) {
+            return FutureUtil.failedFuture(new PulsarClientException
+                    .ConnectException("Failed to ack message [" + messageId + "] "
+                    + "for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + getState()));
+        } else {
+            return cnx.newAckForReceipt(cmd, requestId);
+        }
     }
 
     public Map<MessageIdImpl, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopicMessages() {


[pulsar] 12/29: [fix][broker] Fix create client with TLS config (#16014)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 718904dcef8fbe056c733c9ab6215d86827d96b5
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Wed Jun 15 21:40:25 2022 +0800

    [fix][broker] Fix create client with TLS config (#16014)
    
    ### Motivation
    
    In PulsarService, the internal client is created with an incorrect config.
    
    When `tlsEnabled` is `true` and `brokerClientTlsEnabled` is `false`, users hit `Failed reason: General OpenSslEngine problem` because `tlsTrustCertsFilePath` is set incorrectly.
    
    ### Modifications
    
    - Fix the TLS-enabled check
    - Set up ciphers and protocols
    - Remove the duplicate setTlsTrustCertsFilePath call
    
    (cherry picked from commit 22057ca0296e4eb6e0c9d41bc10e24bdbdc71efc)
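    
    A minimal sketch of the corrected selection logic, assuming the broker's ServiceConfiguration and the client's ClientConfigurationData are available; the helper name and its parameters are illustrative, not the exact PulsarService code:
    
    ```
    import org.apache.pulsar.broker.ServiceConfiguration;
    import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
    
    public class InternalClientTlsSketch {
    
        // Hypothetical helper: the service URL and the TLS settings are driven by
        // brokerClientTlsEnabled, not by the broker-side tlsEnabled flag.
        static ClientConfigurationData buildInternalClientConf(ServiceConfiguration config,
                                                               String brokerServiceUrl,
                                                               String brokerServiceUrlTls) {
            ClientConfigurationData conf = new ClientConfigurationData();
            boolean tlsEnabled = config.isBrokerClientTlsEnabled();
            conf.setServiceUrl(tlsEnabled ? brokerServiceUrlTls : brokerServiceUrl);
            if (tlsEnabled) {
                conf.setTlsCiphers(config.getBrokerClientTlsCiphers());
                conf.setTlsProtocols(config.getBrokerClientTlsProtocols());
                conf.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
                // Keystore vs. trust-cert handling follows in the real code.
                conf.setTlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath());
            }
            return conf;
        }
    }
    ```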
---
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0acc4ec6956..944201d6125 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1381,12 +1381,13 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 ClientConfigurationData conf =
                         ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
 
-                conf.setServiceUrl(this.getConfiguration().isTlsEnabled()
-                                ? this.brokerServiceUrlTls : this.brokerServiceUrl);
-                conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
-                conf.setTlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());
+                boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled();
+                conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl);
 
-                if (this.getConfiguration().isBrokerClientTlsEnabled()) {
+                if (tlsEnabled) {
+                    conf.setTlsCiphers(this.getConfiguration().getBrokerClientTlsCiphers());
+                    conf.setTlsProtocols(this.getConfiguration().getBrokerClientTlsProtocols());
+                    conf.setTlsAllowInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection());
                     if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) {
                         conf.setUseKeyStoreTls(true);
                         conf.setTlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType());


[pulsar] 26/29: [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption (#16236)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 63f5289865a04114c93ab3174d7f77c54ecbd340
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 28 11:29:32 2022 +0800

    [improve][java-client] Replace ScheduledExecutor to improve performance of message consumption (#16236)
    
    ### Motivation
    
    The scheduled executor does not work very efficiently because each task is first added to a DelayedQueue (a priority queue), even when it is submitted with the `.execute()` method and no scheduling delay.
    
    <img width="1845" alt="image" src="https://user-images.githubusercontent.com/12592133/175871343-ecda138f-43a2-472e-ac42-8efdefb58810.png">
    
    <img width="1848" alt="image" src="https://user-images.githubusercontent.com/12592133/175871415-3d8d9fbd-f140-4a4b-a78d-306c1ec9673c.png">
    
    Profile result:
    [perf_consumer_0.html.txt](https://github.com/apache/pulsar/files/8989093/perf_consumer_0.html.txt)
    
    Running a performance test for the single-topic maximum message read rate:
    
    ```
    bin/pulsar-perf consume test -q 1000000 -p 100000000
    bin/pulsar-perf produce test -r 1000000 -s 1 -mk random -o 10000 -threads 2
    ```
    
    Without this PR (2.10.1):
    
    ```
    Profiling started
    2022-06-27T13:44:01,183+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23919664 msg --- 265702.851  msg/s --- 2.027 Mbit/s  --- Latency: mean: 49430.572 ms - med: 49406 - 95pct: 52853 - 99pct: 53024 - 99.9pct: 53053 - 99.99pct: 53056 - Max: 53057
    2022-06-27T13:44:11,196+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 26690802 msg --- 276759.125  msg/s --- 2.112 Mbit/s  --- Latency: mean: 56106.186 ms - med: 56000 - 95pct: 59289 - 99pct: 59985 - 99.9pct: 60037 - 99.99pct: 60042 - Max: 60042
    2022-06-27T13:44:21,216+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 28788693 msg --- 209467.861  msg/s --- 1.598 Mbit/s  --- Latency: mean: 63523.143 ms - med: 63580 - 95pct: 67202 - 99pct: 67523 - 99.9pct: 67547 - 99.99pct: 67548 - Max: 67548
    2022-06-27T13:44:31,233+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 31255365 msg --- 246190.932  msg/s --- 1.878 Mbit/s  --- Latency: mean: 71152.370 ms - med: 71058 - 95pct: 74555 - 99pct: 74806 - 99.9pct: 74842 - 99.99pct: 74847 - Max: 74847
    2022-06-27T13:44:41,247+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 33606630 msg --- 234769.313  msg/s --- 1.791 Mbit/s  --- Latency: mean: 78636.478 ms - med: 78724 - 95pct: 81694 - 99pct: 82090 - 99.9pct: 82279 - 99.99pct: 82285 - Max: 82286
    ```
    
    With this PR:
    
    ```
    Profiling started
    2022-06-27T13:56:20,426+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 431272207 msg --- 1079360.516  msg/s --- 8.235 Mbit/s  --- Latency: mean: 272.645 ms - med: 334 - 95pct: 470 - 99pct: 510 - 99.9pct: 522 - 99.99pct: 523 - Max: 524
    2022-06-27T13:56:30,438+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 441292346 msg --- 1000645.852  msg/s --- 7.634 Mbit/s  --- Latency: mean: 15.512 ms - med: 14 - 95pct: 29 - 99pct: 39 - 99.9pct: 54 - 99.99pct: 55 - Max: 55
    2022-06-27T13:56:40,450+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 451303308 msg --- 999973.040  msg/s --- 7.629 Mbit/s  --- Latency: mean: 18.265 ms - med: 14 - 95pct: 53 - 99pct: 98 - 99.9pct: 174 - 99.99pct: 176 - Max: 177
    2022-06-27T13:56:50,462+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 461308082 msg --- 999309.458  msg/s --- 7.624 Mbit/s  --- Latency: mean: 14.728 ms - med: 14 - 95pct: 18 - 99pct: 41 - 99.9pct: 50 - 99.99pct: 51 - Max: 52
    2022-06-27T13:57:00,475+0800 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 471327606 msg --- 1000738.584  msg/s --- 7.635 Mbit/s  --- Latency: mean: 21.291 ms - med: 16 - 95pct: 52 - 99pct: 61 - 99.9pct: 65 - 99.99pct: 66 - Max: 66
    ```
    
    Profile result with this PR:
    
    [perf_consumer_1.html.txt](https://github.com/apache/pulsar/files/8989095/perf_consumer_1.html.txt)
    
    ### Modification
    
    - Change the internal and external executors to plain executor services
    - Add a new ScheduledExecutorProvider to handle the scheduled tasks.
    
    (cherry picked from commit 96237a9615fefa2bed247b416bf1a12d8bc4b201)
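    
    A self-contained sketch of the same split, using only JDK executors (class names and printed messages are illustrative): the hot path runs on a plain single-threaded executor, while delayed or periodic work goes to a dedicated scheduled executor and hands the callback back to the pinned executor:
    
    ```
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class ExecutorSplitSketch {
        public static void main(String[] args) throws InterruptedException {
            // Hot path: a plain single-threaded executor backed by a LinkedBlockingQueue,
            // so execute() does not pay the DelayedQueue (priority queue) cost.
            ExecutorService internalExecutor = Executors.newSingleThreadExecutor();
    
            // Delayed/periodic work only: a dedicated scheduled executor.
            ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    
            internalExecutor.execute(() -> System.out.println("deliver message to listener"));
    
            // The periodic task is scheduled on the scheduled executor, but the actual work
            // is handed back to the pinned internal executor to keep ordering on one thread.
            scheduledExecutor.scheduleAtFixedRate(
                    () -> internalExecutor.execute(() -> System.out.println("expire incomplete chunks")),
                    1, 1, TimeUnit.SECONDS);
    
            TimeUnit.SECONDS.sleep(2);
            scheduledExecutor.shutdownNow();
            internalExecutor.shutdownNow();
        }
    }
    ```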
---
 .../transaction/pendingack/PendingAckStore.java    |  4 +--
 .../pendingack/impl/InMemoryPendingAckStore.java   |  4 +--
 .../pendingack/impl/MLPendingAckStore.java         |  4 +--
 .../pendingack/impl/PendingAckHandleImpl.java      |  4 +--
 .../persistent/PersistentSubscriptionTest.java     |  4 +--
 .../pulsar/client/api/MultiTopicsConsumerTest.java |  2 +-
 .../apache/pulsar/client/impl/ConsumerBase.java    |  9 +++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 13 +++++---
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 ++-
 .../pulsar/client/impl/PulsarClientImpl.java       | 12 +++++++-
 .../pulsar/client/util/ExecutorProvider.java       | 10 +++---
 .../client/util/ScheduledExecutorProvider.java     | 36 ++++++++++++++++++++++
 12 files changed, 78 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
index 3da676eb827..2f85d2430db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
@@ -38,7 +38,7 @@ public interface PendingAckStore {
      * @param pendingAckHandle the handle of pending ack
      * @param executorService the replay executor service
      */
-    void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService);
+    void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService);
 
     /**
      * Close the transaction pending ack store.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
index d882c80c478..44c9fbe039b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStore.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.transaction.pendingack.impl;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
@@ -33,7 +33,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 public class InMemoryPendingAckStore implements PendingAckStore {
 
     @Override
-    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService scheduledExecutorService) {
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService scheduledExecutorService) {
         pendingAckHandle.changeToReadyState();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 4ac0cd9b82c..8b115543561 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -110,7 +110,7 @@ public class MLPendingAckStore implements PendingAckStore {
     }
 
     @Override
-    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService transactionReplayExecutor) {
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService transactionReplayExecutor) {
         transactionReplayExecutor
                 .execute(new PendingAckReplay(new MLPendingAckReplyCallBack(pendingAckHandle)));
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 9f4d49f3f59..90bddef52d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -157,8 +156,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                 this.pendingAckStoreFuture =
                         pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
                 this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
-                    pendingAckStore.replayAsync(this,
-                            (ScheduledExecutorService) internalPinnedExecutor);
+                    pendingAckStore.replayAsync(this, internalPinnedExecutor);
                 }).exceptionally(e -> {
                     acceptQueue.clear();
                     changeToErrorState();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index c0ca352c153..0e2d300f7a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -40,7 +40,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutorService;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -128,7 +128,7 @@ public class PersistentSubscriptionTest {
             public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
                 return CompletableFuture.completedFuture(new PendingAckStore() {
                     @Override
-                    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ScheduledExecutorService executorService) {
+                    public void replayAsync(PendingAckHandleImpl pendingAckHandle, ExecutorService executorService) {
                         try {
                             Field field = PendingAckHandleState.class.getDeclaredField("state");
                             field.setAccessible(true);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index d8c8bd657f8..29ecb39853a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -72,7 +72,7 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
         return new PulsarClientImpl(conf) {
             {
                 ScheduledExecutorService internalExecutorService =
-                        (ScheduledExecutorService) super.getInternalExecutorService();
+                        (ScheduledExecutorService) super.getScheduledExecutorProvider().getExecutor();
                 internalExecutorServiceDelegate = mock(ScheduledExecutorService.class,
                         // a spy isn't used since that doesn't work for private classes, instead
                         // the mock delegatesTo an existing instance. A delegate is sufficient for verifying
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index ed8fb39a3ae..33c2a3cc266 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -33,7 +33,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
@@ -73,8 +72,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final MessageListener<T> listener;
     protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorProvider executorProvider;
-    protected final ScheduledExecutorService externalPinnedExecutor;
-    protected final ScheduledExecutorService internalPinnedExecutor;
+    protected final ExecutorService externalPinnedExecutor;
+    protected final ExecutorService internalPinnedExecutor;
     final BlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
@@ -113,8 +112,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.unAckedChunkedMessageIdSequenceMap =
                 ConcurrentOpenHashMap.<MessageIdImpl, MessageIdImpl[]>newBuilder().build();
         this.executorProvider = executorProvider;
-        this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
-        this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
+        this.externalPinnedExecutor = executorProvider.getExecutor();
+        this.internalPinnedExecutor = client.getInternalExecutorService();
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
         this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 064d347fce2..4d59064a035 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1363,10 +1364,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            internalPinnedExecutor
-                    .scheduleAtFixedRate(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages),
-                            expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
-                            TimeUnit.MILLISECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
+                    () -> internalPinnedExecutor
+                            .execute(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)),
+                    expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
+                    TimeUnit.MILLISECONDS
+            );
             expireChunkMessageTaskScheduled = true;
         }
 
@@ -2353,7 +2356,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return;
             }
 
-            internalPinnedExecutor.schedule(() -> {
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
                 log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms",
                         topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3353dea8733..68c5acbc3d0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -278,7 +279,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 return null;
             }
             log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
-            internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+            ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor())
+                    .schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
             return null;
         });
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c2bf8a216e2..3c65339e91d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -77,6 +77,7 @@ import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvid
 import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -105,6 +106,8 @@ public class PulsarClientImpl implements PulsarClient {
     private boolean needStopTimer;
     private final ExecutorProvider externalExecutorProvider;
     private final ExecutorProvider internalExecutorProvider;
+
+    private final ScheduledExecutorProvider scheduledExecutorProvider;
     private final boolean createdEventLoopGroup;
     private final boolean createdCnxPool;
 
@@ -190,6 +193,8 @@ public class PulsarClientImpl implements PulsarClient {
                     new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
             this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider :
                     new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal");
+            this.scheduledExecutorProvider = new ScheduledExecutorProvider(conf.getNumIoThreads(),
+                    "pulsar-client-scheduled");
             if (conf.getServiceUrl().startsWith("http")) {
                 lookup = new HttpLookupService(conf, this.eventLoopGroup);
             } else {
@@ -996,7 +1001,7 @@ public class PulsarClientImpl implements PulsarClient {
             }
             previousExceptions.add(e);
 
-            ((ScheduledExecutorService) externalExecutorProvider.getExecutor()).schedule(() -> {
+            ((ScheduledExecutorService) scheduledExecutorProvider.getExecutor()).schedule(() -> {
                 log.warn("[topic: {}] Could not get connection while getPartitionedTopicMetadata -- "
                         + "Will try again in {} ms", topicName, nextDelay);
                 remainingTime.addAndGet(-nextDelay);
@@ -1118,6 +1123,11 @@ public class PulsarClientImpl implements PulsarClient {
     public ExecutorService getInternalExecutorService() {
         return internalExecutorProvider.getExecutor();
     }
+
+    public ScheduledExecutorProvider getScheduledExecutorProvider() {
+        return scheduledExecutorProvider;
+    }
+
     //
     // Transaction related API
     //
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index b5fb3543b82..67606af63a7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Getter;
@@ -41,7 +40,7 @@ public class ExecutorProvider {
     private final String poolName;
     private volatile boolean isShutdown;
 
-    private static class ExtendedThreadFactory extends DefaultThreadFactory {
+    protected static class ExtendedThreadFactory extends DefaultThreadFactory {
 
         @Getter
         private Thread thread;
@@ -56,7 +55,6 @@ public class ExecutorProvider {
         }
     }
 
-
     public ExecutorProvider(int numThreads, String poolName) {
         checkArgument(numThreads > 0);
         this.numThreads = numThreads;
@@ -65,13 +63,17 @@ public class ExecutorProvider {
         for (int i = 0; i < numThreads; i++) {
             ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
                     poolName, Thread.currentThread().isDaemon());
-            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+            ExecutorService executor = createExecutor(threadFactory);
             executors.add(Pair.of(executor, threadFactory));
         }
         isShutdown = false;
         this.poolName = poolName;
     }
 
+    protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
+       return Executors.newSingleThreadExecutor(threadFactory);
+    }
+
     public ExecutorService getExecutor() {
         return executors.get((currentThread.getAndIncrement() & Integer.MAX_VALUE) % numThreads).getKey();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
new file mode 100644
index 00000000000..887ae3bb7ff
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ScheduledExecutorProvider extends ExecutorProvider {
+
+    public ScheduledExecutorProvider(int numThreads, String poolName) {
+        super(numThreads, poolName);
+    }
+
+    @Override
+    protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
+        return Executors.newSingleThreadScheduledExecutor(threadFactory);
+    }
+}


[pulsar] 03/29: [fix][client] Remove consumer when close consumer command is received (#15761)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2251e6ebd7a71eb6d54a22060e8e45af727d4498
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jun 13 09:01:16 2022 +0800

    [fix][client] Remove consumer when close consumer command is received (#15761)
    
    (cherry picked from commit 5246c8e1cc44b96db6ba684e0ce64914cfd05a61)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java     |  6 ++++--
 .../org/apache/pulsar/client/impl/ClientCnxTest.java | 20 ++++++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 873f6201117..38a508bb716 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.pulsar.client.impl.TransactionMetaStoreHandler.getExceptionByServerError;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
@@ -115,7 +116,8 @@ public class ClientCnx extends PulsarHandler {
                     .expectedItems(16)
                     .concurrencyLevel(1)
                     .build();
-    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
+    @VisibleForTesting
+    final ConcurrentLongHashMap<ConsumerImpl<?>> consumers =
             ConcurrentLongHashMap.<ConsumerImpl<?>>newBuilder()
                     .expectedItems(16)
                     .concurrencyLevel(1)
@@ -731,7 +733,7 @@ public class ClientCnx extends PulsarHandler {
     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
         final long consumerId = closeConsumer.getConsumerId();
-        ConsumerImpl<?> consumer = consumers.get(consumerId);
+        ConsumerImpl<?> consumer = consumers.remove(consumerId);
         if (consumer != null) {
             consumer.connectionClosed(this);
         } else {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 558c0bfa13f..a3a00b1b70e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ThreadFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.CommandError;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
@@ -152,4 +153,23 @@ public class ClientCnxTest {
 
         eventLoop.shutdownGracefully();
     }
+
+    @Test
+    public void testHandleCloseConsumer() {
+        ThreadFactory threadFactory = new DefaultThreadFactory("testHandleCloseConsumer");
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory);
+        ClientConfigurationData conf = new ClientConfigurationData();
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        long consumerId = 1;
+        cnx.registerConsumer(consumerId, mock(ConsumerImpl.class));
+        assertEquals(cnx.consumers.size(), 1);
+
+        CommandCloseConsumer closeConsumer = new CommandCloseConsumer()
+                .setConsumerId(1);
+        cnx.handleCloseConsumer(closeConsumer);
+        assertEquals(cnx.consumers.size(), 0);
+
+        eventLoop.shutdownGracefully();
+    }
 }


[pulsar] 04/29: [fix][admin] Fix typo in validation message (#16021)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 41f83ccc09e7a05091eaad3815fbf5825539e672
Author: visortelle <vi...@gmail.com>
AuthorDate: Mon Jun 13 03:53:39 2022 +0200

    [fix][admin] Fix typo in validation message (#16021)
    
    (cherry picked from commit 8d8a19f786aadb2d18146f4bded3d550d2a2d040)
---
 .../src/main/java/org/apache/pulsar/broker/admin/AdminResource.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 94b2090c3d1..488cf73c9ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -836,7 +836,7 @@ public abstract class AdminResource extends PulsarWebResource {
         checkArgument(
                 (persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
                         && (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
-                String.format("Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
+                String.format("Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuorum (%s)",
                         persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
                         persistence.getBookkeeperAckQuorum()));
 


[pulsar] 29/29: [fix][broker]fix npe when invoke replaceBookie. (#16239)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3048c876c2a5096e367f4542260382d584c8750f
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Tue Jun 28 20:07:38 2022 +0800

    [fix][broker]fix npe when invoke replaceBookie. (#16239)
    
    * Fix NPE when invoking replaceBookie.
    
    Co-authored-by: nicklixinyang <ni...@didiglobal.com>
    (cherry picked from commit 0eed84203b23e325ac15d7dc50e1ed6dbdf4fa2a)
---
 .../IsolatedBookieEnsemblePlacementPolicy.java     |  4 +++
 .../IsolatedBookieEnsemblePlacementPolicyTest.java | 33 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index c086e2c4e5e..b0de57f0420 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -183,9 +183,13 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
                     castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
             if (!primaryIsolationGroupString.isEmpty()) {
                 pair.setLeft(new HashSet(Arrays.asList(primaryIsolationGroupString.split(","))));
+            } else {
+                pair.setLeft(Collections.emptySet());
             }
             if (!secondaryIsolationGroupString.isEmpty()) {
                 pair.setRight(new HashSet(Arrays.asList(secondaryIsolationGroupString.split(","))));
+            } else {
+                pair.setRight(Collections.emptySet());
             }
         }
         return pair;
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 052e3e84527..85feeaecfdd 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -268,6 +268,39 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
 
         isolationPolicy.newEnsemble(4, 4, 4, Collections.emptyMap(), new HashSet<>());
+
+        BookieId bookie1Id = new BookieSocketAddress(BOOKIE1).toBookieId();
+        BookieId bookie2Id = new BookieSocketAddress(BOOKIE2).toBookieId();
+        BookieId bookie3Id = new BookieSocketAddress(BOOKIE3).toBookieId();
+        BookieId bookie4Id = new BookieSocketAddress(BOOKIE4).toBookieId();
+        // When strictBookieAffinityEnabled=true and a namespace does not set ISOLATION_BOOKIE_GROUPS, an empty string is used by default.
+        Map<String, Object> placementPolicyProperties1 = new HashMap<>();
+        placementPolicyProperties1.put(
+                IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "");
+        placementPolicyProperties1.put(
+                IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
+        EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig(
+                IsolatedBookieEnsemblePlacementPolicy.class,
+                placementPolicyProperties1
+        );
+        Map<String, byte[]> customMetadata1 = new HashMap<>();
+        customMetadata1.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode());
+
+        BookieId replaceBookie1 = isolationPolicy.replaceBookie(3, 3, 3, customMetadata1,
+                Arrays.asList(bookie1Id,bookie2Id,bookie3Id), bookie3Id, null).getResult();
+        assertEquals(replaceBookie1, bookie4Id);
+
+        // When ISOLATION_BOOKIE_GROUPS is missing.
+        Map<String, Object> placementPolicyProperties2 = new HashMap<>();
+        EnsemblePlacementPolicyConfig policyConfig2 = new EnsemblePlacementPolicyConfig(
+                IsolatedBookieEnsemblePlacementPolicy.class,
+                placementPolicyProperties2
+        );
+        Map<String, byte[]> customMetadata2 = new HashMap<>();
+        customMetadata2.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig2.encode());
+        BookieId replaceBookie2 = isolationPolicy.replaceBookie(3, 3, 3, customMetadata2,
+                Arrays.asList(bookie1Id,bookie2Id,bookie3Id), bookie3Id, null).getResult();
+        assertEquals(replaceBookie2, bookie4Id);
     }
 
     /**


[pulsar] 27/29: [improve][broker] Reduce the re-schedule message read operation for PersistentDispatcherMultipleConsumers (#16241)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e83c26efcfce5d0d10464a281240feb3e30ab8a2
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 28 11:29:57 2022 +0800

    [improve][broker] Reduce the re-schedule message read operation for PersistentDispatcherMultipleConsumers (#16241)
    
    ### Motivation
    
    Reduce the CPU consumption when there are many consumers (> 100k) and the dispatch rate limit is enabled.
    
    ![image](https://user-images.githubusercontent.com/12592133/175940861-7be13d62-042d-46b9-923d-3b1e8354d331.png)
    
    [broker_perf.html.txt](https://github.com/apache/pulsar/files/8991916/broker_perf.html.txt)
    
    ### Modification
    
    - Added `isRescheduleReadInProgress` to ensure the dispatcher only has one pending re-schedule read task at a time.
    - Added DEBUG log for the re-schedule read operation
    
    (cherry picked from commit eec46ddcba4d2b4f956e1b4d63154cc43087f507)
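    
    A minimal, self-contained sketch of the guard pattern used here, with illustrative class and constant names: an AtomicBoolean collapses repeated reschedule requests into a single pending task:
    
    ```
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    public class RescheduleGuardSketch {
        private static final long MESSAGE_RATE_BACKOFF_MS = 1000;
    
        private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        private final AtomicBoolean rescheduleInProgress = new AtomicBoolean(false);
    
        // Only one re-scheduled read may be pending at a time; extra calls are no-ops
        // until the pending task has run and cleared the flag.
        void reScheduleRead() {
            if (rescheduleInProgress.compareAndSet(false, true)) {
                executor.schedule(() -> {
                    rescheduleInProgress.set(false);
                    readMoreEntries();
                }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
            }
        }
    
        void readMoreEntries() {
            System.out.println("read more entries");
        }
    
        public static void main(String[] args) throws InterruptedException {
            RescheduleGuardSketch dispatcher = new RescheduleGuardSketch();
            for (int i = 0; i < 100; i++) {
                dispatcher.reScheduleRead(); // collapses into a single pending task
            }
            Thread.sleep(1500);
            dispatcher.executor.shutdownNow();
        }
    }
    ```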
---
 .../PersistentDispatcherMultipleConsumers.java           | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0a6cf8e02a9..f77a55338f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -105,6 +106,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     "blockedDispatcherOnUnackedMsgs");
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
+    private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
+
     protected enum ReadType {
         Normal, Replay
     }
@@ -290,8 +293,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     @Override
     protected void reScheduleRead() {
-        topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
-                TimeUnit.MILLISECONDS);
+        if (isRescheduleReadInProgress.compareAndSet(false, true)) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
+            }
+            topic.getBrokerService().executor().schedule(
+                    () -> {
+                        isRescheduleReadInProgress.set(false);
+                        readMoreEntries();
+                        },
+                    MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+        }
     }
 
     // left pair is messagesToRead, right pair is bytesToRead