You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rg...@apache.org on 2022/03/01 16:24:09 UTC

[pulsar] branch branch-2.9 updated (7792fac -> d87a230)

This is an automated email from the ASF dual-hosted git repository.

rgao pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 7792fac  [pulsar-io] pass client builder if no service url provided to debezium connector  (#12145) (#14041)
     new 1e17289  [security] Upgrade Postgre driver to 42.2.25 to get rid of CVE-2022-21724 (#14119)
     new b70f6cd  [Transaction] Optimize testEndTBRecoveringWhenManagerLedgerDisReadable (#14303)
     new 8279db9  Fix the wrong parameter in the log. (#14309)
     new c16083e  [Broker] Change broker producer fence log level (#14196)
     new 0e438a9  [Flaky-Test] BacklogQuotaManagerTest#testProducerExceptionAndThenUnblockSizeQuota (#14213)
     new d6867b4  [Websocket] Fix ``ClassCastException`` when user create ``MultiTopicReader``. (#14316)
     new 44408bf  Fix adding message to list potential issue (#14377)
     new e583b05  [Broker] waitingCursors potential  heap memory leak  (#13939)
     new 718a95b  Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)
     new 3101b66  [C++] Fix GCC compilation failure caused by warning macro (#14402)
     new 982a11f  [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)
     new 45c74fb  [Transaction] Adopt single thread pool in TC (#14238)
     new 6094879  Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14433)
     new 86fa496  [Pulsar SQL] Fix PulsarRecordCursor deserialize issue. (#14379)
     new a5a9991  [Transaction] Adopt Single_thread to handle TcClient connecting (#13969)
     new 3089aa4  [Transaction] Fix end transaction at state of timeout (#14370)
     new 2bc8f48  Fix can't read the latest message of the compacted topic (#14449)
     new 4e79741  Validate rack name (#14336)
     new 56f1660  [pulsar-broker] Fix avg-messagePerEntry metrics for consumer (#14330)
     new 5047498  fix npe in ManagedLedgerImpl (#14481)
     new d87a230  [Broker] Fix ``Future.join()`` causing deadlock. (#14469)

The 21 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/bookkeeper/mledger/ManagedCursor.java   |   6 +-
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   7 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  11 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   8 +-
 .../mledger/impl/ManagedCursorContainerTest.java   |   3 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |   6 +-
 .../mledger/impl/NonDurableCursorTest.java         |   2 +-
 pom.xml                                            |   3 +-
 .../org/apache/pulsar/broker/PulsarService.java    |   5 +
 .../broker/TransactionMetadataStoreService.java    | 152 +++++-----
 .../org/apache/pulsar/broker/admin/v2/Bookies.java |  16 +
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  11 +-
 .../SystemTopicBasedTopicPoliciesService.java      |   2 +-
 .../service/persistent/PersistentSubscription.java |  11 +-
 .../broker/service/persistent/PersistentTopic.java |  78 ++---
 .../buffer/impl/TopicTransactionBuffer.java        |   1 -
 .../apache/pulsar/broker/admin/BookiesApiTest.java |  58 +++-
 .../broker/admin/CreateSubscriptionTest.java       |  22 ++
 .../broker/service/BacklogQuotaManagerTest.java    |   2 +
 .../broker/service/PersistentTopicE2ETest.java     |  22 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   3 +-
 .../pulsar/broker/transaction/TransactionTest.java |  26 ++
 .../pulsar/compaction/CompactedTopicTest.java      |  50 ++++
 pulsar-client-cpp/CMakeLists.txt                   |   2 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  86 +++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  13 +-
 .../pulsar/client/impl/schema/AbstractSchema.java  |   7 +-
 .../client/impl/transaction/TransactionImpl.java   |   4 +-
 .../org/apache/pulsar/client/impl/MessageTest.java |  13 +-
 pulsar-io/debezium/postgres/pom.xml                |   7 +
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 202 +++++++------
 .../impl/MLTransactionMetadataStore.java           | 328 ++++++++++++---------
 .../org/apache/pulsar/websocket/ReaderHandler.java |  10 +-
 .../apache/pulsar/websocket/ReaderHandlerTest.java | 214 ++++++++++++++
 .../offload/jcloud/impl/MockManagedLedger.java     |   5 +
 36 files changed, 991 insertions(+), 407 deletions(-)
 create mode 100644 pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java

[pulsar] 11/21: [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 982a11fe26be5adb8e1f368fb6736a95c3eadadf
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Feb 24 21:54:17 2022 +0800

    [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)
    
    (cherry picked from commit 0a9fd913528181951fd6ad97d3ba07e11e77cd70)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  1 -
 .../pulsar/broker/transaction/TransactionTest.java | 25 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 89c77d6..e0a6695 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -474,7 +474,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
                 maxReadPosition = position;
-                changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
                     maxReadPosition = position;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index b882d26..28d4fcf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -800,4 +801,28 @@ public class TransactionTest extends TransactionTestBase {
         timeout = (Timeout) field.get(transaction);
         Assert.assertTrue(timeout.isCancelled());
     }
+
+    @Test
+    public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService()
+                .getTopic(NAMESPACE1 + "/test", true)
+                .get().get();
+        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+        Field field = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
+        field.setAccessible(true);
+        AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) field.get(buffer);
+        Field field1 = TopicTransactionBufferState.class.getDeclaredField("state");
+        field1.setAccessible(true);
+
+        Awaitility.await().untilAsserted(() -> {
+                    TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer);
+                    Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot);
+        });
+        Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+
+        buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
+        Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+
+    }
 }
\ No newline at end of file

[pulsar] 09/21: Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 718a95b3f919465500dd15f6eacfca8352c9412c
Author: 萧易客 <km...@live.com>
AuthorDate: Tue Feb 22 17:17:47 2022 +0800

    Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)
    
    If a message reached maxRedeliverCount, it will send to deadLetterTopic, since 2.8.0, this mechanism is broken, it was introduced in #9970
    
    (cherry picked from commit 16beb9d97fdc64092c8f3fe6959d6bf20dd0aa13)
---
 .../apache/pulsar/client/impl/schema/AbstractSchema.java    |  7 +++----
 .../java/org/apache/pulsar/client/impl/MessageTest.java     | 13 ++++++++++++-
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index 8cf7a05..33c2ed1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -75,14 +75,13 @@ public abstract class AbstractSchema<T> implements Schema<T> {
      * @param schemaVersion the version
      * @return the schema at that specific version
      * @throws SchemaSerializationException in case of unknown schema version
-     * @throws NullPointerException in case of null schemaVersion
+     * @throws NullPointerException in case of null schemaVersion and supportSchemaVersioning is true
      */
     public Schema<?> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException {
-        Objects.requireNonNull(schemaVersion);
         if (!supportSchemaVersioning()) {
             return this;
-        } else {
-            throw new SchemaSerializationException("Not implemented for " + this.getClass());
         }
+        Objects.requireNonNull(schemaVersion);
+        throw new SchemaSerializationException("Not implemented for " + this.getClass());
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
index 6d633e7..13cf4f6 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
@@ -22,8 +22,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -81,4 +81,15 @@ public class MessageTest {
         assertFalse(topicMessage.isReplicated());
         assertNull(topicMessage.getReplicatedFrom());
     }
+
+    @Test
+    public void testMessageImplGetReaderSchema() {
+        MessageMetadata builder = new MessageMetadata();
+        builder.hasSchemaVersion();
+        ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+        Message<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);
+
+        Optional<Schema<?>> readerSchema = msg.getReaderSchema();
+        assertTrue(readerSchema.isPresent());
+    }
 }

[pulsar] 21/21: [Broker] Fix ``Future.join()`` causing deadlock. (#14469)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d87a2304fc55828aa1a7a905c2631f594b49b764
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Tue Mar 1 10:41:07 2022 +0800

    [Broker] Fix ``Future.join()`` causing deadlock. (#14469)
    
    Master issue #14438
    
    ### Motivation
    
    Invoking the ``join()`` method in the async method will cause some deadlock.
    
    ### Modifications
    
    - Refactor ``PersistentTopic#tryToDeletePartitionedMetadata`` to pure async.
    
    (cherry picked from commit 65318e83f8d5b4207a9398e100390800425d5433)
---
 .../broker/service/persistent/PersistentTopic.java | 78 ++++++++++++----------
 .../broker/service/PersistentTopicE2ETest.java     | 22 +++++-
 2 files changed, 63 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 00934ad..e96fdc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2278,42 +2278,48 @@ public class PersistentTopic extends AbstractTopic
             return CompletableFuture.completedFuture(null);
         }
         TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
-        try {
-            PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources()
-                    .getNamespaceResources()
-                    .getPartitionedTopicResources();
-            if (topicName.isPartitioned() && !partitionedTopicResources.partitionedTopicExists(topicName)) {
-                return CompletableFuture.completedFuture(null);
-            }
-            CompletableFuture<Void> deleteMetadataFuture = new CompletableFuture<>();
-            getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
-                    .thenAccept((metadata -> {
-                        // make sure all sub partitions were deleted
-                        for (int i = 0; i < metadata.partitions; i++) {
-                            if (brokerService.getPulsar().getPulsarResources().getTopicResources()
-                                    .persistentTopicExists(topicName.getPartition(i)).join()) {
-                                throw new UnsupportedOperationException();
-                            }
-                        }
-                    }))
-                    .thenAccept((res) -> partitionedTopicResources.deletePartitionedTopicAsync(topicName)
-                            .thenAccept((r) -> {
-                        deleteMetadataFuture.complete(null);
-                    }).exceptionally(ex -> {
-                        deleteMetadataFuture.completeExceptionally(ex.getCause());
-                        return null;
-                    }))
-                    .exceptionally((e) -> {
-                        if (!(e.getCause() instanceof UnsupportedOperationException)) {
-                            log.error("delete metadata fail", e);
-                        }
-                        deleteMetadataFuture.complete(null);
-                        return null;
-                    });
-            return deleteMetadataFuture;
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
+        PartitionedTopicResources partitionedTopicResources = getBrokerService().pulsar().getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources();
+        return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
+                .thenCompose(partitionedTopicExist -> {
+                    if (!partitionedTopicExist) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return getBrokerService()
+                                .fetchPartitionedTopicMetadataAsync(topicName)
+                                .thenCompose((metadata -> {
+                                    List<CompletableFuture<Boolean>> persistentTopicExists =
+                                            new ArrayList<>(metadata.partitions);
+                                    for (int i = 0; i < metadata.partitions; i++) {
+                                        persistentTopicExists.add(brokerService.getPulsar()
+                                                .getPulsarResources().getTopicResources()
+                                                .persistentTopicExists(topicName.getPartition(i)));
+                                    }
+                                    List<CompletableFuture<Boolean>> unmodifiablePersistentTopicExists =
+                                            Collections.unmodifiableList(persistentTopicExists);
+                                    return FutureUtil.waitForAll(unmodifiablePersistentTopicExists)
+                                            .thenCompose(unused -> {
+                                                // make sure all sub partitions were deleted after all future complete
+                                                Optional<Boolean> anyExistPartition = unmodifiablePersistentTopicExists
+                                                        .stream()
+                                                        .map(CompletableFuture::join)
+                                                        .filter(topicExist -> topicExist)
+                                                        .findAny();
+                                                if (anyExistPartition.isPresent()) {
+                                                    log.error("[{}] Delete topic metadata failed because"
+                                                            + " another partition exist.", topicName);
+                                                    throw new UnsupportedOperationException(
+                                                            String.format("Another partition exists for [%s].",
+                                                                    topicName));
+                                                } else {
+                                                    return partitionedTopicResources
+                                                            .deletePartitionedTopicAsync(topicName);
+                                                }
+                                            });
+                                }));
+                    }
+                });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 42512b0..daf0ed7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -96,6 +96,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
         super.baseSetup();
     }
 
@@ -617,8 +618,27 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
 
         runGC();
         assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
-    }
 
+        // write again, the topic will be available
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create();
+        producer2.close();
+
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 6. Test for partitioned topic to delete the partitioned metadata
+        String topicGc = "persistent://prop/ns-abc/topic-gc";
+        int partitions = 5;
+        admin.topics().createPartitionedTopic(topicGc, partitions);
+        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create();
+        producer3.close();
+        assertEquals(partitions, pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions);
+        runGC();
+        Awaitility.await().untilAsserted(()-> {
+            assertEquals(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                    TopicName.get(topicGc)).join().partitions, 0);
+        });
+    }
     @Data
     @ToString
     @EqualsAndHashCode

[pulsar] 20/21: fix npe in ManagedLedgerImpl (#14481)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5047498c04a2ef72a2b51188ab3eb39fbc3249df
Author: lin chen <15...@qq.com>
AuthorDate: Mon Feb 28 18:10:21 2022 +0800

    fix npe in ManagedLedgerImpl (#14481)
    
    (cherry picked from commit 3da048c8a6df7404df4e6c3301370feb96c6c14b)
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java    | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8728753..c07b3ce 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3183,9 +3183,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     totalEntriesInCurrentLedger = 0;
                 }
             } else {
-                totalEntriesInCurrentLedger = ledgers.get(currentLedgerId).getEntries();
+                LedgerInfo ledgerInfo = ledgers.get(currentLedgerId);
+                totalEntriesInCurrentLedger = ledgerInfo != null ? ledgerInfo.getEntries() : 0;
             }
 
+
             long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger - currentEntryId;
 
             if (unreadEntriesInCurrentLedger >= entriesToSkip) {

[pulsar] 17/21: Fix can't read the latest message of the compacted topic (#14449)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2bc8f487a71ea812b06d119f24fd7051c82eb6e8
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Feb 25 21:38:47 2022 +0800

    Fix can't read the latest message of the compacted topic (#14449)
    
    If the reader enabled read compacted and all the data of topic has been compacted
    to the compacted ledger, the original topic does not have any data. In this case,
    the reader is not able to read the latest message of the compacted topic.
    
    ```java
    Reader<byte[]> reader = pulsarClient.newReader()
            .topic(topic)
            .startMessageId(MessageId.latest)
            .startMessageIdInclusive()
            .readCompacted(true)
            .create();
    ```
    
    The root cause is if the `startMessageIdInclusive` is true
    and the `startMessageId` is `latest`, the reader will get the
    last message ID from the broker and then seek to the last message.
    But, the seek method did not consider if there are messages in the
    compacted ledger, so not able to seek to last message of the compacted
    ledger.
    
    Add force reset option for the managed cursor, if the seek
    position < compaction horizon, we should force reset the cursor
    to the given position, so that the reader able to start read from
    the compacted ledger.
    
    (cherry picked from commit ddc51924e90550ef50fd3cdd099b6aec56aec260)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |  6 ++-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 ++---
 .../mledger/impl/ManagedCursorContainerTest.java   |  3 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  6 +--
 .../mledger/impl/NonDurableCursorTest.java         |  2 +-
 .../service/persistent/PersistentSubscription.java | 10 ++++-
 .../pulsar/broker/service/PersistentTopicTest.java |  3 +-
 .../pulsar/compaction/CompactedTopicTest.java      | 50 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  6 ++-
 9 files changed, 82 insertions(+), 15 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d1fb90a..f67cd96 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -533,10 +533,14 @@ public interface ManagedCursor {
      *
      * @param position
      *            position to move the cursor to
+     * @param forceReset
+     *            whether to force reset the position even if the position is no longer in the managed ledger,
+     *            this is used by compacted topic which has data in the compacted ledger, to ensure the cursor can
+     *            read data from the compacted ledger.
      * @param callback
      *            callback object
      */
-    void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback);
+    void asyncResetCursor(Position position, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback);
 
     /**
      * Read the specified set of positions from ManagedLedger.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8c345026..8c96f0e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1135,7 +1135,7 @@ public class ManagedCursorImpl implements ManagedCursor {
     }
 
     @Override
-    public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback callback) {
+    public void asyncResetCursor(Position newPos, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback) {
         checkArgument(newPos instanceof PositionImpl);
         final PositionImpl newPosition = (PositionImpl) newPos;
 
@@ -1143,9 +1143,10 @@ public class ManagedCursorImpl implements ManagedCursor {
         ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
             PositionImpl actualPosition = newPosition;
 
-            if (!ledger.isValidPosition(actualPosition) &&
-                !actualPosition.equals(PositionImpl.earliest) &&
-                !actualPosition.equals(PositionImpl.latest)) {
+            if (!ledger.isValidPosition(actualPosition)
+                && !actualPosition.equals(PositionImpl.earliest)
+                && !actualPosition.equals(PositionImpl.latest)
+                && !forceReset) {
                 actualPosition = ledger.getNextValidPosition(actualPosition);
 
                 if (actualPosition == null) {
@@ -1168,7 +1169,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         final Result result = new Result();
         final CountDownLatch counter = new CountDownLatch(1);
 
-        asyncResetCursor(newPos, new AsyncCallbacks.ResetCursorCallback() {
+        asyncResetCursor(newPos, false, new AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 counter.countDown();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index af30c9c..56de803 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -237,7 +237,8 @@ public class ManagedCursorContainerTest {
         }
 
         @Override
-        public void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback) {
+        public void asyncResetCursor(final Position position, boolean forceReset,
+                AsyncCallbacks.ResetCursorCallback callback) {
 
         }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 676e92f..892d6b3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -687,7 +687,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         CountDownLatch countDownLatch = new CountDownLatch(1);
         PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2);
 
-        cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() {
+        cursor.asyncResetCursor(resetPosition, false, new AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 moveStatus.set(true);
@@ -738,7 +738,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
                     final PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(),
                             lastPosition.getEntryId() - (5 * idx));
 
-                    cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() {
+                    cursor.asyncResetCursor(resetPosition, false, new AsyncCallbacks.ResetCursorCallback() {
                         @Override
                         public void resetComplete(Object ctx) {
                             moveStatus.set(true);
@@ -787,7 +787,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
 
         long lastActive = cursor.getLastActive();
 
-        cursor.asyncResetCursor(lastPosition, new AsyncCallbacks.ResetCursorCallback() {
+        cursor.asyncResetCursor(lastPosition, false, new AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 moveStatus.set(true);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 4c2944f..5d0271d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -371,7 +371,7 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         CountDownLatch countDownLatch = new CountDownLatch(1);
         PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2);
 
-        cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() {
+        cursor.asyncResetCursor(resetPosition, false, new AsyncCallbacks.ResetCursorCallback() {
             @Override
             public void resetComplete(Object ctx) {
                 moveStatus.set(true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8637ecd..9e57580 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -659,7 +659,15 @@ public class PersistentSubscription implements Subscription {
                     topicName, subName);
 
             try {
-                cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback() {
+                boolean forceReset = false;
+                if (topic.getCompactedTopic() != null && topic.getCompactedTopic().getCompactionHorizon().isPresent()) {
+                    PositionImpl horizon = (PositionImpl) topic.getCompactedTopic().getCompactionHorizon().get();
+                    PositionImpl resetTo = (PositionImpl) finalPosition;
+                    if (horizon.compareTo(resetTo) >= 0) {
+                        forceReset = true;
+                    }
+                }
+                cursor.asyncResetCursor(finalPosition, forceReset, new AsyncCallbacks.ResetCursorCallback() {
                     @Override
                     public void resetComplete(Object ctx) {
                         if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d94320f..2314acd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.anyString;
@@ -2177,7 +2178,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doAnswer((Answer<Object>) invocationOnMock -> {
             ((AsyncCallbacks.ResetCursorCallback) invocationOnMock.getArguments()[1]).resetComplete(null);
             return null;
-        }).when(mockCursor).asyncResetCursor(any(), any());
+        }).when(mockCursor).asyncResetCursor(any(), anyBoolean(), any());
         doAnswer((Answer<Object>) invocationOnMock -> {
             ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
             return null;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index accce4ef..4ae699b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -734,6 +734,7 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
     }
 
+    @Test
     public void testReadCompleteMessagesDuringTopicUnloading() throws Exception {
         String topic = "persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-" +
                 UUID.randomUUID();
@@ -789,4 +790,53 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
             Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages));
         }
     }
+
+    @Test
+    public void testReadCompactedLatestMessageWithInclusive() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/testLedgerRollover-" +
+                UUID.randomUUID();
+        final int numMessages = 1;
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .enableBatching(false)
+                .create();
+
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().unload(topic);
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, numMessages);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+            Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+        });
+
+        Awaitility.await()
+                .pollInterval(3, TimeUnit.SECONDS)
+                .atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+                    admin.topics().unload(topic);
+                    Assert.assertTrue(admin.topics().getInternalStats(topic).lastConfirmedEntry.endsWith("-1"));
+                });
+
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageIdInclusive()
+                .startMessageId(MessageId.latest)
+                .readCompacted(true)
+                .create();
+
+        Assert.assertTrue(reader.hasMessageAvailable());
+        Assert.assertEquals(reader.readNext().getMessageId(), lastMessage.get());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e5a5745..856a775 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -2012,8 +2012,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     MessageIdImpl markDeletePosition = MessageIdImpl
                             .convertToMessageIdImpl(response.markDeletePosition);
 
-                    if (markDeletePosition != null) {
-                        // we only care about comparing ledger ids and entry ids as mark delete position doesn't have other ids such as batch index
+                    if (markDeletePosition != null && !(markDeletePosition.getEntryId() < 0
+                            && markDeletePosition.getLedgerId() > lastMessageId.getLedgerId())) {
+                        // we only care about comparing ledger ids and entry ids as mark delete position doesn't have
+                        // other ids such as batch index
                         int result = ComparisonChain.start()
                                 .compare(markDeletePosition.getLedgerId(), lastMessageId.getLedgerId())
                                 .compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId())

[pulsar] 06/21: [Websocket] Fix ``ClassCastException`` when user create ``MultiTopicReader``. (#14316)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d6867b44f2747c4fdb1b09e14b2a356931f49ff9
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Thu Feb 17 13:23:03 2022 +0800

    [Websocket] Fix ``ClassCastException`` when user create ``MultiTopicReader``. (#14316)
    
    (cherry picked from commit 7a7cf54b01420aeac855eea91529ea13bd753e52)
---
 .../org/apache/pulsar/websocket/ReaderHandler.java |  10 +-
 .../apache/pulsar/websocket/ReaderHandlerTest.java | 214 +++++++++++++++++++++
 2 files changed, 222 insertions(+), 2 deletions(-)

diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index ef0279d..2b87802 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
 import org.apache.pulsar.client.impl.ReaderImpl;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -103,8 +104,13 @@ public class ReaderHandler extends AbstractWebSocketHandler {
             }
 
             this.reader = builder.create();
-
-            this.subscription = ((ReaderImpl<?>) this.reader).getConsumer().getSubscription();
+            if (reader instanceof MultiTopicsReaderImpl) {
+                this.subscription = ((MultiTopicsReaderImpl<?>) reader).getMultiTopicsConsumer().getSubscription();
+            } else if (reader instanceof ReaderImpl) {
+                this.subscription = ((ReaderImpl<?>) reader).getConsumer().getSubscription();
+            } else {
+                throw new IllegalArgumentException(String.format("Illegal Reader Type %s", reader.getClass()));
+            }
             if (!this.service.addReader(this)) {
                 log.warn("[{}:{}] Failed to add reader handler for topic {}", request.getRemoteAddr(),
                         request.getRemotePort(), topic);
diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
new file mode 100644
index 0000000..0d2a13d
--- /dev/null
+++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ReaderHandlerTest {
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testCreateReaderImp() throws IOException {
+        final String subName = "readerImpSubscription";
+        // mock data
+        WebSocketService wss = mock(WebSocketService.class);
+        PulsarClient mockedClient = mock(PulsarClient.class);
+        when(wss.getPulsarClient()).thenReturn(mockedClient);
+        ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
+        when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.receiverQueueSize(anyInt())).thenReturn(mockedReaderBuilder);
+        ReaderImpl<byte[]> mockedReader = mock(ReaderImpl.class);
+        when(mockedReaderBuilder.create()).thenReturn(mockedReader);
+        ConsumerImpl<byte[]> consumerImp = mock(ConsumerImpl.class);
+        when(consumerImp.getSubscription()).thenReturn(subName);
+        when(mockedReader.getConsumer()).thenReturn(consumerImp);
+        HttpServletRequest request = mock(HttpServletRequest.class);
+        when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic");
+        // create reader handler
+        HttpServletResponse response = spy(HttpServletResponse.class);
+        ServletUpgradeResponse servletUpgradeResponse = new ServletUpgradeResponse(response);
+        ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse);
+        // verify success
+        Assert.assertEquals(readerHandler.getSubscription(), subName);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testCreateMultipleTopicReaderImp() throws IOException {
+        final String subName = "multipleTopicReaderImpSubscription";
+        // mock data
+        WebSocketService wss = mock(WebSocketService.class);
+        PulsarClient mockedClient = mock(PulsarClient.class);
+        when(wss.getPulsarClient()).thenReturn(mockedClient);
+        ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
+        when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.receiverQueueSize(anyInt())).thenReturn(mockedReaderBuilder);
+        MultiTopicsReaderImpl<byte[]> mockedReader = mock(MultiTopicsReaderImpl.class);
+        when(mockedReaderBuilder.create()).thenReturn(mockedReader);
+        MultiTopicsConsumerImpl<byte[]> consumerImp = mock(MultiTopicsConsumerImpl.class);
+        when(consumerImp.getSubscription()).thenReturn(subName);
+        when(mockedReader.getMultiTopicsConsumer()).thenReturn(consumerImp);
+        HttpServletRequest request = mock(HttpServletRequest.class);
+        when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic");
+        // create reader handler
+        HttpServletResponse response = spy(HttpServletResponse.class);
+        ServletUpgradeResponse servletUpgradeResponse = new ServletUpgradeResponse(response);
+        ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse);
+        // verify success
+        Assert.assertEquals(readerHandler.getSubscription(), subName);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testCreateIllegalReaderImp() throws IOException {
+        // mock data
+        WebSocketService wss = mock(WebSocketService.class);
+        PulsarClient mockedClient = mock(PulsarClient.class);
+        when(wss.getPulsarClient()).thenReturn(mockedClient);
+        ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
+        when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
+        when(mockedReaderBuilder.receiverQueueSize(anyInt())).thenReturn(mockedReaderBuilder);
+        IllegalReader illegalReader = new IllegalReader();
+        when(mockedReaderBuilder.create()).thenReturn(illegalReader);
+        HttpServletRequest request = mock(HttpServletRequest.class);
+        when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic");
+        // create reader handler
+        HttpServletResponse response = spy(HttpServletResponse.class);
+        ServletUpgradeResponse servletUpgradeResponse = new ServletUpgradeResponse(response);
+        new ReaderHandler(wss, request, servletUpgradeResponse);
+        // verify get error
+        verify(response, times(1)).sendError(anyInt(), anyString());
+    }
+
+
+    static class IllegalReader implements Reader<byte[]> {
+
+        @Override
+        public String getTopic() {
+            return null;
+        }
+
+        @Override
+        public Message<byte[]> readNext() throws PulsarClientException {
+            return null;
+        }
+
+        @Override
+        public Message<byte[]> readNext(int timeout, TimeUnit unit) throws PulsarClientException {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Message<byte[]>> readNextAsync() {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            return null;
+        }
+
+        @Override
+        public boolean hasReachedEndOfTopic() {
+            return false;
+        }
+
+        @Override
+        public boolean hasMessageAvailable() {
+            return false;
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+            return null;
+        }
+
+        @Override
+        public boolean isConnected() {
+            return false;
+        }
+
+        @Override
+        public void seek(MessageId messageId) throws PulsarClientException {
+
+        }
+
+        @Override
+        public void seek(long timestamp) throws PulsarClientException {
+
+        }
+
+        @Override
+        public void seek(Function<String, Object> function) throws PulsarClientException {
+
+        }
+
+        @Override
+        public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Void> seekAsync(MessageId messageId) {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Void> seekAsync(long timestamp) {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+
+        }
+    }
+}

[pulsar] 07/21: Fix adding message to list potential issue (#14377)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 44408bf3268c36abd5b8040ff7c1fcebd27134c8
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Feb 19 01:38:24 2022 +0800

    Fix adding message to list potential issue (#14377)
    
    (cherry picked from commit b22445f961da5cf2e7baaac4b3847007f4c6ed59)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 9042788..e5a5745 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1771,18 +1771,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
         List<MessageIdData> data = new ArrayList<>(messageIds.size());
-        List<CompletableFuture<Boolean>> futures = new ArrayList<>(messageIds.size());
+        List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
         messageIds.forEach(messageId -> {
             CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
-            futures.add(future);
-            future.thenAccept(sendToDLQ -> {
+            futures.add(future.thenAccept(sendToDLQ -> {
                 if (!sendToDLQ) {
                     data.add(new MessageIdData()
                             .setPartition(messageId.getPartitionIndex())
                             .setLedgerId(messageId.getLedgerId())
                             .setEntryId(messageId.getEntryId()));
                 }
-            });
+            }));
         });
         return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
     }

[pulsar] 04/21: [Broker] Change broker producer fence log level (#14196)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c16083e95a9c3407860a00c7cabe2d0db2708347
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Wed Feb 16 22:57:00 2022 +0800

    [Broker] Change broker producer fence log level (#14196)
    
    (cherry picked from commit 141ea9b6dd3897d492cf468b6ded29ce9f7cf73f)
---
 .../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index beb713c..2ba51d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1335,8 +1335,15 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
             producers.remove(producerId, producerFuture);
         }).exceptionally(ex -> {
-            log.error("[{}] Failed to add producer to topic {}: producerId={}, {}",
-                    remoteAddress, topicName, producerId, ex.getMessage());
+            if (ex.getCause() instanceof BrokerServiceException.ProducerFencedException) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Failed to add producer to topic {}: producerId={}, {}",
+                            remoteAddress, topicName, producerId, ex.getCause().getMessage());
+                }
+            } else {
+                log.warn("[{}] Failed to add producer to topic {}: producerId={}, {}",
+                        remoteAddress, topicName, producerId, ex.getCause().getMessage());
+            }
 
             producer.closeNow(true);
             if (producerFuture.completeExceptionally(ex)) {

[pulsar] 08/21: [Broker] waitingCursors potential heap memory leak (#13939)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e583b055915bcd9b295bbed0d1b99f6d139ec568
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Tue Feb 22 02:18:56 2022 +0800

    [Broker] waitingCursors potential  heap memory leak  (#13939)
    
    (cherry picked from commit 478fd36227c2ede3e1162dd9a4361cffc5dbfceb)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  7 +++++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  4 ++++
 .../service/persistent/PersistentSubscription.java |  1 +
 .../broker/admin/CreateSubscriptionTest.java       | 22 ++++++++++++++++++++++
 .../offload/jcloud/impl/MockManagedLedger.java     |  5 +++++
 5 files changed, 39 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 0200e25..cd39919 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -289,6 +289,13 @@ public interface ManagedLedger {
     void deleteCursor(String name) throws InterruptedException, ManagedLedgerException;
 
     /**
+     * Remove a ManagedCursor from this ManagedLedger's waitingCursors.
+     *
+     * @param cursor the ManagedCursor
+     */
+    void removeWaitingCursor(ManagedCursor cursor);
+
+    /**
      * Open a ManagedCursor asynchronously.
      *
      * @see #openCursor(String)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5288160..8728753 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3415,6 +3415,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    public void removeWaitingCursor(ManagedCursor cursor) {
+        this.waitingCursors.remove(cursor);
+    }
+
     public boolean isCursorActive(ManagedCursor cursor) {
         return activeCursors.get(cursor.getName()) != null;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index f69db68..8637ecd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -299,6 +299,7 @@ public class PersistentSubscription implements Subscription {
 
         if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
             deactivateCursor();
+            topic.getManagedLedger().removeWaitingCursor(cursor);
 
             if (!cursor.isDurable()) {
                 // If cursor is not durable, we need to clean up the subscription as well
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index e0d1720..09f2c91 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -19,15 +19,22 @@
 package org.apache.pulsar.broker.admin;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.core.Response.Status;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -127,4 +134,19 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
                     Lists.newArrayList("sub-1"));
         }
     }
+
+    @Test
+    public void testWaitingCurosrCausedMemoryLeak() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-topic";
+        for (int i = 0; i < 10; i ++) {
+            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                    .subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe();
+            Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected()));
+            consumer.close();
+        }
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger());
+        assertEquals(ml.getWaitingCursorsCount(), 0);
+    }
+
 }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 229cd66..907aba6 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -138,6 +138,11 @@ public class MockManagedLedger implements ManagedLedger {
     }
 
     @Override
+    public void removeWaitingCursor(ManagedCursor cursor) {
+
+    }
+
+    @Override
     public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
 
     }

[pulsar] 05/21: [Flaky-Test] BacklogQuotaManagerTest#testProducerExceptionAndThenUnblockSizeQuota (#14213)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0e438a982ef74328495f0cf6102546f545fa4b58
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Wed Feb 16 22:58:42 2022 +0800

    [Flaky-Test] BacklogQuotaManagerTest#testProducerExceptionAndThenUnblockSizeQuota (#14213)
    
    (cherry picked from commit cd172e7455f90955ac951d713a28c4a580fdf75d)
---
 .../java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java  | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index f531119..21fcffa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -1123,6 +1123,8 @@ public class BacklogQuotaManagerTest {
         }
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
         // publish should work now
+        producer.close();
+        producer = createProducer(client, topic1);
         Exception sendException = null;
         gotException = false;
         try {

[pulsar] 12/21: [Transaction] Adopt single thread pool in TC (#14238)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 45c74fb6b46231acd8f584ca0d45c7f4d6992cc4
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Feb 24 21:57:38 2022 +0800

    [Transaction] Adopt single thread pool in TC (#14238)
    
    ### Motivation
    Optimize code and improve maintainability.
    ### Modification
    * Option 1 (the way I use)
    Create a thread pool at peer TC.
      * advantage
      Each TC has a single thread pool to perform its own tasks, and will not be blocked due to sharing a single  thread with other TCs
      * disadvantage
      Too many thread pools may be created
    * Option 2
    Create an ExecuteProvider in the TC service.  It create some single-threaded pools when the TC Service is created, and  then assign a single-threaded pool to TC when the TC is created
       * The advantages and disadvantages are opposite to the option one
    
    (cherry picked from commit ced57866700aaeae163bcc6670d9a8eb1ffe8c50)
---
 .../impl/MLTransactionMetadataStore.java           | 328 ++++++++++++---------
 1 file changed, 188 insertions(+), 140 deletions(-)

diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index a71d203..f109ec4 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
@@ -69,6 +72,7 @@ public class MLTransactionMetadataStore
     private final LongAdder transactionTimeoutCount;
     private final LongAdder appendLogCount;
     private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
+    private final ExecutorService internalPinnedExecutor;
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
@@ -87,12 +91,16 @@ public class MLTransactionMetadataStore
         this.abortedTransactionCount = new LongAdder();
         this.transactionTimeoutCount = new LongAdder();
         this.appendLogCount = new LongAdder();
+        DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_"
+                + tcID.toString() + "thread_factory");
+        this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
 
         if (!changeToInitializingState()) {
             log.error("Managed ledger transaction metadata store change state error when init it");
             return;
         }
-        new Thread(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
+
+        internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
 
             @Override
             public void replayComplete() {
@@ -125,7 +133,8 @@ public class MLTransactionMetadataStore
                                 long timeoutAt = transactionMetadataEntry.getTimeoutMs();
                                 txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
                                         openTimestamp, timeoutAt), positions));
-                                recoverTracker.handleOpenStatusTransaction(txnSequenceId, timeoutAt + openTimestamp);
+                                recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+                                        timeoutAt + openTimestamp);
                             }
                             break;
                         case ADD_PARTITION:
@@ -174,7 +183,7 @@ public class MLTransactionMetadataStore
                     log.error(e.getMessage(), e);
                 }
             }
-        })).start();
+        }));
     }
 
     @Override
@@ -195,167 +204,206 @@ public class MLTransactionMetadataStore
     }
 
     @Override
-    public synchronized CompletableFuture<TxnID> newTransaction(long timeOut) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException
-                            .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
-        }
+    public CompletableFuture<TxnID> newTransaction(long timeOut) {
+        CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
+                return;
+            }
 
-        long mostSigBits = tcID.getId();
-        long leastSigBits = sequenceIdGenerator.generateSequenceId();
-        TxnID txnID = new TxnID(mostSigBits, leastSigBits);
-        long currentTimeMillis = System.currentTimeMillis();
-        TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                .setTxnidMostBits(mostSigBits)
-                .setTxnidLeastBits(leastSigBits)
-                .setStartTime(currentTimeMillis)
-                .setTimeoutMs(timeOut)
-                .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
-                .setLastModificationTime(currentTimeMillis)
-                .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
-        return transactionLog.append(transactionMetadataEntry)
-                .thenCompose(position -> {
-                    appendLogCount.increment();
-                    TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
-                    List<Position> positions = new ArrayList<>();
-                    positions.add(position);
-                    Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
-                    txnMetaMap.put(leastSigBits, pair);
-                    this.timeoutTracker.addTransaction(leastSigBits, timeOut);
-                    createdTransactionCount.increment();
-                    return CompletableFuture.completedFuture(txnID);
-                });
+            long mostSigBits = tcID.getId();
+            long leastSigBits = sequenceIdGenerator.generateSequenceId();
+            TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+            long currentTimeMillis = System.currentTimeMillis();
+            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                    .setTxnidMostBits(mostSigBits)
+                    .setTxnidLeastBits(leastSigBits)
+                    .setStartTime(currentTimeMillis)
+                    .setTimeoutMs(timeOut)
+                    .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+                    .setLastModificationTime(currentTimeMillis)
+                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+            transactionLog.append(transactionMetadataEntry)
+                    .whenComplete((position, throwable) -> {
+                        if (throwable != null) {
+                            completableFuture.completeExceptionally(throwable);
+                        } else {
+                            appendLogCount.increment();
+                            TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
+                            List<Position> positions = new ArrayList<>();
+                            positions.add(position);
+                            Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
+                            txnMetaMap.put(leastSigBits, pair);
+                            this.timeoutTracker.addTransaction(leastSigBits, timeOut);
+                            createdTransactionCount.increment();
+                            completableFuture.complete(txnID);
+                        }
+                    });
+        });
+        return completableFuture;
     }
 
     @Override
-    public synchronized CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                            State.Ready, getState(), "add produced partition"));
-        }
-        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(txnID.getMostSigBits())
-                    .setTxnidLeastBits(txnID.getLeastSigBits())
-                    .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
-                    .addAllPartitions(partitions)
-                    .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture
+                        .completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+                        State.Ready, getState(), "add produced partition"));
+                return;
+            }
+            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(txnID.getMostSigBits())
+                        .setTxnidLeastBits(txnID.getLeastSigBits())
+                        .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
+                        .addAllPartitions(partitions)
+                        .setLastModificationTime(System.currentTimeMillis())
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-            return transactionLog.append(transactionMetadataEntry)
-                    .thenCompose(position -> {
-                        appendLogCount.increment();
-                        try {
-                            synchronized (txnMetaListPair.getLeft()) {
-                                txnMetaListPair.getLeft().addProducedPartitions(partitions);
-                                txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                transactionLog.append(transactionMetadataEntry)
+                        .whenComplete((position, exception) -> {
+                            if (exception != null) {
+                                completableFuture.completeExceptionally(exception);
+                                return;
                             }
-                            return CompletableFuture.completedFuture(null);
-                        } catch (InvalidTxnStatusException e) {
-                            transactionLog.deletePosition(Collections.singletonList(position));
-                            log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                                    + " add produced partition error with TxnStatus : "
-                                    + txnMetaListPair.getLeft().status().name(), e);
-                            return FutureUtil.failedFuture(e);
-                        }
-                    });
+                            appendLogCount.increment();
+                            try {
+                                synchronized (txnMetaListPair.getLeft()) {
+                                    txnMetaListPair.getLeft().addProducedPartitions(partitions);
+                                    txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                                }
+                                completableFuture.complete(null);
+                            } catch (InvalidTxnStatusException e) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                                log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                        + " add produced partition error with TxnStatus : "
+                                        + txnMetaListPair.getLeft().status().name(), e);
+                                completableFuture.completeExceptionally(e);
+                            }
+                        });
+            });
         });
+        return completableFuture;
     }
 
     @Override
-    public synchronized CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
+    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
                                                           List<TransactionSubscription> txnSubscriptions) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                            State.Ready, getState(), "add acked partition"));
-        }
-        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(txnID.getMostSigBits())
-                    .setTxnidLeastBits(txnID.getLeastSigBits())
-                    .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
-                    .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
-                    .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition"));
+                return;
+            }
+            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(txnID.getMostSigBits())
+                        .setTxnidLeastBits(txnID.getLeastSigBits())
+                        .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
+                        .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
+                        .setLastModificationTime(System.currentTimeMillis())
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-            return transactionLog.append(transactionMetadataEntry)
-                    .thenCompose(position -> {
-                        appendLogCount.increment();
-                        try {
-                            synchronized (txnMetaListPair.getLeft()) {
-                                txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
-                                txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                transactionLog.append(transactionMetadataEntry)
+                        .whenComplete((position, exception) -> {
+                            if (exception != null) {
+                                completableFuture.completeExceptionally(exception);
+                                return;
                             }
-                            return CompletableFuture.completedFuture(null);
-                        } catch (InvalidTxnStatusException e) {
-                            transactionLog.deletePosition(Collections.singletonList(position));
-                            log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                                    + " add acked subscription error with TxnStatus : "
-                                    + txnMetaListPair.getLeft().status().name(), e);
-                            return FutureUtil.failedFuture(e);
-                        }
-                    });
+                            appendLogCount.increment();
+                            try {
+                                synchronized (txnMetaListPair.getLeft()) {
+                                    txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
+                                    txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                                }
+                                completableFuture.complete(null);
+                            } catch (InvalidTxnStatusException e) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                                log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                        + " add acked subscription error with TxnStatus : "
+                                        + txnMetaListPair.getLeft().status().name(), e);
+                                completableFuture.completeExceptionally(e);
+                            }
+                        });
+            });
         });
+        return completableFuture;
     }
 
     @Override
-    public synchronized CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
+    public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
                                                                 TxnStatus expectedStatus, boolean isTimeout) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                            State.Ready, getState(), "update transaction status"));
-        }
-        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
-            if (txnMetaListPair.getLeft().status() == newStatus) {
-                return CompletableFuture.completedFuture(null);
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID,
+                        State.Ready, getState(), "update transaction status"));
+                return;
             }
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(txnID.getMostSigBits())
-                    .setTxnidLeastBits(txnID.getLeastSigBits())
-                    .setExpectedStatus(expectedStatus)
-                    .setMetadataOp(TransactionMetadataOp.UPDATE)
-                    .setLastModificationTime(System.currentTimeMillis())
-                    .setNewStatus(newStatus)
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+                if (txnMetaListPair.getLeft().status() == newStatus) {
+                    completableFuture.complete(null);
+                    return;
+                }
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(txnID.getMostSigBits())
+                        .setTxnidLeastBits(txnID.getLeastSigBits())
+                        .setExpectedStatus(expectedStatus)
+                        .setMetadataOp(TransactionMetadataOp.UPDATE)
+                        .setLastModificationTime(System.currentTimeMillis())
+                        .setNewStatus(newStatus)
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-            return transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
-                appendLogCount.increment();
-                try {
-                    synchronized (txnMetaListPair.getLeft()) {
-                        txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
-                        txnMetaListPair.getRight().add(position);
+                transactionLog.append(transactionMetadataEntry).whenComplete((position, throwable) -> {
+                    if (throwable != null) {
+                        completableFuture.completeExceptionally(throwable);
+                        return;
                     }
-                    if (newStatus == TxnStatus.ABORTING && isTimeout) {
-                        this.transactionTimeoutCount.increment();
-                    }
-                    if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
-                        return transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> {
-                            this.transactionMetadataStoreStats
-                                    .addTransactionExecutionLatencySample(System.currentTimeMillis()
-                                            - txnMetaListPair.getLeft().getOpenTimestamp());
-                            if (newStatus == TxnStatus.COMMITTED) {
-                                committedTransactionCount.increment();
-                            } else {
-                                abortedTransactionCount.increment();
-                            }
-                            txnMetaMap.remove(txnID.getLeastSigBits());
-                            return CompletableFuture.completedFuture(null);
-                        });
+                    appendLogCount.increment();
+                    try {
+                        synchronized (txnMetaListPair.getLeft()) {
+                            txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
+                            txnMetaListPair.getRight().add(position);
+                        }
+                        if (newStatus == TxnStatus.ABORTING && isTimeout) {
+                            this.transactionTimeoutCount.increment();
+                        }
+                        if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+                            transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, exception) -> {
+                                if (exception != null) {
+                                    completableFuture.completeExceptionally(exception);
+                                    return;
+                                }
+                                this.transactionMetadataStoreStats
+                                        .addTransactionExecutionLatencySample(System.currentTimeMillis()
+                                                - txnMetaListPair.getLeft().getOpenTimestamp());
+                                if (newStatus == TxnStatus.COMMITTED) {
+                                    committedTransactionCount.increment();
+                                } else {
+                                    abortedTransactionCount.increment();
+                                }
+                                txnMetaMap.remove(txnID.getLeastSigBits());
+                                completableFuture.complete(null);
+                            });
+                        }
+                        completableFuture.complete(null);
+                    } catch (InvalidTxnStatusException e) {
+                        transactionLog.deletePosition(Collections.singletonList(position));
+                        log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                + " add update txn status error with TxnStatus : "
+                                + txnMetaListPair.getLeft().status().name(), e);
+                        completableFuture.completeExceptionally(e);
                     }
-                    return CompletableFuture.completedFuture(null);
-                } catch (InvalidTxnStatusException e) {
-                    transactionLog.deletePosition(Collections.singletonList(position));
-                    log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                            + " add update txn status error with TxnStatus : "
-                            + txnMetaListPair.getLeft().status().name(), e);
-                    return FutureUtil.failedFuture(e);
-                }
+                });
             });
         });
+       return completableFuture;
     }
 
     @Override

[pulsar] 16/21: [Transaction] Fix end transaction at state of timeout (#14370)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3089aa4040cc6c42c05898fab00c98332a9cb395
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Feb 25 16:15:53 2022 +0800

    [Transaction] Fix end transaction at state of timeout (#14370)
    
    ### Motivation
    For concurrency problems, timeout may change the status to timeout before commit/abort changes the status to committing/aborting.
    
    ### Modification
    Cancel timeout when commit or abort and then check the state.
    
    (cherry picked from commit 4b480450f32dc6ce5337d0b3d68a35111ddf474e)
---
 .../org/apache/pulsar/client/impl/transaction/TransactionImpl.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 8adc162..bba5331 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -165,10 +165,10 @@ public class TransactionImpl implements Transaction , TimerTask {
 
     @Override
     public CompletableFuture<Void> commit() {
+        timeout.cancel();
         return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
-            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
@@ -194,10 +194,10 @@ public class TransactionImpl implements Transaction , TimerTask {
 
     @Override
     public CompletableFuture<Void> abort() {
+        timeout.cancel();
         return checkIfOpenOrAborting().thenCompose(value -> {
             CompletableFuture<Void> abortFuture = new CompletableFuture<>();
             this.state = State.ABORTING;
-            timeout.cancel();
             allOpComplete().whenComplete((v, e) -> {
                 if (e != null) {
                     log.error(e.getMessage());

[pulsar] 02/21: [Transaction] Optimize testEndTBRecoveringWhenManagerLedgerDisReadable (#14303)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b70f6cdcd3a48550795c2e0e4de86669867b0586
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Feb 16 11:50:41 2022 +0800

    [Transaction] Optimize testEndTBRecoveringWhenManagerLedgerDisReadable (#14303)
    
    ### Motivation
    The time used by clean up is too long.
    This is because the broker service fails to execute the `unloadserviceunit`, and can only rely on the timeout mechanism to cancel the `unloadserviceunit` this time. As a result, the clean up takes too much time. The root cause is that there is a mock cursor in `ManagerLedger.cursors`.
    ### Modification
    Remove the cursor from  `ManagerLedger.cursors` after test.
    
    (cherry picked from commit a43fab0045a93ee864da6bc386bcc0e8bf17bf11)
---
 .../test/java/org/apache/pulsar/broker/transaction/TransactionTest.java  | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index b627438..b882d26 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -562,6 +562,7 @@ public class TransactionTest extends TransactionTestBase {
         TransactionBuffer buffer2 = new TopicTransactionBuffer(persistentTopic);
         Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
                 assertEquals(buffer2.getStats().state, "Ready"));
+        managedCursors.removeCursor("transaction-buffer-sub");
     }
 
     @Test

[pulsar] 18/21: Validate rack name (#14336)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4e797416ab9bec5f8b9d93f67ac48555655bcdea
Author: Hang Chen <ch...@apache.org>
AuthorDate: Sun Feb 27 11:53:41 2022 +0800

    Validate rack name (#14336)
    
    If the rack name set in the following case:
    -  Enabled rack aware placement policy, and user set rack name which contains "/" in addition to the head and tail of the string. Such as "/r/a" or "r/a/b"
    - Enabled region aware placement policy, and user set the rack name which contains multiple "/" in addition to the head and tail of the string. Such as "/region/region/a" or "region/a/rack/b"
    
    (cherry picked from commit 25408f5078f29372cc2c2133abc8dbeb0b9a30cf)
---
 .../org/apache/pulsar/broker/admin/v2/Bookies.java | 16 ++++++
 .../apache/pulsar/broker/admin/BookiesApiTest.java | 58 +++++++++++++++++++++-
 2 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
index 1af839f..05eb1dd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java
@@ -43,6 +43,7 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.discover.RegistrationClient;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
 import org.apache.bookkeeper.net.BookieId;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.policies.data.BookieInfo;
@@ -55,6 +56,7 @@ import org.apache.pulsar.common.policies.data.RawBookieInfo;
 @Produces(MediaType.APPLICATION_JSON)
 @Slf4j
 public class Bookies extends AdminResource {
+    private static final String PATH_SEPARATOR = "/";
 
     @GET
     @Path("/racks-info")
@@ -162,6 +164,20 @@ public class Bookies extends AdminResource {
             throw new RestException(Status.PRECONDITION_FAILED, "Bookie 'group' parameters is missing");
         }
 
+        // validate rack name
+        int separatorCnt = StringUtils.countMatches(
+            StringUtils.strip(bookieInfo.getRack(), PATH_SEPARATOR), PATH_SEPARATOR);
+        boolean isRackEnabled = pulsar().getConfiguration().isBookkeeperClientRackawarePolicyEnabled();
+        boolean isRegionEnabled = pulsar().getConfiguration().isBookkeeperClientRegionawarePolicyEnabled();
+        if (isRackEnabled && ((isRegionEnabled && separatorCnt != 1) || (!isRegionEnabled && separatorCnt != 0))) {
+            asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Bookie 'rack' parameter is invalid, "
+                + "When `RackawareEnsemblePlacementPolicy` is enabled, the rack name is not allowed to contain "
+                + "slash (`/`) except for the beginning and end of the rack name string. "
+                + "When `RegionawareEnsemblePlacementPolicy` is enabled, the rack name can only contain "
+                + "one slash (`/`) except for the beginning and end of the rack name string."));
+            return;
+        }
+
         getPulsarResources().getBookieResources()
                 .update(optionalBookiesRackConfiguration -> {
                     BookiesRackConfiguration brc = optionalBookiesRackConfiguration
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BookiesApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BookiesApiTest.java
index 644d47d..a3bd52b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BookiesApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BookiesApiTest.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.BookieInfo;
@@ -123,6 +124,61 @@ public class BookiesApiTest extends MockedPulsarServiceBaseTest {
                 .get()
                 .getValue()
                 .size());
+
+        // test invalid rack name
+        // use rack aware placement policy
+        String errorMsg = "Bookie 'rack' parameter is invalid, When `RackawareEnsemblePlacementPolicy` is enabled, "
+            + "the rack name is not allowed to contain slash (`/`) except for the beginning and end of the rack name "
+            + "string. When `RegionawareEnsemblePlacementPolicy` is enabled, the rack name can only contain "
+            + "one slash (`/`) except for the beginning and end of the rack name string.";
+
+        BookieInfo newInfo3 = BookieInfo.builder()
+            .rack("/rack/a")
+            .hostname("127.0.0.2")
+            .build();
+        try {
+            admin.bookies().updateBookieRackInfo(bookie0, "default", newInfo3);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(412, e.getStatusCode());
+            assertEquals(errorMsg, e.getMessage());
+        }
+
+        BookieInfo newInfo4 = BookieInfo.builder()
+            .rack("/rack")
+            .hostname("127.0.0.2")
+            .build();
+        try {
+            admin.bookies().updateBookieRackInfo(bookie0, "default", newInfo4);
+        } catch (PulsarAdminException e) {
+            fail();
+        }
+
+        // enable region aware placement policy
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        configuration.setBookkeeperClientRegionawarePolicyEnabled(true);
+        doReturn(configuration).when(pulsar).getConfiguration();
+        BookieInfo newInfo5 = BookieInfo.builder()
+            .rack("/region/rack/a")
+            .hostname("127.0.0.2")
+            .build();
+        try {
+            admin.bookies().updateBookieRackInfo(bookie0, "default", newInfo5);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(412, e.getStatusCode());
+            assertEquals(errorMsg, e.getMessage());
+        }
+
+        BookieInfo newInfo6 = BookieInfo.builder()
+            .rack("/region/rack/")
+            .hostname("127.0.0.2")
+            .build();
+        try {
+            admin.bookies().updateBookieRackInfo(bookie0, "default", newInfo6);
+        } catch (PulsarAdminException e) {
+            fail();
+        }
     }
 
 }

[pulsar] 19/21: [pulsar-broker] Fix avg-messagePerEntry metrics for consumer (#14330)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 56f1660b2aa6b8c5eaa9480da448541d7b603cb1
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Feb 27 17:36:51 2022 -0800

    [pulsar-broker] Fix avg-messagePerEntry metrics for consumer (#14330)
    
    (cherry picked from commit 7b10bd040e947497113db702ebcb2381d9dfe0fc)
---
 .../src/main/java/org/apache/pulsar/broker/service/Consumer.java        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index bfaa660..bf906a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -266,7 +266,7 @@ public class Consumer {
 
         // calculate avg message per entry
         int tmpAvgMessagesPerEntry = AVG_MESSAGES_PER_ENTRY.get(this);
-        tmpAvgMessagesPerEntry = (int) Math.round(tmpAvgMessagesPerEntry * avgPercent
+        tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * avgPercent
                 + (1 - avgPercent) * totalMessages / entries.size());
         AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry);
 

[pulsar] 01/21: [security] Upgrade Postgre driver to 42.2.25 to get rid of CVE-2022-21724 (#14119)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1e17289e6ab9bb7b70034b807fa04e0f9232fc1b
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Mon Feb 7 10:04:22 2022 +0100

    [security] Upgrade Postgre driver to 42.2.25 to get rid of CVE-2022-21724 (#14119)
    
    http://web.nvd.nist.gov/view/vuln/detail?vulnId=CVE-2022-21724
    
    Upgrade both `jdbc` and `debezium` Postgre java driver dependency to 42.2.25 (from 42.2.24 and 42.2.22).
    Note: the version is not shared on purpose because we should leave the driver dependencies separated since the two connectors are used in different ways. (For example, when we'll upgrade Debezium to 1.8.x we'll need to remove the override and keep the 42.3.x version)
    
    For cherry-picks, branch-2.9 and branch-2.8 are compatible since:
    * branch-2.9 has the same debezium version
    * branch-2.8 has 1.0.0 but it uses [pg driver 42.2.x](https://search.maven.org/artifact/io.debezium/debezium-parent/1.0.0.Final/pom) as well
    
    - [x] `no-need-doc`
    
    (cherry picked from commit 64818458727df20384463bbedf9cb7c92c0f9216)
---
 pom.xml                             | 3 ++-
 pulsar-io/debezium/postgres/pom.xml | 7 +++++++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 333f912..30df8e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,7 +148,7 @@ flexible messaging model and an intuitive client API.</description>
     <jclouds.version>2.3.0</jclouds.version>
     <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
     <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
-    <postgresql-jdbc.version>42.2.24</postgresql-jdbc.version>
+    <postgresql-jdbc.version>42.2.25</postgresql-jdbc.version>
     <clickhouse-jdbc.version>0.3.2</clickhouse-jdbc.version>
     <mariadb-jdbc.version>2.6.0</mariadb-jdbc.version>
     <hdfs-offload-version3>3.3.0</hdfs-offload-version3>
@@ -157,6 +157,7 @@ flexible messaging model and an intuitive client API.</description>
     <scala.binary.version>2.13</scala.binary.version>
     <scala-library.version>2.13.6</scala-library.version>
     <debezium.version>1.7.1.Final</debezium.version>
+    <debezium.postgresql.version>42.2.25</debezium.postgresql.version>
     <jsonwebtoken.version>0.11.1</jsonwebtoken.version>
     <opencensus.version>0.18.0</opencensus.version>
     <hbase.version>2.3.0</hbase.version>
diff --git a/pulsar-io/debezium/postgres/pom.xml b/pulsar-io/debezium/postgres/pom.xml
index ad331e3..5bcf079 100644
--- a/pulsar-io/debezium/postgres/pom.xml
+++ b/pulsar-io/debezium/postgres/pom.xml
@@ -44,6 +44,13 @@
       <version>${debezium.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${debezium.postgresql.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
   </dependencies>
 
   <build>

[pulsar] 10/21: [C++] Fix GCC compilation failure caused by warning macro (#14402)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3101b662f04d113946c73a11940a3211ea809c65
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Feb 23 05:41:28 2022 +0800

    [C++] Fix GCC compilation failure caused by warning macro (#14402)
    
    ### Motivation
    
    When I tried to build the C++ client with GCC 7.3 and when warnings are
    printed (because `BOOST_ARCH_X86_64` is not defined), the compilation
    failed with:
    
    ```
    #warning “BOOST_ARCH_X86_64 is not defined, CRC32C SSE4.2 will be disabled”
      ^~~~~~~
    cc1plus: error: unrecognized command line option '-Wno-stringop-truncation' [-Werror]
    cc1plus: all warnings being treated as errors
    ```
    
    It seems to be a bug before GCC 8.1. I added
    `-DCMAKE_VERBOSE_MAKEFILE=ON` to CMake command and see the full compile
    command:
    
    ```
    -Wno-error -Wall -Wformat-security -Wvla -Werror -Wno-sign-compare -Wno-deprecated-declarations -Wno-error=cpp -Wno-stringop-truncation
    ```
    
    See
    https://github.com/apache/pulsar/blob/b829a4ce121268f55748bbdd6f19ac36129e7dab/pulsar-client-cpp/CMakeLists.txt#L105-L106
    
    For GCC > 4.9, `-Wno-stringop-truncation` option was added. However,
    when a message was printed by `#warning` macro, it would fail with the
    strange error message.
    
    The simplest way to reproduce the bug is compiling following code:
    
    ```c++
    #warnings "hello"
    
    int main() {}
    ```
    
    You can paste the code above to https://godbolt.org/, select any GCC
    compiler whose version is lower than 8.0, then add the following
    options:
    
    ```
    -Werror -Wno-error=cpp -Wno-stringop-truncation
    ```
    
    The compilation failed for x86-64 gcc 7.5 while it succeeded for 8.1.
    
    ### Modifications
    
    Only add the `-Wno-stringop-truncation` option for GCC >= 8.1.
    
    (cherry picked from commit 958fc7820106c9b4da33f1b720d1dcce8ff772b1)
---
 pulsar-client-cpp/CMakeLists.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 3fadb05..2d98281 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -99,7 +99,7 @@ else() # GCC or Clang are mostly compatible:
     # Options unique to Clang or GCC:
     if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
         add_compile_options(-Qunused-arguments) 
-    elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.9))
+    elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8.1))
         add_compile_options(-Wno-stringop-truncation)
     endif()
 endif()

[pulsar] 15/21: [Transaction] Adopt Single_thread to handle TcClient connecting (#13969)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a5a99911434904f0cdfccaf50ad723a2cb9bbf44
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Feb 25 16:14:52 2022 +0800

    [Transaction] Adopt Single_thread to handle TcClient connecting (#13969)
    
    ### Motivation
    
    The broker will only reconnect the same TC once at the same time, and other connection requests during the reconnection period will be processed together after the connection is completed.
    There may be concurrency problems in the queue for request addition and the clearing of the queue.
    
    ### Modification
    
    Use SingleThread to deal TcClient connecting.
    
    (cherry picked from commit 29259e1b5c33856cd6bd9413e331f4592fd3007c)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   5 +
 .../broker/TransactionMetadataStoreService.java    | 152 ++++++++++++---------
 2 files changed, 91 insertions(+), 66 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index adeec4c..7fcf0d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -403,6 +403,11 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 brokerAdditionalServlets = null;
             }
 
+            if (this.transactionMetadataStoreService != null) {
+                this.transactionMetadataStoreService.close();
+                this.transactionMetadataStoreService = null;
+            }
+
             GracefulExecutorServicesShutdown executorServicesShutdown =
                     GracefulExecutorServicesShutdown
                             .initiate()
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 9b60679..b3ced3c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -24,6 +24,7 @@ import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTI
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
@@ -32,7 +33,10 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -84,9 +88,14 @@ public class TransactionMetadataStoreService {
     // one connect request open the transactionMetaStore the other request will add to the queue, when the open op
     // finished the request will be poll and complete the future
     private final ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> pendingConnectRequests;
+    private final ExecutorService internalPinnedExecutor;
 
     private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
 
+    private final ThreadFactory threadFactory =
+            new DefaultThreadFactory("transaction-coordinator-thread-factory");
+
+
     public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
                                            PulsarService pulsarService, TransactionBufferClient tbClient,
                                            HashedWheelTimer timer) {
@@ -98,6 +107,7 @@ public class TransactionMetadataStoreService {
         this.transactionOpRetryTimer = timer;
         this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
         this.pendingConnectRequests = new ConcurrentLongHashMap<>();
+        this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
     }
 
     @Deprecated
@@ -152,80 +162,86 @@ public class TransactionMetadataStoreService {
     }
 
     public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
-        if (stores.get(tcId) != null) {
-            return CompletableFuture.completedFuture(null);
-        } else {
-            return pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
-                    .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString()).thenCompose(v -> {
-                        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-                final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
-                        .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
-                Deque<CompletableFuture<Void>> deque = pendingConnectRequests
-                        .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>());
-                if (tcLoadSemaphore.tryAcquire()) {
-                    // when tcLoadSemaphore.release(), this command will acquire semaphore, so we should jude the store
-                    // exist again.
-                    if (stores.get(tcId) != null) {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    openTransactionMetadataStore(tcId).thenAccept((store) -> {
-                        stores.put(tcId, store);
-                        LOG.info("Added new transaction meta store {}", tcId);
-                        long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
-                        while (true) {
-                            // prevent thread in a busy loop.
-                            if (System.currentTimeMillis() < endTime) {
-                                CompletableFuture<Void> future = deque.poll();
-                                if (future != null) {
-                                    // complete queue request future
-                                    future.complete(null);
-                                } else {
-                                    break;
-                                }
-                            } else {
-                                deque.clear();
-                                break;
-                            }
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (stores.get(tcId) != null) {
+                completableFuture.complete(null);
+            } else {
+                pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+                        .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString())
+                        .thenRun(() -> internalPinnedExecutor.execute(() -> {
+                    final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+                            .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
+                    Deque<CompletableFuture<Void>> deque = pendingConnectRequests
+                            .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>());
+                    if (tcLoadSemaphore.tryAcquire()) {
+                        // when tcLoadSemaphore.release(), this command will acquire semaphore,
+                        // so we should jude the store exist again.
+                        if (stores.get(tcId) != null) {
+                            completableFuture.complete(null);
                         }
 
-                        completableFuture.complete(null);
-                        tcLoadSemaphore.release();
-                    }).exceptionally(e -> {
-                        completableFuture.completeExceptionally(e.getCause());
-                        // release before handle request queue, in order to client reconnect infinite loop
-                        tcLoadSemaphore.release();
-                        long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
-                        while (true) {
-                            // prevent thread in a busy loop.
-                            if (System.currentTimeMillis() < endTime) {
-                                CompletableFuture<Void> future = deque.poll();
-                                if (future != null) {
-                                    // this means that this tc client connection connect fail
-                                    future.completeExceptionally(e);
+                        openTransactionMetadataStore(tcId).thenAccept((store) -> internalPinnedExecutor.execute(() -> {
+                            stores.put(tcId, store);
+                            LOG.info("Added new transaction meta store {}", tcId);
+                            long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
+                            while (true) {
+                                // prevent thread in a busy loop.
+                                if (System.currentTimeMillis() < endTime) {
+                                    CompletableFuture<Void> future = deque.poll();
+                                    if (future != null) {
+                                        // complete queue request future
+                                        future.complete(null);
+                                    } else {
+                                        break;
+                                    }
                                 } else {
+                                    deque.clear();
                                     break;
                                 }
-                            } else {
-                                deque.clear();
-                                break;
                             }
+
+                            completableFuture.complete(null);
+                            tcLoadSemaphore.release();
+                        })).exceptionally(e -> {
+                            internalPinnedExecutor.execute(() -> {
+                                        completableFuture.completeExceptionally(e.getCause());
+                                        // release before handle request queue,
+                                        //in order to client reconnect infinite loop
+                                        tcLoadSemaphore.release();
+                                        long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
+                                        while (true) {
+                                            // prevent thread in a busy loop.
+                                            if (System.currentTimeMillis() < endTime) {
+                                                CompletableFuture<Void> future = deque.poll();
+                                                if (future != null) {
+                                                    // this means that this tc client connection connect fail
+                                                    future.completeExceptionally(e);
+                                                } else {
+                                                    break;
+                                                }
+                                            } else {
+                                                deque.clear();
+                                                break;
+                                            }
+                                        }
+                                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
+                                    });
+                                    return null;
+                                });
+                    } else {
+                        // only one command can open transaction metadata store,
+                        // other will be added to the deque, when the op of openTransactionMetadataStore finished
+                        // then handle the requests witch in the queue
+                        deque.add(completableFuture);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
                         }
-                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
-                        return null;
-                    });
-                } else {
-                    // only one command can open transaction metadata store,
-                    // other will be added to the deque, when the op of openTransactionMetadataStore finished
-                    // then handle the requests witch in the queue
-                    deque.add(completableFuture);
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
                     }
-                }
-                return completableFuture;
-            });
-        }
+                }));
+            }
+        });
+        return completableFuture;
     }
 
     public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
@@ -537,4 +553,8 @@ public class TransactionMetadataStoreService {
     public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
         return Collections.unmodifiableMap(stores);
     }
+
+    public void close () {
+        this.internalPinnedExecutor.shutdown();
+    }
 }

[pulsar] 14/21: [Pulsar SQL] Fix PulsarRecordCursor deserialize issue. (#14379)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 86fa49620191fd265dc27055f6840a100253aea8
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Feb 25 16:07:42 2022 +0800

    [Pulsar SQL] Fix PulsarRecordCursor deserialize issue. (#14379)
    
    (cherry picked from commit a96a1584ba2e9d19c6919b7597a0f344a2af1a35)
---
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 202 +++++++++++----------
 1 file changed, 111 insertions(+), 91 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index d839e05..56ae17b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -45,7 +45,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
@@ -110,6 +112,7 @@ public class PulsarRecordCursor implements RecordCursor {
     private final long splitSize;
     private long entriesProcessed = 0;
     private int partition = -1;
+    private volatile Throwable deserializingError;
 
 
     private PulsarSqlSchemaInfoProvider schemaInfoProvider;
@@ -236,113 +239,125 @@ public class PulsarRecordCursor implements RecordCursor {
     }
 
     @VisibleForTesting
-    class DeserializeEntries implements Runnable {
+    class DeserializeEntries extends Thread {
 
-        protected boolean isRunning = false;
+        private final AtomicBoolean isRunning;
 
-        private final Thread thread;
+        private final CompletableFuture<Void> closeHandle;
 
         public DeserializeEntries() {
-            this.thread = new Thread(this, "derserialize-thread-split-" + pulsarSplit.getSplitId());
+            super("deserialize-thread-split-" + pulsarSplit.getSplitId());
+            this.isRunning = new AtomicBoolean(false);
+            this.closeHandle = new CompletableFuture<>();
         }
 
-        public void interrupt() {
-            isRunning = false;
-            thread.interrupt();
+        @Override
+        public void start() {
+            if (isRunning.compareAndSet(false, true)) {
+                super.start();
+            }
         }
 
-        public void start() {
-            this.thread.start();
+        public CompletableFuture<Void> close() {
+            if (isRunning.compareAndSet(true, false)) {
+                super.interrupt();
+            }
+            return closeHandle;
         }
 
         @Override
         public void run() {
-            isRunning = true;
-            while (isRunning) {
-
-                 int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
-                    @Override
-                    public void accept(Entry entry) {
-
-                        try {
-                            entryQueueCacheSizeAllocator.release(entry.getLength());
-
-                            long bytes = entry.getDataBuffer().readableBytes();
-                            completedBytes += bytes;
-                            // register stats for bytes read
-                            metricsTracker.register_BYTES_READ(bytes);
-
-                            // check if we have processed all entries in this split
-                            // and no incomplete chunked messages exist
-                            if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
-                                return;
-                            }
-
-                            // set start time for time deserializing entries for stats
-                            metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+            try {
+                while (isRunning.get()) {
+                    int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() {
+                        @Override
+                        public void accept(Entry entry) {
 
                             try {
-                                MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
-                                        entry.getDataBuffer(), (message) -> {
-                                            try {
-                                                // start time for message queue read
-                                                metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-
-                                                if (message.getNumChunksFromMsg() > 1)  {
-                                                    message = processChunkedMessages(message);
-                                                } else if (entryExceedSplitEndPosition(entry)) {
-                                                    // skip no chunk or no multi chunk message
-                                                    // that exceed split end position
-                                                    message.release();
-                                                    message = null;
-                                                }
-                                                if (message != null) {
-                                                    while (true) {
-                                                        if (!haveAvailableCacheSize(
-                                                                messageQueueCacheSizeAllocator, messageQueue)
-                                                                || !messageQueue.offer(message)) {
-                                                            Thread.sleep(1);
-                                                        } else {
-                                                            messageQueueCacheSizeAllocator.allocate(
-                                                                    message.getData().readableBytes());
-                                                            break;
+                                entryQueueCacheSizeAllocator.release(entry.getLength());
+
+                                long bytes = entry.getDataBuffer().readableBytes();
+                                completedBytes += bytes;
+                                // register stats for bytes read
+                                metricsTracker.register_BYTES_READ(bytes);
+
+                                // check if we have processed all entries in this split
+                                // and no incomplete chunked messages exist
+                                if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
+                                    return;
+                                }
+
+                                // set start time for time deserializing entries for stats
+                                metricsTracker.start_ENTRY_DESERIALIZE_TIME();
+
+                                try {
+                                    MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
+                                            entry.getDataBuffer(), (message) -> {
+                                                try {
+                                                    // start time for message queue read
+                                                    metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+
+                                                    if (message.getNumChunksFromMsg() > 1)  {
+                                                        message = processChunkedMessages(message);
+                                                    } else if (entryExceedSplitEndPosition(entry)) {
+                                                        // skip no chunk or no multi chunk message
+                                                        // that exceed split end position
+                                                        message.release();
+                                                        message = null;
+                                                    }
+                                                    if (message != null) {
+                                                        while (true) {
+                                                            if (!haveAvailableCacheSize(
+                                                                    messageQueueCacheSizeAllocator, messageQueue)
+                                                                    || !messageQueue.offer(message)) {
+                                                                Thread.sleep(1);
+                                                            } else {
+                                                                messageQueueCacheSizeAllocator.allocate(
+                                                                        message.getData().readableBytes());
+                                                                break;
+                                                            }
                                                         }
                                                     }
-                                                }
 
-                                                // stats for how long a read from message queue took
-                                                metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-                                                // stats for number of messages read
-                                                metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
-
-                                            } catch (InterruptedException e) {
-                                                //no-op
-                                            }
-                                        }, pulsarConnectorConfig.getMaxMessageSize());
-                            } catch (IOException e) {
-                                log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
-                                throw new RuntimeException(e);
-                            }
-                            // stats for time spend deserializing entries
-                            metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+                                                    // stats for how long a read from message queue took
+                                                    metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
+                                                    // stats for number of messages read
+                                                    metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
 
-                            // stats for num messages per entry
-                            metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
-
-                        } finally {
-                            entriesProcessed++;
-                            entry.release();
+                                                } catch (InterruptedException e) {
+                                                    //no-op
+                                                }
+                                            }, pulsarConnectorConfig.getMaxMessageSize());
+                                } catch (IOException e) {
+                                    log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
+                                    throw new RuntimeException(e);
+                                }
+                                // stats for time spend deserializing entries
+                                metricsTracker.end_ENTRY_DESERIALIZE_TIME();
+
+                                // stats for num messages per entry
+                                metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
+
+                            } finally {
+                                entriesProcessed++;
+                                entry.release();
+                            }
                         }
-                    }
-                });
+                    });
 
-                if (read <= 0) {
-                    try {
-                        Thread.sleep(1);
-                    } catch (InterruptedException e) {
-                        return;
+                    if (read <= 0) {
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            return;
+                        }
                     }
                 }
+                closeHandle.complete(null);
+            } catch (Throwable ex) {
+                log.error(ex, "Stop running DeserializeEntries");
+                closeHandle.completeExceptionally(ex);
+                throw ex;
             }
         }
     }
@@ -468,6 +483,9 @@ public class PulsarRecordCursor implements RecordCursor {
         if (readEntries == null) {
             // start deserialize thread
             deserializeEntries = new DeserializeEntries();
+            deserializeEntries.setUncaughtExceptionHandler((t, ex) -> {
+                deserializingError = ex;
+            });
             deserializeEntries.start();
 
             readEntries = new ReadEntries();
@@ -492,6 +510,8 @@ public class PulsarRecordCursor implements RecordCursor {
             if (currentMessage != null) {
                 messageQueueCacheSizeAllocator.release(currentMessage.getData().readableBytes());
                 break;
+            } else if (deserializingError != null) {
+                throw new RuntimeException(deserializingError);
             } else {
                 try {
                     Thread.sleep(1);
@@ -503,7 +523,7 @@ public class PulsarRecordCursor implements RecordCursor {
             }
         }
 
-        //start time for deseralizing record
+        //start time for deserializing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
         SchemaInfo schemaInfo = getSchemaInfo(pulsarSplit);
@@ -714,12 +734,12 @@ public class PulsarRecordCursor implements RecordCursor {
             messageQueue.drain(RawMessage::release);
         }
 
-        if (entryQueue != null) {
-            entryQueue.drain(Entry::release);
-        }
-
         if (deserializeEntries != null) {
-            deserializeEntries.interrupt();
+            deserializeEntries.close().whenComplete((r, t) -> {
+                if (entryQueue != null) {
+                    entryQueue.drain(Entry::release);
+                }
+            });
         }
 
         if (this.cursor != null) {

[pulsar] 13/21: Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14433)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6094879419001af2625d18555e55e267323a2b03
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Feb 25 15:23:40 2022 +0800

    Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14433)
    
    (cherry picked from commit 7a58aeba0b439479e1d68fa67c57e120f85687b0)
---
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 86 +++++++++++++---------
 1 file changed, 50 insertions(+), 36 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index cbfc27d..471d4ba 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -24,9 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
@@ -56,6 +54,7 @@ import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Getter(AccessLevel.PUBLIC)
@@ -117,48 +116,63 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             return FutureUtil.failedFuture(
                     new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription"));
         }
-        if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) {
+        CompletableFuture<Void> applyDLQConfig;
+        if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) {
             TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
-            String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
-            String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-
             //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
-            String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
-            String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-            try {
-                if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
-                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
-                    retryLetterTopic = oldRetryLetterTopic;
-                }
-                if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
-                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
-                    deadLetterTopic = oldDeadLetterTopic;
-                }
-            } catch (InterruptedException | TimeoutException e) {
-                return FutureUtil.failedFuture(e);
-            } catch (ExecutionException e) {
-                return FutureUtil.failedFuture(e.getCause());
-            }
-
-            if(conf.getDeadLetterPolicy() == null) {
-                conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
+            String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
+                    + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+            String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
+                    + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+            DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
+            if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
+                    || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                CompletableFuture<PartitionedTopicMetadata> retryLetterTopicMetadata =
+                        client.getPartitionedTopicMetadata(oldRetryLetterTopic);
+                CompletableFuture<PartitionedTopicMetadata> deadLetterTopicMetadata =
+                        client.getPartitionedTopicMetadata(oldDeadLetterTopic);
+                applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
+                        .thenAccept(__ -> {
+                            String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+                                    + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+                            String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+                                    + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+                            if (retryLetterTopicMetadata.join().partitions > 0) {
+                                retryLetterTopic = oldRetryLetterTopic;
+                            }
+                            if (deadLetterTopicMetadata.join().partitions > 0) {
+                                deadLetterTopic = oldDeadLetterTopic;
+                            }
+                            if (deadLetterPolicy == null) {
+                                conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
                                         .maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)
                                         .retryLetterTopic(retryLetterTopic)
                                         .deadLetterTopic(deadLetterTopic)
                                         .build());
+                            } else {
+                                if (StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) {
+                                    conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
+                                }
+                                if (StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                                    conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
+                                }
+                            }
+                            conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+                        });
             } else {
-                if (StringUtils.isBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
-                    conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
-                }
-                if (StringUtils.isBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
-                    conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
-                }
+                conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+                applyDLQConfig = CompletableFuture.completedFuture(null);
             }
-            conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+        } else {
+            applyDLQConfig = CompletableFuture.completedFuture(null);
         }
-        return interceptorList == null || interceptorList.size() == 0 ?
-                client.subscribeAsync(conf, schema, null) :
-                client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
+        return applyDLQConfig.thenCompose(__ -> {
+            if (interceptorList == null || interceptorList.size() == 0) {
+                return client.subscribeAsync(conf, schema, null);
+            } else {
+                return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
+            }
+        });
     }
 
     @Override
@@ -332,7 +346,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         conf.setAutoAckOldestChunkedMessageOnQueueFull(autoAckOldestChunkedMessageOnQueueFull);
         return this;
     }
-    
+
     @Override
     public ConsumerBuilder<T> property(String key, String value) {
         checkArgument(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value),

[pulsar] 03/21: Fix the wrong parameter in the log. (#14309)

Posted by rg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8279db990479d5ebc8c00c3bf7dde4cdcb05643d
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Feb 16 14:05:53 2022 +0800

    Fix the wrong parameter in the log. (#14309)
    
    ### Motivation
    
    Bad parameters in the log will always print "null", which can confuse users.
    
    ### Modifications
    
    - Correct exception parameter.
    
    (cherry picked from commit 9b003f96f1deb8e02a1654e30fbddd211d787e8e)
---
 .../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 838edfe..ddf926d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -323,7 +323,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 reader.readNextAsync().whenComplete((msg, e) -> {
                     if (e != null) {
                         log.error("[{}] Failed to read event from the system topic.",
-                                reader.getSystemTopic().getTopicName(), ex);
+                                reader.getSystemTopic().getTopicName(), e);
                         future.completeExceptionally(e);
                         readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
                         policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());