Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/10 04:04:23 UTC

[pulsar] branch branch-2.9 updated (be806ea -> 69de8d7)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from be806ea  [Proxy] Fix port exhaustion and connection issues in Pulsar Proxy (#14078)
     new 96f6f7b  [C++] Fix pulsar client cpp build fail in gcc-4.8.5 (#14053)
     new dafc838  Upgrade commons-cli to 1.5.0 (#14094)
     new e2c94b8  Updated dependencies to get rid of pulsar-io/jdbc related CVE-2020-13692 (#13753)
     new d78e315  Fixed deadlock on txn semaphore permit exhaustion (#14131)
     new 33448a7  Fix OpBase.callback is not called in failPendingRequest (#14133)
     new befdb84f [Issue 14105] Avoid creating any topics in `NamespaceService#checkTopicExists` during topic lookup. (#14134)
     new 12ee1a1  fix consume failure when BatchReceivePolicy#maxNumBytes < message size (#14139)
     new fafdb6d  [Doc] Fix doc for the wrong default value of `maxPendingChunkedMessage` (#14144)
     new 5ebd5b6  [Transaction] Fix subscription ack transaction marker. (#14170)
     new de5dec4  [Transaction] Optimize transaction timeout (#14172)
     new 8a360da  Add test to ensure correct zk children cache invalidation (#14178)
     new 0b516b8  [Transaction] Fix send normal message can't change MaxReadPosition (#14192)
     new 69de8d7  Fix unack message count for transaction Ack while disabled batch index ack (#14071)

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +-
 pom.xml                                            |  10 +-
 .../pulsar/broker/namespace/NamespaceService.java  |  23 ++-
 .../broker/service/AbstractBaseDispatcher.java     |   4 +
 .../org/apache/pulsar/broker/service/Consumer.java |  19 ++-
 .../service/persistent/PersistentSubscription.java |  83 ----------
 .../buffer/impl/TopicTransactionBuffer.java        |  38 +++--
 .../buffer/impl/TransactionBufferHandlerImpl.java  |  32 ----
 .../pendingack/impl/PendingAckHandleImpl.java      |  11 +-
 .../org/apache/pulsar/broker/BrokerTestUtil.java   |  15 ++
 .../pulsar/broker/service/BatchMessageTest.java    |  92 +++++++++++
 .../BatchMessageWithBatchIndexLevelTest.java       |  70 +-------
 .../service/TransactionMarkerDeleteTest.java       | 184 ++++++++++-----------
 .../pulsar/broker/transaction/TransactionTest.java |  27 ++-
 .../broker/transaction/TransactionTestBase.java    |   3 +-
 .../buffer/TransactionBufferClientTest.java        |  11 --
 .../buffer/TransactionStablePositionTest.java      | 100 ++++++++++-
 .../pendingack/PendingAckInMemoryDeleteTest.java   |   1 +
 .../MultiBrokerMetadataConsistencyTest.java        |  82 +++++++++
 .../client/api/ConsumerBatchReceiveTest.java       |  22 +++
 .../client/impl/TransactionEndToEndTest.java       |  11 +-
 ...ansactionEndToEndWithoutBatchIndexAckTest.java} |  24 ++-
 .../apache/pulsar/client/api/ConsumerBuilder.java  |   4 +-
 pulsar-client-cpp/docker-build-centos7.sh          |   2 +-
 pulsar-client-cpp/docker/centos-7/Dockerfile       |   7 +
 pulsar-client-cpp/tests/ConsumerTest.cc            |   2 +-
 .../apache/pulsar/client/impl/MessagesImpl.java    |   4 +
 .../client/impl/TransactionMetaStoreHandler.java   |  20 +--
 .../impl/transaction/TransactionBuilderImpl.java   |   4 +-
 .../client/impl/transaction/TransactionImpl.java   |   6 +
 pulsar-sql/presto-distribution/LICENSE             |   2 +-
 src/owasp-dependency-check-suppressions.xml        |  74 ++++++++-
 32 files changed, 624 insertions(+), 365 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/MultiBrokerMetadataConsistencyTest.java
 copy pulsar-broker/src/test/java/org/apache/pulsar/{broker/service/persistent/PersistentStreamingDispatcherBlockConsumerTest.java => client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java} (56%)

[pulsar] 02/13: Upgrade commons-cli to 1.5.0 (#14094)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dafc8385719c4d7451140290267f86bc52599f01
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Feb 2 20:29:23 2022 +0200

    Upgrade commons-cli to 1.5.0 (#14094)
    
    - commons-cli 1.3+ is required by Zookeeper 3.7.0+
      - https://github.com/apache/zookeeper/commit/492fd79b
      - commons-cli versions <1.3 fail with error:
        java.lang.NoClassDefFoundError: org/apache/commons/cli/DefaultParser
    
    (cherry picked from commit 20af454af44d979fc43c77de3b8d0e0114411db7)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 2 +-
 pom.xml                                          | 6 ++++++
 pulsar-sql/presto-distribution/LICENSE           | 2 +-
 3 files changed, 8 insertions(+), 2 deletions(-)
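
The NoClassDefFoundError quoted in the commit message means that org.apache.commons.cli.DefaultParser, which commons-cli only ships from 1.3 onward, is missing from the runtime classpath. A minimal sketch of how one might check which situation a given classpath is in, using only the JDK. The class name ClasspathProbe and its method are hypothetical helpers for illustration, not part of Pulsar, ZooKeeper, or commons-cli:

```java
/** Hypothetical helper (not part of Pulsar or ZooKeeper): probes the classpath for a class by name. */
final class ClasspathProbe {
    private ClasspathProbe() {
    }

    /** Returns true if the named class can be located by this class's loader, without initializing it. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Missing class, or a class that cannot be linked against what is on the classpath.
            return false;
        }
    }

    public static void main(String[] args) {
        // With commons-cli older than 1.3 (or no commons-cli at all) on the classpath, this reports false.
        System.out.println("DefaultParser present: " + isPresent("org.apache.commons.cli.DefaultParser"));
    }
}
```

Running the probe with the same classpath as the failing process is a quick way to tell a stale commons-cli 1.2 jar apart from other causes before touching the build.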

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index a4c643a..5ca8eee 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -341,7 +341,7 @@ The Apache Software License, Version 2.0
     - com.yahoo.datasketches-memory-0.8.3.jar
     - com.yahoo.datasketches-sketches-core-0.8.3.jar
  * Apache Commons
-    - commons-cli-commons-cli-1.2.jar
+    - commons-cli-commons-cli-1.5.0.jar
     - commons-codec-commons-codec-1.15.jar
     - commons-collections-commons-collections-3.2.2.jar
     - commons-configuration-commons-configuration-1.10.jar
diff --git a/pom.xml b/pom.xml
index 88e1635..41ef32b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@ flexible messaging model and an intuitive client API.</description>
 
     <bookkeeper.version>4.14.4</bookkeeper.version>
     <zookeeper.version>3.6.3</zookeeper.version>
+    <commons-cli.version>1.5.0</commons-cli.version>
     <snappy.version>1.1.7</snappy.version> <!-- ZooKeeper server -->
     <dropwizardmetrics.version>3.2.5</dropwizardmetrics.version> <!-- ZooKeeper server -->
     <curator.version>5.1.0</curator.version>
@@ -342,6 +343,11 @@ flexible messaging model and an intuitive client API.</description>
         <version>${zookeeper.version}</version>
       </dependency>
       <dependency>
+        <groupId>commons-cli</groupId>
+        <artifactId>commons-cli</artifactId>
+        <version>${commons-cli.version}</version>
+      </dependency>
+      <dependency>
         <groupId>io.dropwizard.metrics</groupId>
         <artifactId>metrics-core</artifactId>
         <version>${dropwizardmetrics.version}</version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index c6840b8..2aaf175 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -428,7 +428,7 @@ The Apache Software License, Version 2.0
     - prometheus-metrics-provider-4.14.4.jar
     - codahale-metrics-provider-4.14.4.jar
   * Apache Commons
-    - commons-cli-1.2.jar
+    - commons-cli-1.5.0.jar
     - commons-codec-1.15.jar
     - commons-collections4-4.1.jar
     - commons-configuration-1.10.jar

[pulsar] 09/13: [Transaction] Fix subscription ack transaction marker. (#14170)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5ebd5b6e56c21ec7f3b12c15dbdef076a21887e6
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Feb 9 15:41:48 2022 +0800

    [Transaction] Fix subscription ack transaction marker. (#14170)
    
    (cherry picked from commit 603d252ed6e405d2015d2d0fb73047bb9a96b268)
---
 .../broker/service/AbstractBaseDispatcher.java     |   4 +
 .../service/persistent/PersistentSubscription.java |  83 ----------
 .../org/apache/pulsar/broker/BrokerTestUtil.java   |  15 ++
 .../service/TransactionMarkerDeleteTest.java       | 184 ++++++++++-----------
 4 files changed, 104 insertions(+), 182 deletions(-)
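
The dispatcher change in this commit acknowledges a transaction marker at the moment it is filtered out of a read batch, replacing the asynchronous marker-deletion state machine removed from PersistentSubscription. The idea can be sketched as a simplified, self-contained model. Entry, Subscription, and filter below are illustrative stand-ins and are assumptions, not Pulsar's actual classes or signatures:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the dispatcher change; these types are illustrative, not Pulsar classes. */
final class MarkerFilterSketch {

    /** Stand-in for a ledger entry: a position and a flag marking it as a transaction marker. */
    static final class Entry {
        final long position;
        final boolean txnMarker;

        Entry(long position, boolean txnMarker) {
            this.position = position;
            this.txnMarker = txnMarker;
        }
    }

    /** Stand-in for a subscription that records individually acknowledged positions. */
    static final class Subscription {
        final List<Long> acked = new ArrayList<>();

        void acknowledgeIndividual(long position) {
            acked.add(position);
        }
    }

    /** Acks and drops marker entries; returns the entries that would be dispatched to consumers. */
    static List<Entry> filter(List<Entry> entries, Subscription subscription) {
        List<Entry> deliverable = new ArrayList<>();
        for (Entry entry : entries) {
            if (entry.txnMarker) {
                // The marker carries nothing for the consumer, so acknowledge it on the spot.
                subscription.acknowledgeIndividual(entry.position);
                continue;
            }
            deliverable.add(entry);
        }
        return deliverable;
    }

    public static void main(String[] args) {
        Subscription subscription = new Subscription();
        List<Entry> batch = List.of(new Entry(0, false), new Entry(1, true), new Entry(2, false));
        List<Entry> deliverable = filter(batch, subscription);
        System.out.println("delivered=" + deliverable.size() + " ackedMarkers=" + subscription.acked);
    }
}
```

Acknowledging inside the filtering loop keeps marker cleanup on the normal read path, so no separate read-ahead of the entry after the mark-delete position is needed.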

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index f98cfe5..ba757b5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -126,6 +126,10 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
                     && msgMetadata.hasTxnidLeastBits()) {
                 if (Markers.isTxnMarker(msgMetadata)) {
+                    // The consumer can only receive messages at positions smaller than maxReadPosition,
+                    // so this marker is useless for this subscription and can be acknowledged here.
+                    subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
+                            Collections.emptyMap());
                     entries.set(i, null);
                     entry.release();
                     continue;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index cd9f462..f69db68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -69,7 +69,6 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.naming.TopicName;
@@ -77,8 +76,6 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,26 +111,15 @@ public class PersistentSubscription implements Subscription {
     private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();
 
     private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
-    private volatile Position lastMarkDeleteForTransactionMarker;
     private final PendingAckHandle pendingAckHandle;
 
     private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
 
-    private DeleteTransactionMarkerState deleteTransactionMarkerState = DeleteTransactionMarkerState.None;
-
-    private final Object waitObject = new Object();
-
     static {
         REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
     }
 
-    public enum DeleteTransactionMarkerState {
-        Process,
-        Wait,
-        None
-    }
-
     static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
         return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
     }
@@ -419,8 +405,6 @@ public class PersistentSubscription implements Subscription {
             }
         }
 
-        deleteTransactionMarker(properties);
-
         if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
             // Notify all consumer that the end of topic was reached
             if (dispatcher != null) {
@@ -429,73 +413,6 @@ public class PersistentSubscription implements Subscription {
         }
     }
 
-    private void deleteTransactionMarker(Map<String, Long> properties) {
-
-        if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            PositionImpl currentMarkDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();
-            if ((lastMarkDeleteForTransactionMarker == null
-                    || ((PositionImpl) lastMarkDeleteForTransactionMarker)
-                    .compareTo(currentMarkDeletePosition) < 0)) {
-                if (currentMarkDeletePosition != null) {
-                    ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
-                    PositionImpl nextPosition = managedLedger.getNextValidPosition(currentMarkDeletePosition);
-                    if (nextPosition != null
-                            && nextPosition.compareTo((PositionImpl) managedLedger.getLastConfirmedEntry()) <= 0) {
-                        synchronized (waitObject) {
-                            if (deleteTransactionMarkerState == DeleteTransactionMarkerState.None) {
-                                deleteTransactionMarkerState = DeleteTransactionMarkerState.Process;
-                                managedLedger.asyncReadEntry(nextPosition, new ReadEntryCallback() {
-                                    @Override
-                                    public void readEntryComplete(Entry entry, Object ctx) {
-                                        try {
-                                            MessageMetadata messageMetadata =
-                                                    Commands.parseMessageMetadata(entry.getDataBuffer());
-                                            if (Markers.isTxnCommitMarker(messageMetadata)
-                                                    || Markers.isTxnAbortMarker(messageMetadata)) {
-                                                synchronized (waitObject) {
-                                                    deleteTransactionMarkerState = DeleteTransactionMarkerState.None;
-                                                }
-                                                lastMarkDeleteForTransactionMarker = currentMarkDeletePosition;
-                                                acknowledgeMessage(Collections.singletonList(nextPosition),
-                                                        AckType.Individual, properties);
-                                            } else {
-                                                synchronized (waitObject) {
-                                                    if (deleteTransactionMarkerState
-                                                            == DeleteTransactionMarkerState.Wait) {
-                                                        deleteTransactionMarkerState =
-                                                                DeleteTransactionMarkerState.None;
-                                                        deleteTransactionMarker(properties);
-                                                    } else {
-                                                        deleteTransactionMarkerState =
-                                                                DeleteTransactionMarkerState.None;
-                                                    }
-                                                }
-                                            }
-                                        } finally {
-                                            entry.release();
-                                        }
-                                    }
-
-                                    @Override
-                                    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                                        synchronized (waitObject) {
-                                            deleteTransactionMarkerState =
-                                                    DeleteTransactionMarkerState.None;
-                                        }
-                                        log.error("Fail to read transaction marker! Position : {}",
-                                                currentMarkDeletePosition, exception);
-                                    }
-                                }, null);
-                            } else if (deleteTransactionMarkerState == DeleteTransactionMarkerState.Process) {
-                                deleteTransactionMarkerState = DeleteTransactionMarkerState.Wait;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-
     public CompletableFuture<Void> transactionIndividualAcknowledge(
             TxnID txnId,
             List<MutablePair<PositionImpl, Integer>> positions) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index ff03e42..224060c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker;
 
 import java.util.UUID;
+import org.mockito.Mockito;
 
 /**
  * Holds util methods used in test.
@@ -29,4 +30,18 @@ public class BrokerTestUtil {
         return prefix + "-" + UUID.randomUUID();
     }
 
+    /**
+     * Creates a Mockito spy directly without an intermediate instance to spy.
+     * This addresses a flaky-test issue where a spy created from a given instance fails with a
+     * {@link org.mockito.exceptions.misusing.WrongTypeOfReturnValue} exception.
+     *
+     * @param classToSpy the class to spy
+     * @param args the constructor arguments to use when creating the spy instance
+     * @return a spy of the provided class created with given constructor arguments
+     */
+    public static <T> T spyWithClassAndConstructorArgs(Class<T> classToSpy, Object... args) {
+        return Mockito.mock(classToSpy, Mockito.withSettings()
+                .useConstructor(args)
+                .defaultAnswer(Mockito.CALLS_REAL_METHODS));
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
index f25b346..aa2a8d4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -25,45 +26,39 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
+import static org.testng.Assert.assertNull;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.protocol.Commands;
-import org.apache.pulsar.common.protocol.Markers;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import org.testng.collections.Sets;
 
 @Test(groups = "broker")
-public class TransactionMarkerDeleteTest extends BrokerTestBase {
+public class TransactionMarkerDeleteTest extends TransactionTestBase {
 
+    private static final int TOPIC_PARTITION = 3;
+    private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
+    private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
-    @Override
     protected void setup() throws Exception {
-        conf.setTransactionCoordinatorEnabled(true);
-        super.baseSetup();
-        admin.tenants().createTenant("public",
-                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
-
-        admin.namespaces().createNamespace("public/default");
+        setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -74,7 +69,8 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase {
 
     @Test
     public void testMarkerDeleteTimes() throws Exception {
-        ManagedLedgerImpl managedLedger = spy((ManagedLedgerImpl) pulsar.getManagedLedgerFactory().open("test"));
+        ManagedLedgerImpl managedLedger =
+                spy((ManagedLedgerImpl) getPulsarServiceList().get(0).getManagedLedgerFactory().open("test"));
         PersistentTopic topic = mock(PersistentTopic.class);
         BrokerService brokerService = mock(BrokerService.class);
         PulsarService pulsarService = mock(PulsarService.class);
@@ -85,8 +81,8 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase {
         doReturn(false).when(configuration).isTransactionCoordinatorEnabled();
         doReturn(managedLedger).when(topic).getManagedLedger();
         ManagedCursor cursor = managedLedger.openCursor("test");
-        PersistentSubscription persistentSubscription = spy(new PersistentSubscription(topic, "test",
-                cursor, false));
+        PersistentSubscription persistentSubscription =
+                spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false);
         Position position = managedLedger.addEntry("test".getBytes());
         persistentSubscription.acknowledgeMessage(Collections.singletonList(position),
                 AckType.Individual, Collections.emptyMap());
@@ -96,84 +92,74 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase {
 
     @Test
     public void testMarkerDelete() throws Exception {
-
-        MessageMetadata msgMetadata = new MessageMetadata().clear()
-                .setPublishTime(1)
-                .setProducerName("test")
-                .setSequenceId(0);
-
-        ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
-
-        payload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
-                msgMetadata, payload);
-
-        ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test");
-        PersistentTopic topic = mock(PersistentTopic.class);
-        doReturn(pulsar.getBrokerService()).when(topic).getBrokerService();
-        doReturn(managedLedger).when(topic).getManagedLedger();
-        doReturn("test").when(topic).getName();
-        ManagedCursor cursor = managedLedger.openCursor("test");
-        PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test",
-                managedLedger.openCursor("test"), false);
-
-        byte[] payloadBytes = toBytes(payload);
-        Position position1 = managedLedger.addEntry(payloadBytes);
-        Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
-                .newTxnCommitMarker(1, 1, 1)));
-
-        Position position2 = managedLedger.addEntry(payloadBytes);
-        Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        Position position3 = managedLedger.addEntry(payloadBytes);
-
-        assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
-        assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
-
-        // ack position1, markerDeletePosition to markerPosition1
-        persistentSubscription.acknowledgeMessage(Collections.singletonList(position1),
-                AckType.Individual, Collections.emptyMap());
-
-        // ack position1, markerDeletePosition to markerPosition1
-        Awaitility.await().during(1, TimeUnit.SECONDS).until(() ->
-                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition1) == 0);
-
-        // ack position2, markerDeletePosition to markerPosition2
-        persistentSubscription.acknowledgeMessage(Collections.singletonList(position2),
-                AckType.Individual, Collections.emptyMap());
-
-        Awaitility.await().until(() ->
-                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition2) == 0);
-
-        // add consequent marker
-        managedLedger.addEntry(toBytes(Markers
-                .newTxnCommitMarker(1, 1, 1)));
-
-        managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
-                .newTxnAbortMarker(1, 1, 1)));
-
-        // ack with transaction, then commit this transaction
-        persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0),
-                Collections.singletonList(MutablePair.of((PositionImpl) position3, 0))).get();
-
-        persistentSubscription.endTxn(0, 0, 0, 0).get();
-
-        // ack with transaction, then commit this transaction
-        Awaitility.await().until(() ->
-                ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
-                        .compareTo((PositionImpl) markerPosition3) == 0);
-
+        final String subName = "testMarkerDelete";
+        final String topicName = NAMESPACE1 + "/testMarkerDelete";
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .isAckReceiptEnabled(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topicName)
+                .create();
+
+        Transaction txn1 = getTxn();
+        Transaction txn2 = getTxn();
+        Transaction txn3 = getTxn();
+        Transaction txn4 = getTxn();
+
+        MessageIdImpl msgId1 = (MessageIdImpl) producer.newMessage(txn1).send();
+        MessageIdImpl msgId2 = (MessageIdImpl) producer.newMessage(txn2).send();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+        txn1.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition moves to msgId1; msgId2 has not been committed yet
+        assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId1.getLedgerId(), msgId1.getEntryId()).toString());
+
+        MessageIdImpl msgId3 = (MessageIdImpl) producer.newMessage(txn3).send();
+        txn2.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition moves to the txn1 marker, so entryId is msgId2.getEntryId() + 1,
+        // because msgId2 was sent before txn1 was committed
+        assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId2.getLedgerId(), msgId2.getEntryId() + 1).toString());
+
+        MessageIdImpl msgId4 = (MessageIdImpl) producer.newMessage(txn4).send();
+        txn3.commit().get();
+
+        consumer.acknowledgeAsync(consumer.receive()).get();
+        assertNull(consumer.receive(1, TimeUnit.SECONDS));
+
+        // maxReadPosition moves to the txn2 marker, because msgId4 has not been committed
+        assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
+                PositionImpl.get(msgId3.getLedgerId(), msgId3.getEntryId() + 1).toString());
+
+        txn4.abort().get();
+
+        // maxReadPosition moves to the txn4 abort marker, so entryId is msgId4.getEntryId() + 2
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getInternalStats(topicName)
+                .cursors.get(subName).markDeletePosition, PositionImpl.get(msgId4.getLedgerId(),
+                msgId4.getEntryId() + 2).toString()));
     }
 
-    static byte[] toBytes(ByteBuf byteBuf) {
-        byte[] buf = new byte[byteBuf.readableBytes()];
-        byteBuf.readBytes(buf);
-        byteBuf.release();
-        return buf;
+    private Transaction getTxn() throws Exception {
+        return pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
     }
 }

[pulsar] 12/13: [Transaction] Fix send normal message can't change MaxReadPosition (#14192)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0b516b8672ee9987f8dac791717f170fd4391ca5
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Feb 10 09:15:19 2022 +0800

    [Transaction] Fix send normal message can't change MaxReadPosition (#14192)
    
    link https://github.com/apache/pulsar/pull/14097
    
    When a producer with transactions disabled connects to the broker while the TopicTransactionBuffer is recovering, the TopicTransactionBuffer state is None or Initializing, so sending a normal message cannot change the maxReadPosition.
    If recovery then succeeds and the producer sends no further messages to this topic, the maxReadPosition never changes and the consumer does not receive the messages that the non-transactional producer sent.
    
    1. On recovering to the Ready state, if there are no ongoing transactions, change maxReadPosition to the LAC (last add confirmed).
    2. On recovering to the NoSnapshot state, change maxReadPosition to the LAC.
    
    (cherry picked from commit 0287f7f4278c8bf892d869d81d5d1982be39a516)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  38 +++++---
 .../buffer/TransactionStablePositionTest.java      | 100 +++++++++++++++++++--
 2 files changed, 119 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 12818b7..89c77d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -104,25 +104,43 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
         this.maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+        this.recover();
+    }
+
+    private void recover() {
         this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this)
                 .execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack() {
                     @Override
                     public void recoverComplete() {
-                        if (!changeToReadyState()) {
-                            log.error("[{}]Transaction buffer recover fail", topic.getName());
-                        } else {
-                            timer.newTimeout(TopicTransactionBuffer.this,
-                                    takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
-                            transactionBufferFuture.complete(null);
+                        synchronized (TopicTransactionBuffer.this) {
+                            // sync maxReadPosition change to LAC when TopicTransaction buffer have not recover
+                            // completely the normal message have been sent to broker and state is
+                            // not Ready can't sync maxReadPosition when no ongoing transactions
+                            if (ongoingTxns.isEmpty()) {
+                                maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+                            }
+                            if (!changeToReadyState()) {
+                                log.error("[{}]Transaction buffer recover fail", topic.getName());
+                            } else {
+                                timer.newTimeout(TopicTransactionBuffer.this,
+                                        takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+                                transactionBufferFuture.complete(null);
+                            }
                         }
                     }
 
                     @Override
                     public void noNeedToRecover() {
-                        if (!changeToNoSnapshotState()) {
-                            log.error("[{}]Transaction buffer recover fail", topic.getName());
-                        } else {
-                            transactionBufferFuture.complete(null);
+                        synchronized (TopicTransactionBuffer.this) {
+                            // sync maxReadPosition change to LAC when TopicTransaction buffer have not recover
+                            // completely the normal message have been sent to broker and state is
+                            // not NoSnapshot can't sync maxReadPosition
+                            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+                            if (!changeToNoSnapshotState()) {
+                                log.error("[{}]Transaction buffer recover fail", topic.getName());
+                            } else {
+                                transactionBufferFuture.complete(null);
+                            }
                         }
                     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index ef1c761..0184b27 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -18,33 +18,37 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState.State.NoSnapshot;
+import static org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState.State.Ready;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
-
-import com.google.common.collect.Sets;
-
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.concurrent.TimeUnit;
-
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -157,4 +161,82 @@ public class TransactionStablePositionTest extends TransactionTestBase {
         assertNull(message);
     }
 
+    @DataProvider(name = "enableTransactionAndState")
+    public static Object[][] enableTransactionAndState() {
+        return new Object[][] {
+                { true, TopicTransactionBufferState.State.None },
+                { false, TopicTransactionBufferState.State.None },
+                { true, TopicTransactionBufferState.State.Initializing },
+                { false, TopicTransactionBufferState.State.Initializing }
+        };
+    }
+
+    @Test(dataProvider = "enableTransactionAndState")
+    public void testSyncNormalPositionWhenTBRecover(boolean clientEnableTransaction,
+                                                    TopicTransactionBufferState.State state) throws Exception {
+
+        final String topicName = NAMESPACE1 + "/testSyncNormalPositionWhenTBRecover-"
+                + clientEnableTransaction + state.name();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(clientEnableTransaction)
+                .build();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topicName)
+                .create();
+
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(TopicName.get(topicName).toString(), false).get().get();
+
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+
+        // wait topic transaction buffer recover success
+        checkTopicTransactionBufferState(clientEnableTransaction, topicTransactionBuffer);
+
+        Field field = TopicTransactionBufferState.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(topicTransactionBuffer, state);
+
+        // init maxReadPosition is PositionImpl.EARLIEST
+        Position position = topicTransactionBuffer.getMaxReadPosition();
+        assertEquals(position, PositionImpl.earliest);
+
+        MessageIdImpl messageId = (MessageIdImpl) producer.send("test".getBytes());
+
+        // send normal message can't change MaxReadPosition when state is None or Initializing
+        position = topicTransactionBuffer.getMaxReadPosition();
+        assertEquals(position, PositionImpl.earliest);
+
+        // invoke recover
+        Method method = TopicTransactionBuffer.class.getDeclaredMethod("recover");
+        method.setAccessible(true);
+        method.invoke(topicTransactionBuffer);
+
+        // change to None state can recover
+        field.set(topicTransactionBuffer, TopicTransactionBufferState.State.None);
+
+        // recover success again
+        checkTopicTransactionBufferState(clientEnableTransaction, topicTransactionBuffer);
+
+        // change MaxReadPosition to normal message position
+        assertEquals(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()),
+                topicTransactionBuffer.getMaxReadPosition());
+    }
+
+    private void checkTopicTransactionBufferState(boolean clientEnableTransaction,
+                                                  TopicTransactionBuffer topicTransactionBuffer) {
+        // recover success
+        Awaitility.await().until(() -> {
+            if (clientEnableTransaction) {
+                // recover success, client enable transaction will change to Ready State
+                return topicTransactionBuffer.getStats().state.equals(Ready.name());
+            } else {
+                // recover success, client disable transaction will change to NoSnapshot State
+                return topicTransactionBuffer.getStats().state.equals(NoSnapshot.name());
+            }
+        });
+    }
 }

[pulsar] 05/13: Fix OpBase.callback is not called in failPendingRequest (#14133)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 33448a74f0357befd8d11056a77757eb08fc3240
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Feb 7 18:28:41 2022 +0800

    Fix OpBase.callback is not called in failPendingRequest (#14133)
    
    (cherry picked from commit 49490dbd2439328628b62a0d7c9ebcb152c6c5ab)
---
 .../client/impl/TransactionMetaStoreHandler.java     | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 3c6286b..a30c235 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -123,7 +123,6 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             if (getState() == State.Closing || getState() == State.Closed) {
                 setState(State.Closed);
                 failPendingRequest();
-                this.pendingRequests.clear();
                 return;
             }
 
@@ -173,17 +172,16 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
     }
 
     private void failPendingRequest() {
-        internalPinnedExecutor.execute(() -> {
-            pendingRequests.keys().forEach(k -> {
-                OpBase<?> op = pendingRequests.remove(k);
-                if (op != null && !op.callback.isDone()) {
-                    op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException(
-                            "Could not get response from transaction meta store when " +
-                                    "the transaction meta store has already close."));
-                    onResponse(op);
-                }
-            });
+        // this method is executed in internalPinnedExecutor.
+        pendingRequests.forEach((k, op) -> {
+            if (op != null && !op.callback.isDone()) {
+                op.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException(
+                        "Could not get response from transaction meta store when "
+                                + "the transaction meta store has already close."));
+                onResponse(op);
+            }
         });
+        this.pendingRequests.clear();
     }
 
     public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {

[pulsar] 13/13: Fix unack message count for transaction Ack while disabled batch index ack (#14071)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 69de8d7550859854b066927e9e88bff5ce3b3021
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Feb 10 10:56:20 2022 +0800

    Fix unack message count for transaction Ack while disabled batch index ack (#14071)
    
    * Fix unack message count for transaction Ack while disabled batch index ack.
    
    ### Motivation
    
    Fix the unacked message count for transaction acks when batch index ack is disabled.
    
    A transaction ack differs from a normal message ack for a batch message.
    
    For a normal message ack, we use a bitset to carry the batch index state, for example:
    
    ```
    1. Ack with `00111111` means batch indexes 0 and 1 are acked
    2. To ack batch indexes 2 and 3, the client sends `00001111` to the broker
    3. After the whole batch has been acked, the client sends `00000000` to the broker
    ```
    
    The following is for transaction ack:
    
    ```
    1. `00111111` means batch indexes 0 and 1 are acked
    2. `11001111` means batch indexes 2 and 3 are acked
    ```
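
The difference between the two encodings can be checked with a small standalone sketch. It uses `java.util.BitSet` rather than the broker's `BitSetRecyclable`, and the class `AckCountSketch` and its `parse` helper are hypothetical. It assumes that character `i` of a string such as `00111111` describes batch index `i`, with `1` meaning "not yet acked". Under that reading, a transaction ack request is self-describing, so the acked count is the batch size minus the number of set bits; a normal ack carries cumulative state, so only the bits newly cleared since the previous state are counted.

```java
import java.util.BitSet;

// Hypothetical illustration; this is not the broker's Consumer class.
final class AckCountSketch {

    // Character i of the string describes batch index i:
    // '1' = not yet acked, '0' = acked (an assumed reading of the examples above).
    static BitSet parse(String bits) {
        BitSet set = new BitSet(bits.length());
        for (int i = 0; i < bits.length(); i++) {
            if (bits.charAt(i) == '1') {
                set.set(i);
            }
        }
        return set;
    }

    // Transaction ack: each request stands alone, so the acked count is
    // the batch size minus the number of bits still set.
    static int ackedCountForTransactionAck(int batchSize, BitSet ackSet) {
        return batchSize - ackSet.cardinality();
    }

    // Normal ack: the request carries the cumulative state, so count only the
    // indexes that were set in the previous state and are cleared in the new one.
    static int ackedCountForNormalAck(BitSet previous, BitSet current) {
        BitSet newlyAcked = (BitSet) previous.clone();
        newlyAcked.andNot(current);
        return newlyAcked.cardinality();
    }

    public static void main(String[] args) {
        int batchSize = 8;
        // Transaction acks: both requests ack two messages each.
        System.out.println(ackedCountForTransactionAck(batchSize, parse("00111111"))); // prints 2
        System.out.println(ackedCountForTransactionAck(batchSize, parse("11001111"))); // prints 2
        // Normal acks: the second request acks two more on top of the first.
        System.out.println(ackedCountForNormalAck(parse("00111111"), parse("00001111"))); // prints 2
    }
}
```

Applying the transaction formula to the normal-ack request `00001111` would report four acked messages instead of two, which is why the two kinds of ack need separate counting.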
    
    ### Verification
    
    Enabled the transaction e2e tests with batch index ack disabled.
    
    (cherry picked from commit a64014627bcdce0a1584b592ee942435e119a2a7)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 19 +++--
 .../pendingack/impl/PendingAckHandleImpl.java      | 11 ++-
 .../pulsar/broker/service/BatchMessageTest.java    | 92 ++++++++++++++++++++++
 .../BatchMessageWithBatchIndexLevelTest.java       | 70 +---------------
 .../broker/transaction/TransactionTestBase.java    |  3 +-
 .../pendingack/PendingAckInMemoryDeleteTest.java   |  1 +
 .../client/impl/TransactionEndToEndTest.java       | 11 +--
 ...ransactionEndToEndWithoutBatchIndexAckTest.java | 46 +++++++++++
 8 files changed, 167 insertions(+), 86 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 630caac..846eb6e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -465,17 +465,13 @@ public class Consumer {
                     ackSets[j] = msgId.getAckSetAt(j);
                 }
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
-                ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets);
+                ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
-                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position);
+                ackedCount = batchSize;
             }
 
-            if (msgId.hasBatchIndex()) {
-                positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
-            } else {
-                positionsAcked.add(new MutablePair<>(position, 0));
-            }
+            positionsAcked.add(new MutablePair<>(position, (int) batchSize));
 
             addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
 
@@ -519,7 +515,7 @@ public class Consumer {
     }
 
     private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) {
-        if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) {
+        if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
             long[] cursorAckSet = getCursorAckSet(position);
             if (cursorAckSet != null) {
                 return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
@@ -548,6 +544,13 @@ public class Consumer {
         return ackedCount;
     }
 
+    private long getAckedCountForTransactionAck(long batchSize, long[] ackSets) {
+        BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets);
+        long ackedCount = batchSize - bitset.cardinality();
+        bitset.recycle();
+        return ackedCount;
+    }
+
     private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize) {
         long unAckedCount = batchSize;
         if (isAcknowledgmentAtBatchIndexLevelEnabled) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 1c0b10f..6fafc68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -674,13 +674,18 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                     && individualAckPositions.containsKey(entry.getValue())) {
                 BitSetRecyclable thisBitSet =
                         BitSetRecyclable.valueOf(entry.getValue().getAckSet());
-                thisBitSet.flip(0, individualAckPositions.get(entry.getValue()).right);
+                int batchSize = individualAckPositions.get(entry.getValue()).right;
+                thisBitSet.flip(0, batchSize);
                 BitSetRecyclable otherBitSet =
                         BitSetRecyclable.valueOf(individualAckPositions
                                 .get(entry.getValue()).left.getAckSet());
                 otherBitSet.or(thisBitSet);
-                individualAckPositions.get(entry.getKey())
-                        .left.setAckSet(otherBitSet.toLongArray());
+                if (otherBitSet.cardinality() == batchSize) {
+                    individualAckPositions.remove(entry.getValue());
+                } else {
+                    individualAckPositions.get(entry.getKey())
+                            .left.setAckSet(otherBitSet.toLongArray());
+                }
                 otherBitSet.recycle();
                 thisBitSet.recycle();
             } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 6d8b31c..e35d2a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -39,7 +39,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import lombok.Cleanup;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
@@ -49,9 +51,13 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -63,6 +69,8 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class BatchMessageTest extends BrokerTestBase {
 
+    private static final Logger log = LoggerFactory.getLogger(BatchMessageTest.class);
+
     @BeforeClass
     @Override
     protected void setup() throws Exception {
@@ -95,6 +103,15 @@ public class BatchMessageTest extends BrokerTestBase {
         };
     }
 
+    @DataProvider(name = "testSubTypeAndEnableBatch")
+    public Object[][] testSubTypeAndEnableBatch() {
+        return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE },
+                { SubscriptionType.Failover, Boolean.TRUE },
+                { SubscriptionType.Shared, Boolean.FALSE },
+                { SubscriptionType.Failover, Boolean.FALSE }
+        };
+    }
+
     @Test(dataProvider = "codecAndContainerBuilder")
     public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressionType, BatcherBuilder builder) throws Exception {
         int numMsgs = 50;
@@ -919,5 +936,80 @@ public class BatchMessageTest extends BrokerTestBase {
         consumer1.close();
     }
 
+    @Test(dataProvider="testSubTypeAndEnableBatch")
+    private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType,
+                                                             boolean enableBatch) throws Exception {
+        final int messageCount = 50;
+        final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
+        final String subscriptionName = "sub-batch-1";
+        @Cleanup
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
+                .newConsumer(Schema.BYTES)
+                .topic(topicName)
+                .isAckReceiptEnabled(true)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(subType)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .enableBatching(enableBatch)
+                .topic(topicName)
+                .batchingMaxPublishDelay(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)
+                .create();
+
+        CountDownLatch countDownLatch = new CountDownLatch(messageCount);
+        for (int i = 0; i < messageCount; i++) {
+            producer.sendAsync((i + "").getBytes()).thenAccept(msgId -> {
+                log.info("Published message with msgId: {}", msgId);
+                countDownLatch.countDown();
+            });
+            // To generate batch message with different batch size
+            // 32 total batches: 5 batches with 3 messages, 8 batches with 2 messages and 19 batches with 1 message
+            if (((i / 3) % (i % 3 + 1)) == 0) {
+                producer.flush();
+            }
+        }
+
+        countDownLatch.await();
+
+        for (int i = 0; i < messageCount; i++) {
+            Message<byte[]> message = consumer.receive();
+            if (enableBatch) {
+                // only ack messages which batch index < 2, which means we will not to ack the
+                // whole batch for the batch that with more than 2 messages
+                if (((BatchMessageIdImpl) message.getMessageId()).getBatchIndex() < 2) {
+                    consumer.acknowledgeAsync(message).get();
+                }
+            } else {
+                if (i % 2 == 0) {
+                    consumer.acknowledgeAsync(message).get();
+                }
+            }
+        }
+
+        String topic = TopicName.get(topicName).toString();
+        PersistentSubscription persistentSubscription =  (PersistentSubscription) pulsar.getBrokerService()
+                .getTopic(topic, false).get().get().getSubscription(subscriptionName);
+
+        Awaitility.await().untilAsserted(() -> {
+            if (subType == SubscriptionType.Shared) {
+                if (enableBatch) {
+                    if (conf.isAcknowledgmentAtBatchIndexLevelEnabled()) {
+                        assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 1);
+                    } else {
+                        assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 5 * 3);
+                    }
+                } else {
+                    assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
+                }
+            } else {
+                assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0);
+            }
+        });
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(BatchMessageTest.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 5e09def..c3785c7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -19,28 +19,24 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.collect.Lists;
-import lombok.Cleanup;
 import lombok.SneakyThrows;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
-import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
+
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import static org.testng.Assert.assertEquals;
 
 @Test(groups = "broker")
@@ -111,66 +107,4 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
             assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
         });
     }
-
-    @DataProvider(name = "testSubTypeAndEnableBatch")
-    public Object[][] testSubTypeAndEnableBatch() {
-        return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE },
-                { SubscriptionType.Failover, Boolean.TRUE },
-                { SubscriptionType.Shared, Boolean.FALSE },
-                { SubscriptionType.Failover, Boolean.FALSE }};
-    }
-
-
-    @Test(dataProvider="testSubTypeAndEnableBatch")
-    private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType,
-                                                             boolean enableBatch) throws Exception {
-
-        final int messageCount = 50;
-        final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
-        final String subscriptionName = "sub-batch-1";
-        @Cleanup
-        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
-                .newConsumer(Schema.BYTES)
-                .topic(topicName)
-                .isAckReceiptEnabled(true)
-                .subscriptionName(subscriptionName)
-                .subscriptionType(subType)
-                .enableBatchIndexAcknowledgment(true)
-                .subscribe();
-
-        @Cleanup
-        Producer<byte[]> producer = pulsarClient
-                .newProducer()
-                .enableBatching(enableBatch)
-                .topic(topicName)
-                .batchingMaxMessages(10)
-                .create();
-
-        CountDownLatch countDownLatch = new CountDownLatch(messageCount);
-        for (int i = 0; i < messageCount; i++) {
-            producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown);
-        }
-
-        countDownLatch.await();
-
-        for (int i = 0; i < messageCount; i++) {
-            Message<byte[]> message = consumer.receive();
-            // wait for receipt
-            if (i < messageCount / 2) {
-                consumer.acknowledgeAsync(message.getMessageId()).get();
-            }
-        }
-
-        String topic = TopicName.get(topicName).toString();
-        PersistentSubscription persistentSubscription =  (PersistentSubscription) pulsar.getBrokerService()
-                .getTopic(topic, false).get().get().getSubscription(subscriptionName);
-
-        Awaitility.await().untilAsserted(() -> {
-            if (subType == SubscriptionType.Shared) {
-                assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
-            } else {
-                assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0);
-            }
-        });
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index fe7a813..d7a0d3d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -85,6 +85,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {
 
     public static final String TENANT = "tnx";
     protected static final String NAMESPACE1 = TENANT + "/ns1";
+    protected ServiceConfiguration conf = new ServiceConfiguration();
 
     public void internalSetup() throws Exception {
         incrementSetupNumber();
@@ -146,7 +147,6 @@ public abstract class TransactionTestBase extends TestRetrySupport {
 
     protected void startBroker() throws Exception {
         for (int i = 0; i < brokerCount; i++) {
-            ServiceConfiguration conf = new ServiceConfiguration();
             conf.setClusterName(CLUSTER_NAME);
             conf.setAdvertisedAddress("localhost");
             conf.setManagedLedgerCacheSizeMB(8);
@@ -156,7 +156,6 @@ public abstract class TransactionTestBase extends TestRetrySupport {
             conf.setConfigurationStoreServers("localhost:3181");
             conf.setAllowAutoTopicCreationType("non-partitioned");
             conf.setBookkeeperClientExposeStatsToPrometheus(true);
-            conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
 
             conf.setBrokerShutdownTimeoutMs(0L);
             conf.setBrokerServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
index bc22473..da2a3a9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java
@@ -62,6 +62,7 @@ public class PendingAckInMemoryDeleteTest extends TransactionTestBase {
     private static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
         setUpBase(1, NUM_PARTITIONS, NAMESPACE1 +"/test", 0);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 1f2bd06..b4f43f8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -83,12 +83,13 @@ import org.testng.annotations.Test;
 @Test(groups = "flaky")
 public class TransactionEndToEndTest extends TransactionTestBase {
 
-    private static final int TOPIC_PARTITION = 3;
-    private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
-    private static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
-    private static final int NUM_PARTITIONS = 16;
+    protected static final int TOPIC_PARTITION = 3;
+    protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
+    protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 + "/message-ack-test";
+    protected static final int NUM_PARTITIONS = 16;
     @BeforeMethod
     protected void setup() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
         setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
         admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
     }
@@ -323,7 +324,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         txnAckTest(true, 200, SubscriptionType.Failover);
     }
 
-    private void txnAckTest(boolean batchEnable, int maxBatchSize,
+    protected void txnAckTest(boolean batchEnable, int maxBatchSize,
                          SubscriptionType subscriptionType) throws Exception {
         String normalTopic = NAMESPACE1 + "/normal-topic";
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java
new file mode 100644
index 0000000..1ef3998
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndWithoutBatchIndexAckTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * End to end transaction test.
+ */
+@Slf4j
+@Test(groups = "flaky")
+public class TransactionEndToEndWithoutBatchIndexAckTest extends TransactionEndToEndTest {
+
+    @BeforeMethod
+    protected void setup() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(false);
+        setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
+        admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
+    }
+
+    // TODO: fix using a transaction with individual ack for a failover subscription
+    @Test
+    public void txnIndividualAckTestBatchAndFailoverSub() throws Exception {
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+        txnAckTest(true, 200, SubscriptionType.Failover);
+    }
+}

[pulsar] 06/13: [Issue 14105] Avoid creating any topics in `NamespaceService#checkTopicExists` during topic lookup. (#14134)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit befdb84fb9466e5b406cc77ea69d7467dbd54a78
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Tue Feb 8 23:20:48 2022 +0800

    [Issue 14105] Avoid creating any topics in `NamespaceService#checkTopicExists` during topic lookup. (#14134)
    
    (cherry picked from commit 6d03880aabd11e614a29148859aa71678376c7aa)
---
 .../pulsar/broker/namespace/NamespaceService.java  | 23 +++++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 3c31f71..058354c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.broker.web.PulsarWebResource;
@@ -1121,9 +1122,25 @@ public class NamespaceService implements AutoCloseable {
         if (topic.isPersistent()) {
             return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
         } else {
-            return pulsar.getBrokerService()
-                    .getTopicIfExists(topic.toString())
-                    .thenApply(optTopic -> optTopic.isPresent());
+            if (topic.isPartitioned()) {
+                final TopicName partitionedTopicName = TopicName.get(topic.getPartitionedTopicName());
+                return pulsar.getBrokerService()
+                        .fetchPartitionedTopicMetadataAsync(partitionedTopicName)
+                        .thenApply((metadata) -> topic.getPartitionIndex() < metadata.partitions);
+            } else {
+                // Only check; do not create or load any topic.
+                CompletableFuture<Optional<Topic>> topicFuture =
+                        pulsar.getBrokerService().getTopics().get(topic.toString());
+                if (topicFuture == null) {
+                    return CompletableFuture.completedFuture(false);
+                } else {
+                    return topicFuture.thenApply(Optional::isPresent).exceptionally(throwable -> {
+                        LOG.warn("[{}] topicFuture completed with exception when checkTopicExists, {}",
+                                topic, throwable.getMessage());
+                        return false;
+                    });
+                }
+            }
         }
     }
 

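Note on the change above: for a non-persistent, non-partitioned topic, the check now only reads the broker's map of already-loaded topics and never triggers a load. Below is a minimal standalone sketch of that read-only pattern, not the broker code. The class ReadOnlyTopicLookupSketch, its String payload, and the register/size helpers are hypothetical stand-ins; only java.util.concurrent is used.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ReadOnlyTopicLookupSketch {

    // Hypothetical stand-in for the broker's map of loaded topics.
    private final ConcurrentHashMap<String, CompletableFuture<Optional<String>>> topics =
            new ConcurrentHashMap<>();

    void register(String name, CompletableFuture<Optional<String>> future) {
        topics.put(name, future);
    }

    int size() {
        return topics.size();
    }

    // Reports whether a topic is already loaded, without inserting anything into the map.
    CompletableFuture<Boolean> exists(String name) {
        CompletableFuture<Optional<String>> future = topics.get(name);
        if (future == null) {
            // Nothing registered: answer directly instead of starting a load.
            return CompletableFuture.completedFuture(false);
        }
        // A load that failed is reported as "does not exist".
        return future.thenApply(Optional::isPresent).exceptionally(t -> false);
    }

    public static void main(String[] args) {
        ReadOnlyTopicLookupSketch lookup = new ReadOnlyTopicLookupSketch();
        System.out.println("missing exists: " + lookup.exists("missing").join());
        System.out.println("entries after lookup: " + lookup.size());

        lookup.register("loaded", CompletableFuture.completedFuture(Optional.of("topic")));
        System.out.println("loaded exists: " + lookup.exists("loaded").join());

        CompletableFuture<Optional<String>> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("load failed"));
        lookup.register("failed", failed);
        System.out.println("failed exists: " + lookup.exists("failed").join());
    }
}
```

The point of using a plain get() rather than a compute-style call is that a lookup for an unknown name leaves the map unchanged, which is the property the commit title asks for.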
[pulsar] 04/13: Fixed deadlock on txn semaphore permit exhaustion (#14131)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d78e315c01056ab1c3886b19ca3029c2ccaed89a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Feb 6 23:54:54 2022 -0800

    Fixed deadlock on txn semaphore permit exhaustion (#14131)
    
    ### Motivation
    
    Remove the semaphore from end-of-transaction operations. The semaphore is not very useful here because the transactions are already being closed (backpressure should ultimately be applied when transactions are started).
    
    The semaphore is acquired from a BK callback thread, which deadlocks the broker once the semaphore is full: the responses that would release its permits arrive either on the same thread or on a thread blocked in the same way.
    
    ```
    sun.misc.Unsafe.park(Unsafe.java)
    java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
    org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.canSendRequest(TransactionBufferHandlerImpl.java:216)
    org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.endTxnOnTopic(TransactionBufferHandlerImpl.java:93)
    org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl.commitTxnOnTopic(TransactionBufferClientImpl.java:50)
    org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$null$23(TransactionMetadataStoreService.java:484)
    org.apache.pulsar.broker.TransactionMetadataStoreService$$Lambda$1253.accept()
    java.util.ArrayList.forEach(ArrayList.java:1257)
    org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$endTxnInTransactionBuffer$25(TransactionMetadataStoreService.java:481)
    org.apache.pulsar.broker.TransactionMetadataStoreService$$Lambda$1251.accept()
    java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
    java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
    org.apache.pulsar.broker.TransactionMetadataStoreService.endTxnInTransactionBuffer(TransactionMetadataStoreService.java:458)
    org.apache.pulsar.broker.TransactionMetadataStoreService.lambda$null$11(TransactionMetadataStoreService.java:349)
    org.apache.pulsar.broker.TransactionMetadataStoreService$$Lambda$1309.accept()
    java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
    java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
    java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
    org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl$3.addComplete(MLTransactionLogImpl.java:160)
    org.apache.bookkeeper.mledger.impl.OpAddEntry.safeRun(OpAddEntry.java:228)
    org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:748)
    ```
    
    (cherry picked from commit bea5bb875c60173e5d365655617185b0df783051)
---
 .../buffer/impl/TransactionBufferHandlerImpl.java  | 32 ----------------------
 .../buffer/TransactionBufferClientTest.java        | 11 --------
 2 files changed, 43 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
index cb74bf3..552ee27 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java
@@ -28,14 +28,12 @@ import io.netty.util.ReferenceCountUtil;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
-import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -52,8 +50,6 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
     private final AtomicLong requestIdGenerator = new AtomicLong();
     private final long operationTimeoutInMills;
     private final HashedWheelTimer timer;
-    private final Semaphore semaphore;
-    private final boolean blockIfReachMaxPendingOps;
     private final PulsarClient pulsarClient;
 
     private final LoadingCache<String, CompletableFuture<ClientCnx>> cache = CacheBuilder.newBuilder()
@@ -77,8 +73,6 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
         this.pulsarClient = pulsarClient;
         this.pendingRequests = new ConcurrentSkipListMap<>();
         this.operationTimeoutInMills = 3000L;
-        this.semaphore = new Semaphore(10000);
-        this.blockIfReachMaxPendingOps = true;
         this.timer = timer;
     }
 
@@ -90,9 +84,6 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
                     topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue());
         }
         CompletableFuture<TxnID> cb = new CompletableFuture<>();
-        if (!canSendRequest(cb)) {
-            return cb;
-        }
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, action, lowWaterMark);
@@ -108,9 +99,6 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
                     topic, new TxnID(txnIdMostBits, txnIdLeastBits), action.getValue());
         }
         CompletableFuture<TxnID> cb = new CompletableFuture<>();
-        if (!canSendRequest(cb)) {
-            return cb;
-        }
         long requestId = requestIdGenerator.getAndIncrement();
         ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits,
                 topic, subscription, action, lowWaterMark);
@@ -210,29 +198,9 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
         onResponse(op);
     }
 
-    private boolean canSendRequest(CompletableFuture<?> callback) {
-        try {
-            if (blockIfReachMaxPendingOps) {
-                semaphore.acquire();
-            } else {
-                if (!semaphore.tryAcquire()) {
-                    callback.completeExceptionally(new ReachMaxPendingOpsException("Reach max pending ops."));
-                    return false;
-                }
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            callback.completeExceptionally(TransactionBufferClientException.unwrap(e));
-            return false;
-        }
-        return true;
-    }
-
-
     void onResponse(OpRequestSend op) {
         ReferenceCountUtil.safeRelease(op.byteBuf);
         op.recycle();
-        semaphore.release();
     }
 
     private static final class OpRequestSend {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
index 0082d81..c25447b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
 
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -248,16 +247,6 @@ public class TransactionBufferClientTest extends TransactionTestBase {
 
     @Test
     public void testTransactionBufferHandlerSemaphore() throws Exception {
-
-        Field field = TransactionBufferClientImpl.class.getDeclaredField("tbHandler");
-        field.setAccessible(true);
-        TransactionBufferHandlerImpl transactionBufferHandler = (TransactionBufferHandlerImpl) field.get(tbClient);
-
-        field = TransactionBufferHandlerImpl.class.getDeclaredField("semaphore");
-        field.setAccessible(true);
-        field.set(transactionBufferHandler, new Semaphore(2));
-
-
         String topic = "persistent://" + namespace + "/testTransactionBufferHandlerSemaphore";
         String subName = "test";
 

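Note on the deadlock described in the commit message above: the failure mode can be reproduced outside Pulsar with a single-threaded executor standing in for the callback thread. This is a minimal sketch under that assumption, not the broker code. The class CallbackSemaphoreDeadlockSketch and its method are hypothetical, and a bounded tryAcquire replaces the blocking acquire() so the example terminates instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class CallbackSemaphoreDeadlockSketch {

    // Returns true if the callback thread obtained a permit, false if it starved.
    static boolean acquireFromCallbackThread(long waitMillis) throws Exception {
        // One thread plays the role of the callback thread (assumption of this sketch).
        ExecutorService callbackThread = Executors.newSingleThreadExecutor();
        // Zero permits: every permit is already held by an in-flight request.
        Semaphore permits = new Semaphore(0);
        try {
            Future<Boolean> acquired = callbackThread.submit(() -> {
                // The response that would release a permit is queued on the same
                // thread, so it cannot run until this task returns.
                callbackThread.execute(() -> permits.release());
                // A plain acquire() would park here forever; the bounded wait makes
                // the starvation observable.
                return permits.tryAcquire(waitMillis, TimeUnit.MILLISECONDS);
            });
            return acquired.get();
        } finally {
            callbackThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("permit acquired: " + acquireFromCallbackThread(200));
    }
}
```

The wait always times out because the only task able to release a permit is queued behind the task that is waiting for it. Dropping the acquire from this path, as the commit does, removes the cycle.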
[pulsar] 08/13: [Doc] Fix doc for the wrong default value of `maxPendingChunkedMessage` (#14144)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fafdb6d5cff00201d61ce07ee47b9ef2b319ac48
Author: Zike Yang <zk...@streamnative.io>
AuthorDate: Tue Feb 8 15:11:30 2022 +0800

    [Doc] Fix doc for the wrong default value of `maxPendingChunkedMessage` (#14144)
    
    (cherry picked from commit ef405ceac89cf23077ae01f8a795faaa61422a88)
---
 .../src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 3c3ce17..4b822a0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -677,7 +677,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * the outstanding unchunked-messages by silently acking or asking broker to redeliver later by marking it unacked.
      * This behavior can be controlled by configuration: @autoAckOldestChunkedMessageOnQueueFull
      *
-     * @default 100
+     * The default value is 10.
      *
      * @param maxPendingChuckedMessage
      * @return
@@ -702,7 +702,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * the outstanding unchunked-messages by silently acking or asking broker to redeliver later by marking it unacked.
      * This behavior can be controlled by configuration: @autoAckOldestChunkedMessageOnQueueFull
      *
-     * @default 100
+     * The default value is 10.
      *
      * @param maxPendingChunkedMessage
      * @return

[pulsar] 07/13: fix consume failure when BatchReceivePolicy#maxNumBytes < message size (#14139)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 12ee1a1d6e01e69dda7ce55ea3e304bb78abd6bf
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Tue Feb 8 23:21:22 2022 +0800

    fix consume failure when BatchReceivePolicy#maxNumBytes < message size (#14139)
    
    (cherry picked from commit 88fc8445213650f3ab8eb4e3a8cc6fbe24545d07)
---
 .../client/api/ConsumerBatchReceiveTest.java       | 22 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/MessagesImpl.java    |  4 ++++
 2 files changed, 26 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 5522dd7..1473f28 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -403,6 +403,28 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(consumer, batchReceivePolicy, messagesToSend / muxNumMessages);
     }
 
+    @Test
+    public void verifyNumBytesSmallerThanMessageSize() throws Exception {
+        final int messagesToSend = 500;
+
+        final String topic = "persistent://my-property/my-ns/batch-receive-" + UUID.randomUUID();
+        BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumBytes(10).build();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("s2")
+                .batchReceivePolicy(batchReceivePolicy)
+                .subscribe();
+
+        sendMessagesAsyncAndWait(producer, messagesToSend);
+        CountDownLatch latch = new CountDownLatch(messagesToSend+1);
+        receiveAsync(consumer, messagesToSend, latch);
+        latch.await();
+    }
+
 
     private void receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> consumer,
                                                                             BatchReceivePolicy batchReceivePolicy,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index 4ff23eb..bb0335b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -45,6 +45,10 @@ public class MessagesImpl<T> implements Messages<T> {
     }
 
     protected boolean canAdd(Message<T> message) {
+        if (currentNumberOfMessages == 0) {
+            // It's ok to add at least one message into a batch.
+            return true;
+        }
         if (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages) {
             return false;
         }

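Note on the canAdd change above: an empty batch now accepts its first message even when that message alone exceeds the byte limit, so a consumer can no longer stall on a message larger than maxNumBytes. The sketch below illustrates the rule in isolation. The class BatchAdmissionSketch and its field names are hypothetical, and the byte-limit check is an assumption modelled on the message-count check visible in the diff.

```java
class BatchAdmissionSketch {

    private final int maxMessages;  // values <= 0 mean "no limit"
    private final long maxBytes;    // values <= 0 mean "no limit"
    private int currentMessages;
    private long currentBytes;

    BatchAdmissionSketch(int maxMessages, long maxBytes) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
    }

    boolean canAdd(int messageSize) {
        if (currentMessages == 0) {
            // An empty batch always takes one message, whatever its size.
            return true;
        }
        if (maxMessages > 0 && currentMessages + 1 > maxMessages) {
            return false;
        }
        // Assumed byte-limit check, mirroring the count check above.
        return !(maxBytes > 0 && currentBytes + messageSize > maxBytes);
    }

    void add(int messageSize) {
        currentMessages++;
        currentBytes += messageSize;
    }

    public static void main(String[] args) {
        BatchAdmissionSketch batch = new BatchAdmissionSketch(0, 10);
        System.out.println(batch.canAdd(100)); // prints "true": first message is always admitted
        batch.add(100);
        System.out.println(batch.canAdd(1));   // prints "false": the byte limit is already exceeded
    }
}
```

Without the first check, a 100-byte message could never enter a batch capped at 10 bytes, and the receive call would wait indefinitely.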
[pulsar] 03/13: Updated dependencies to get rid of pulsar-io/jdbc related CVE-2020-13692 (#13753)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e2c94b8976ecaf6582684cde1584a81c80ef7abd
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Sun Jan 16 09:48:59 2022 -0800

    Updated dependencies to get rid of pulsar-io/jdbc related CVE-2020-13692 (#13753)
    
    * Updated dependencies to get rid of pulsar-io/jdbc related CVE-2020-13692
    
    Also upgraded clickhouse lib and suppressed wrongly detected clickhouse
    CVEs (client lib matched to server CVEs)
    
    * CR feedback
    
    (cherry picked from commit 8214da86b2bd2213a7d97e1d174e8d4e53c1b669)
---
 pom.xml                                     |  4 +-
 src/owasp-dependency-check-suppressions.xml | 74 ++++++++++++++++++++++++++++-
 2 files changed, 75 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index 41ef32b..76cf36d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,8 +148,8 @@ flexible messaging model and an intuitive client API.</description>
     <jclouds.version>2.3.0</jclouds.version>
     <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
     <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
-    <postgresql-jdbc.version>42.2.12</postgresql-jdbc.version>
-    <clickhouse-jdbc.version>0.2.4</clickhouse-jdbc.version>
+    <postgresql-jdbc.version>42.2.24</postgresql-jdbc.version>
+    <clickhouse-jdbc.version>0.3.2</clickhouse-jdbc.version>
     <mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
     <hdfs-offload-version3>3.3.0</hdfs-offload-version3>
     <elasticsearch.version>7.9.1</elasticsearch.version>
diff --git a/src/owasp-dependency-check-suppressions.xml b/src/owasp-dependency-check-suppressions.xml
index 139365d..838e142 100644
--- a/src/owasp-dependency-check-suppressions.xml
+++ b/src/owasp-dependency-check-suppressions.xml
@@ -41,4 +41,76 @@
     <gav regex="true">org\.apache\.zookeeper:.*:3\.6\.2</gav>
     <vulnerabilityName regex="true">.*</vulnerabilityName>
   </suppress>
-</suppressions>
\ No newline at end of file
+
+  <!-- clickhouse: security scan matches client lib to the server CVEs -->
+  <suppress>
+    <notes><![CDATA[
+    file name: avro-1.10.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/org\.apache\.avro/avro@.*$</packageUrl>
+    <cve>CVE-2021-43045</cve>
+  </suppress>
+  <suppress>
+    <notes><![CDATA[
+    file name: clickhouse-jdbc-0.3.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/ru\.yandex\.clickhouse/clickhouse\-jdbc@.*$</packageUrl>
+    <cve>CVE-2018-14668</cve>
+  </suppress>
+  <suppress>
+    <notes><![CDATA[
+    file name: clickhouse-jdbc-0.3.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/ru\.yandex\.clickhouse/clickhouse\-jdbc@.*$</packageUrl>
+    <cve>CVE-2018-14669</cve>
+  </suppress>
+  <suppress>
+    <notes><![CDATA[
+    file name: clickhouse-jdbc-0.3.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/ru\.yandex\.clickhouse/clickhouse\-jdbc@.*$</packageUrl>
+    <cve>CVE-2018-14670</cve>
+  </suppress>
+  <suppress>
+    <notes><![CDATA[
+    file name: clickhouse-jdbc-0.3.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/ru\.yandex\.clickhouse/clickhouse\-jdbc@.*$</packageUrl>
+    <cve>CVE-2018-14671</cve>
+  </suppress>
+  <suppress>
+    <notes><![CDATA[
+    file name: clickhouse-jdbc-0.3.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/ru\.yandex\.clickhouse/clickhouse\-jdbc@.*$</packageUrl>
+    <cve>CVE-2018-14672</cve>
+  </suppress>
+  <suppress>
+    <notes><![CDATA[
+    file name: clickhouse-jdbc-0.3.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/ru\.yandex\.clickhouse/clickhouse\-jdbc@.*$</packageUrl>
+    <cve>CVE-2019-15024</cve>
+  </suppress>
+  <suppress>
+    <notes><![CDATA[
+    file name: clickhouse-jdbc-0.3.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/ru\.yandex\.clickhouse/clickhouse\-jdbc@.*$</packageUrl>
+    <cve>CVE-2019-16535</cve>
+  </suppress>
+  <suppress>
+    <notes><![CDATA[
+    file name: clickhouse-jdbc-0.3.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/ru\.yandex\.clickhouse/clickhouse\-jdbc@.*$</packageUrl>
+    <cve>CVE-2019-18657</cve>
+  </suppress> 
+  <suppress>
+    <notes><![CDATA[
+    file name: clickhouse-jdbc-0.3.2.jar
+    ]]></notes>
+    <packageUrl regex="true">^pkg:maven/ru\.yandex\.clickhouse/clickhouse\-jdbc@.*$</packageUrl>
+    <cve>CVE-2021-25263</cve>
+  </suppress>
+</suppressions>

[pulsar] 10/13: [Transaction] Optimize transaction timeout (#14172)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit de5dec4239fbd1c3a595e013a81f262059444489
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Feb 9 15:34:25 2022 +0800

    [Transaction] Optimize transaction timeout (#14172)
    
    (cherry picked from commit 5ee210a5a12573bf8d047962bbac82528091216c)
---
 .../pulsar/broker/transaction/TransactionTest.java | 27 +++++++++++++++++++++-
 .../impl/transaction/TransactionBuilderImpl.java   |  4 +---
 .../client/impl/transaction/TransactionImpl.java   |  6 +++++
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 4aaadb2..b627438 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -34,6 +34,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import io.netty.buffer.Unpooled;
+import io.netty.util.Timeout;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.HashMap;
@@ -774,4 +775,28 @@ public class TransactionTest extends TransactionTestBase {
                 }));
         completableFuture.get(5, TimeUnit.SECONDS);
     }
-}
+
+    @Test
+    public void testCancelTxnTimeout() throws Exception{
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        transaction.commit().get();
+
+        Field field = TransactionImpl.class.getDeclaredField("timeout");
+        field.setAccessible(true);
+        Timeout timeout = (Timeout) field.get(transaction);
+        Assert.assertTrue(timeout.isCancelled());
+
+        transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        transaction.abort().get();
+        timeout = (Timeout) field.get(transaction);
+        Assert.assertTrue(timeout.isCancelled());
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
index 3ac8676..9878264 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionBuilderImpl.java
@@ -69,10 +69,8 @@ public class TransactionBuilderImpl implements TransactionBuilder {
                         future.completeExceptionally(throwable);
                         return;
                     }
-                    TransactionImpl transaction = new TransactionImpl(client, txnTimeout,
+                    TransactionImpl transaction = new TransactionImpl(client, timeUnit.toMillis(txnTimeout),
                             txnID.getLeastSigBits(), txnID.getMostSigBits());
-                    client.getTimer().newTimeout(transaction,
-                            txnTimeout, timeUnit);
                     future.complete(transaction);
                 });
         return future;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index ebcb20e..8adc162 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.collect.Lists;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -68,6 +69,7 @@ public class TransactionImpl implements Transaction , TimerTask {
     private volatile State state;
     private static final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
         AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state");
+    private final Timeout timeout;
 
     @Override
     public void run(Timeout timeout) throws Exception {
@@ -100,6 +102,8 @@ public class TransactionImpl implements Transaction , TimerTask {
 
         this.sendFutureList = new ArrayList<>();
         this.ackFutureList = new ArrayList<>();
+        this.timeout = client.getTimer().newTimeout(this, transactionTimeoutMs, TimeUnit.MILLISECONDS);
+
     }
 
     // register the topics that will be modified by this transaction
@@ -164,6 +168,7 @@ public class TransactionImpl implements Transaction , TimerTask {
         return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
+            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
@@ -192,6 +197,7 @@ public class TransactionImpl implements Transaction , TimerTask {
         return checkIfOpenOrAborting().thenCompose(value -> {
             CompletableFuture<Void> abortFuture = new CompletableFuture<>();
             this.state = State.ABORTING;
+            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     log.error(e.getMessage());
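
Note on the change above: the diff moves timer registration into the TransactionImpl constructor, keeps the returned Timeout handle in a field, and cancels it in commit() and abort(). The sketch below illustrates the same pattern under stated assumptions. It uses the JDK's ScheduledExecutorService in place of the Netty timer so it runs without extra dependencies. SketchTxn and cancelledAfterCommit are hypothetical names for illustration only and are not part of Pulsar.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutCancelSketch {

    // Hypothetical stand-in for a transaction; this is not Pulsar's TransactionImpl.
    static final class SketchTxn {
        private final AtomicBoolean timedOut = new AtomicBoolean(false);
        private final ScheduledFuture<?> timeout;

        SketchTxn(ScheduledExecutorService timer, long timeoutMs) {
            // Schedule the timeout in the constructor and keep the handle,
            // so commit() and abort() can cancel it later.
            this.timeout = timer.schedule(() -> timedOut.set(true), timeoutMs, TimeUnit.MILLISECONDS);
        }

        void commit() {
            // The transaction is ending, so the pending timeout task is no longer needed.
            timeout.cancel(false);
        }

        void abort() {
            timeout.cancel(false);
        }

        boolean isTimeoutCancelled() {
            return timeout.isCancelled();
        }
    }

    // Creates a transaction with a 10 second timeout, commits it immediately,
    // and reports whether the pending timeout task was cancelled.
    static boolean cancelledAfterCommit() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            SketchTxn txn = new SketchTxn(timer, 10_000);
            txn.commit();
            return txn.isTimeoutCancelled();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelledAfterCommit());
    }
}
```

Without the stored handle, a task scheduled for an already finished transaction would stay registered with the timer until its deadline passes, which appears to be what the change avoids.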

[pulsar] 11/13: Add test to ensure correct zk children cache invalidation (#14178)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8a360da29601a849ca15c00d361684e4ccfc7589
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Wed Feb 9 01:41:24 2022 -0600

    Add test to ensure correct zk children cache invalidation (#14178)
    
    (cherry picked from commit 5886327b721b5af265d6a83d06b8a721ecde54d5)
---
 .../MultiBrokerMetadataConsistencyTest.java        | 82 ++++++++++++++++++++++
 1 file changed, 82 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/MultiBrokerMetadataConsistencyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/MultiBrokerMetadataConsistencyTest.java
new file mode 100644
index 0000000..15849a6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/MultiBrokerMetadataConsistencyTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.zookeeper;
+
+import static org.testng.Assert.assertTrue;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.metadata.TestZKServer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class MultiBrokerMetadataConsistencyTest extends MultiBrokerBaseTest {
+    @Override
+    protected int numberOfAdditionalBrokers() {
+        return 2;
+    }
+
+    TestZKServer testZKServer;
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        testZKServer = new TestZKServer();
+    }
+
+    @Override
+    protected void onCleanup() {
+        super.onCleanup();
+        if (testZKServer != null) {
+            try {
+                testZKServer.close();
+            } catch (Exception e) {
+                log.error("Error in stopping ZK server", e);
+            }
+        }
+    }
+
+    @Override
+    protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
+        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+    }
+
+    @Override
+    protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
+        return MetadataStoreExtended.create(testZKServer.getConnectionString(), MetadataStoreConfig.builder().build());
+    }
+
+    @Test
+    public void newTopicShouldBeInTopicsList() throws PulsarAdminException {
+        List<PulsarAdmin> admins = getAllAdmins();
+        PulsarAdmin first = admins.get(0);
+        PulsarAdmin second = admins.get(1);
+        List<String> cacheMiss = second.topics().getList("public/default");
+        assertTrue(cacheMiss.isEmpty());
+        first.topics().createNonPartitionedTopic("persistent://public/default/my-topic");
+        List<String> topics = second.topics().getList("public/default");
+        assertTrue(topics.contains("persistent://public/default/my-topic"));
+    }
+}

[pulsar] 01/13: [C++] Fix pulsar client cpp build fail in gcc-4.8.5 (#14053)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 96f6f7b5d2c4011d7f1ba212d137fd5165474208
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Mon Jan 31 23:56:39 2022 +0800

    [C++] Fix pulsar client cpp build fail in gcc-4.8.5 (#14053)
    
    * Fix pulsar client cpp build fail in gcc-4.8.5
    
    * Enable test
    
    * Speedup make speed
    
    * Add a new line at the tail
    
    Add a new line at the tail of a file
    
    (cherry picked from commit caf5acf0d5e1971e4a9958da582b905793f75f1f)
---
 pulsar-client-cpp/docker-build-centos7.sh    | 2 +-
 pulsar-client-cpp/docker/centos-7/Dockerfile | 7 +++++++
 pulsar-client-cpp/tests/ConsumerTest.cc      | 2 +-
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/docker-build-centos7.sh b/pulsar-client-cpp/docker-build-centos7.sh
index 5ceeca9..e97e374 100755
--- a/pulsar-client-cpp/docker-build-centos7.sh
+++ b/pulsar-client-cpp/docker-build-centos7.sh
@@ -32,7 +32,7 @@ cd -
 
 VOLUME_OPTION=${VOLUME_OPTION:-"-v $ROOT_DIR:/pulsar"}
 COMMAND="cd /pulsar/pulsar-client-cpp && mkdir -p _builds && cd _builds &&
- /opt/cmake/cmake-3.4.0-Linux-x86_64/bin/cmake .. -DBUILD_PYTHON_WRAPPER=OFF -DBUILD_TESTS=OFF && make"
+ /opt/cmake/cmake-3.4.0-Linux-x86_64/bin/cmake .. -DBUILD_PYTHON_WRAPPER=OFF -DBUILD_TESTS=ON && make -j8"
 
 DOCKER_CMD="docker run -i ${VOLUME_OPTION} ${IMAGE}"
 
diff --git a/pulsar-client-cpp/docker/centos-7/Dockerfile b/pulsar-client-cpp/docker/centos-7/Dockerfile
index 53ce9bd..690e8f1 100644
--- a/pulsar-client-cpp/docker/centos-7/Dockerfile
+++ b/pulsar-client-cpp/docker/centos-7/Dockerfile
@@ -35,3 +35,10 @@ RUN mkdir -p /opt/cmake
 WORKDIR /opt/cmake
 RUN curl -L -O https://cmake.org/files/v3.4/cmake-3.4.0-Linux-x86_64.tar.gz \
   && tar zxf cmake-3.4.0-Linux-x86_64.tar.gz
+
+# googletest
+RUN curl -O -L https://github.com/google/googletest/archive/refs/tags/release-1.10.0.tar.gz \
+  && tar zxf release-1.10.0.tar.gz \
+  && cd googletest-release-1.10.0 \
+  && mkdir build && cd build \
+  && /opt/cmake/cmake-3.4.0-Linux-x86_64/bin/cmake .. && make install
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index 2747b2d..100086e 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -660,7 +660,7 @@ TEST(ConsumerTest, testGetTopicNameFromReceivedMessage) {
 
     // 2. MultiTopicsConsumerImpl
     Consumer consumer2;
-    ASSERT_EQ(ResultOk, client.subscribe({topic1, topic2}, "sub-2", consumer2));
+    ASSERT_EQ(ResultOk, client.subscribe(std::vector<std::string>{topic1, topic2}, "sub-2", consumer2));
 
     sendMessage(topic1, true);
     validateTopicName(consumer1, topic1);