You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/25 07:58:19 UTC

[pulsar] branch branch-2.10 updated (fa096be -> 487a060)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from fa096be  Make sure policies.is_allow_auto_update_schema not null (#14409)
     new b20b6f2  [CI]Upgrade Windows runner os to windows-2022 and generator to Visual Studio 17 2022 (#14368)
     new 40a44c5  Fix adding message to list potential issue (#14377)
     new 2b24d8c  [Broker] waitingCursors potential  heap memory leak  (#13939)
     new a382237  Fix print error in OpAddEntry (#14392)
     new efd9ca9  ZKMetadataStore Use ZK_SCHEME_IDENTIFIER instead (#14394)
     new 8d1d78e  Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)
     new 177b797  [C++] Fix GCC compilation failure caused by warning macro (#14402)
     new 9f39d02  Exclude more files from src package (#14415)
     new acd9341  [Transaction] Adopt single thread pool in TC (#14238)
     new 6c410fc  [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)
     new c14c949  fix NPE in method of probeAvailable (#14454)
     new 04c258e  Fix switchBackDelayNs failure problem in class of  AutoClusterFailover (#14442)
     new 487a060  Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14433)

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci-cpp-build-windows.yaml        |  10 +-
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   7 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   4 +
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |   4 +-
 .../service/persistent/PersistentSubscription.java |   1 +
 .../buffer/impl/TopicTransactionBuffer.java        |   1 -
 .../broker/admin/CreateSubscriptionTest.java       |  18 ++
 .../pulsar/broker/transaction/TransactionTest.java |  25 ++
 pulsar-client-cpp/CMakeLists.txt                   |   2 +-
 .../pulsar/client/impl/AutoClusterFailover.java    |  15 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  78 ++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   7 +-
 .../pulsar/client/impl/schema/AbstractSchema.java  |   7 +-
 .../client/impl/AutoClusterFailoverTest.java       |  43 +++
 .../org/apache/pulsar/client/impl/MessageTest.java |  13 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |   4 +-
 .../impl/MLTransactionMetadataStore.java           | 328 ++++++++++++---------
 src/assembly-source-package.xml                    |   4 +-
 .../offload/jcloud/impl/MockManagedLedger.java     |   5 +
 19 files changed, 374 insertions(+), 202 deletions(-)

[pulsar] 04/13: Fix print error in OpAddEntry (#14392)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a3822373f4944bdf5ea383440293edc0f408d6b2
Author: liuchangqing <ca...@126.com>
AuthorDate: Mon Feb 21 22:21:01 2022 +0800

    Fix print error in OpAddEntry (#14392)
    
    ### Motivation
    
    Fix print error.
    
    ### Modifications
    
    ### Verifying this change
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
    Check the box below and label this PR (if you have committer privilege).
    
    Need to update docs?
    
    - [x] `no-need-doc`
    
    Co-authored-by: liu.changqing <ch...@kingdee.com>
    (cherry picked from commit e219afd4aa93d11c649ff111cfe8bfe8eb105007)
---
 .../src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index 226ad04..c25fa4f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -375,8 +375,8 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
         ManagedLedgerImpl ml = this.ml;
         LedgerHandle ledger = this.ledger;
         return "OpAddEntry{"
-                + "mlName=" + ml != null ? ml.getName() : "null"
-                + ", ledgerId=" + ledger != null ? String.valueOf(ledger.getId()) : "null"
+                + "mlName=" + (ml != null ? ml.getName() : "null")
+                + ", ledgerId=" + (ledger != null ? String.valueOf(ledger.getId()) : "null")
                 + ", entryId=" + entryId
                 + ", startTime=" + startTime
                 + ", dataLength=" + dataLength

[pulsar] 05/13: ZKMetadataStore Use ZK_SCHEME_IDENTIFIER instead (#14394)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit efd9ca9f4a54c2c311e5a57aca821032f0806f75
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Tue Feb 22 02:32:13 2022 +0800

    ZKMetadataStore Use ZK_SCHEME_IDENTIFIER instead (#14394)
    
    Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
    (cherry picked from commit c153dea98e654c16d11758fcbfdba87cc580acf3)
---
 .../main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index cd20295..6697934 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -81,8 +81,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
         super(metadataStoreConfig);
 
         try {
-            if (metadataURL.startsWith("zk:")) {
-                this.zkConnectString = metadataURL.substring(3);
+            if (metadataURL.startsWith(ZK_SCHEME_IDENTIFIER)) {
+                this.zkConnectString = metadataURL.substring(ZK_SCHEME_IDENTIFIER.length());
             } else {
                 this.zkConnectString = metadataURL;
             }

[pulsar] 03/13: [Broker] waitingCursors potential heap memory leak (#13939)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2b24d8ca366961aded44f1e27ddd3c845f8ff2f6
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Tue Feb 22 02:18:56 2022 +0800

    [Broker] waitingCursors potential  heap memory leak  (#13939)
    
    (cherry picked from commit 478fd36227c2ede3e1162dd9a4361cffc5dbfceb)
---
 .../org/apache/bookkeeper/mledger/ManagedLedger.java   |  7 +++++++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java     |  4 ++++
 .../service/persistent/PersistentSubscription.java     |  1 +
 .../pulsar/broker/admin/CreateSubscriptionTest.java    | 18 ++++++++++++++++++
 .../mledger/offload/jcloud/impl/MockManagedLedger.java |  5 +++++
 5 files changed, 35 insertions(+)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 5c62dba..1f6e0d3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -293,6 +293,13 @@ public interface ManagedLedger {
     void deleteCursor(String name) throws InterruptedException, ManagedLedgerException;
 
     /**
+     * Remove a ManagedCursor from this ManagedLedger's waitingCursors.
+     *
+     * @param cursor the ManagedCursor
+     */
+    void removeWaitingCursor(ManagedCursor cursor);
+
+    /**
      * Open a ManagedCursor asynchronously.
      *
      * @see #openCursor(String)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 23fd63a..bfa3336 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3485,6 +3485,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    public void removeWaitingCursor(ManagedCursor cursor) {
+        this.waitingCursors.remove(cursor);
+    }
+
     public boolean isCursorActive(ManagedCursor cursor) {
         return activeCursors.get(cursor.getName()) != null;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 6d74b53..fc4d0f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -308,6 +308,7 @@ public class PersistentSubscription implements Subscription {
 
         if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
             deactivateCursor();
+            topic.getManagedLedger().removeWaitingCursor(cursor);
 
             if (!cursor.isDurable()) {
                 // If cursor is not durable, we need to clean up the subscription as well
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
index 59ebdd5..5b43419 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.io.IOException;
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.core.Response.Status;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.config.RequestConfig;
@@ -39,13 +40,16 @@ import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -348,4 +352,18 @@ public class CreateSubscriptionTest extends ProducerConsumerBase {
 
         producer.close();
     }
+
+    @Test
+    public void testWaitingCurosrCausedMemoryLeak() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-topic";
+        for (int i = 0; i < 10; i ++) {
+            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                    .subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe();
+            Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected()));
+            consumer.close();
+        }
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger());
+        assertEquals(ml.getWaitingCursorsCount(), 0);
+    }
 }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index d025cd1..c92c2f8 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -138,6 +138,11 @@ public class MockManagedLedger implements ManagedLedger {
     }
 
     @Override
+    public void removeWaitingCursor(ManagedCursor cursor) {
+
+    }
+
+    @Override
     public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) {
 
     }

[pulsar] 13/13: Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14433)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 487a060e7844fa5f863b1e0937b634835ced5608
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Feb 25 15:23:40 2022 +0800

    Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14433)
    
    (cherry picked from commit 7a58aeba0b439479e1d68fa67c57e120f85687b0)
---
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 78 ++++++++++++----------
 1 file changed, 44 insertions(+), 34 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index c55cc95..9e2f505 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -25,9 +25,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
@@ -58,6 +56,7 @@ import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Getter(AccessLevel.PUBLIC)
@@ -119,52 +118,63 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             return FutureUtil.failedFuture(
                     new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription"));
         }
+        CompletableFuture<Void> applyDLQConfig;
         if (conf.isRetryEnable() && conf.getTopicNames().size() > 0) {
             TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
-            String retryLetterTopic =
-                    topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
-            String deadLetterTopic =
-                    topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-
             //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
             String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
                     + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
             String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName()
                     + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-            try {
-                if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
-                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
-                    retryLetterTopic = oldRetryLetterTopic;
-                }
-                if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
-                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
-                    deadLetterTopic = oldDeadLetterTopic;
-                }
-            } catch (InterruptedException | TimeoutException e) {
-                return FutureUtil.failedFuture(e);
-            } catch (ExecutionException e) {
-                return FutureUtil.failedFuture(e.getCause());
-            }
-
-            if (conf.getDeadLetterPolicy() == null) {
-                conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
+            DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
+            if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
+                    || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                CompletableFuture<PartitionedTopicMetadata> retryLetterTopicMetadata =
+                        client.getPartitionedTopicMetadata(oldRetryLetterTopic);
+                CompletableFuture<PartitionedTopicMetadata> deadLetterTopicMetadata =
+                        client.getPartitionedTopicMetadata(oldDeadLetterTopic);
+                applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
+                        .thenAccept(__ -> {
+                            String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+                                    + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+                            String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+                                    + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+                            if (retryLetterTopicMetadata.join().partitions > 0) {
+                                retryLetterTopic = oldRetryLetterTopic;
+                            }
+                            if (deadLetterTopicMetadata.join().partitions > 0) {
+                                deadLetterTopic = oldDeadLetterTopic;
+                            }
+                            if (deadLetterPolicy == null) {
+                                conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
                                         .maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)
                                         .retryLetterTopic(retryLetterTopic)
                                         .deadLetterTopic(deadLetterTopic)
                                         .build());
+                            } else {
+                                if (StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) {
+                                    conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
+                                }
+                                if (StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                                    conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
+                                }
+                            }
+                            conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+                        });
             } else {
-                if (StringUtils.isBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
-                    conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
-                }
-                if (StringUtils.isBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
-                    conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
-                }
+                conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+                applyDLQConfig = CompletableFuture.completedFuture(null);
             }
-            conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+        } else {
+            applyDLQConfig = CompletableFuture.completedFuture(null);
         }
-        return interceptorList == null || interceptorList.size() == 0
-                ? client.subscribeAsync(conf, schema, null)
-                : client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
+        return applyDLQConfig.thenCompose(__ -> {
+            if (interceptorList == null || interceptorList.size() == 0) {
+                return client.subscribeAsync(conf, schema, null);
+            } else {
+                return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
+            }
+        });
     }
 
     @Override

[pulsar] 10/13: [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6c410fc8e4187f010845e9d344a61e817954617c
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Feb 24 21:54:17 2022 +0800

    [Transaction] delete changeMaxReadPositionAndAddAbortTimes when checkIfNoSnapshot (#14276)
    
    (cherry picked from commit 0a9fd913528181951fd6ad97d3ba07e11e77cd70)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  1 -
 .../pulsar/broker/transaction/TransactionTest.java | 25 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index a64b0f7..d61bd28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -474,7 +474,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
                 maxReadPosition = position;
-                changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
                     maxReadPosition = position;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 2e7fc5a..07a7c95 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -796,4 +797,28 @@ public class TransactionTest extends TransactionTestBase {
         timeout = (Timeout) field.get(transaction);
         Assert.assertTrue(timeout.isCancelled());
     }
+
+    @Test
+    public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService()
+                .getTopic(NAMESPACE1 + "/test", true)
+                .get().get();
+        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+        Field field = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
+        field.setAccessible(true);
+        AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) field.get(buffer);
+        Field field1 = TopicTransactionBufferState.class.getDeclaredField("state");
+        field1.setAccessible(true);
+
+        Awaitility.await().untilAsserted(() -> {
+                    TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer);
+                    Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot);
+        });
+        Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+
+        buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
+        Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
+
+    }
 }
\ No newline at end of file

[pulsar] 08/13: Exclude more files from src package (#14415)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9f39d02e4f513daad712a140a21aa3f3f8b3dd18
Author: Andras Beni <an...@streamnative.io>
AuthorDate: Wed Feb 23 00:46:19 2022 +0100

    Exclude more files from src package (#14415)
    
    Source packages might include pom.xml.versionsBackup and
    dependency-reduced-pom.xml files. This change adds exclude rules to
    avoid including them.
    
    (cherry picked from commit 8e9fa9d378357044ea572a23245979ee512ea3f2)
---
 src/assembly-source-package.xml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/assembly-source-package.xml b/src/assembly-source-package.xml
index 65438c3..f5df61f 100644
--- a/src/assembly-source-package.xml
+++ b/src/assembly-source-package.xml
@@ -74,7 +74,9 @@
         
         <!-- misc -->
         <exclude>%regex[(?!((?!${project.build.directory}/)[^/]+/)*src/)(.*/)?cobertura\.ser]</exclude>
-        
+        <exclude>%regex[(?!((?!${project.build.directory}/)[^/]+/)*src/)(.*/)?pom\.xml\.versionsBackup]</exclude>
+        <exclude>%regex[(?!((?!${project.build.directory}/)[^/]+/)*src/)(.*/)?dependency-reduced-pom\.xml]</exclude>
+
         <!-- release-plugin temp files -->
         <exclude>%regex[(?!((?!${project.build.directory}/)[^/]+/)*src/)(.*/)?pom\.xml\.releaseBackup]</exclude>
         <exclude>%regex[(?!((?!${project.build.directory}/)[^/]+/)*src/)(.*/)?release\.properties]</exclude>

[pulsar] 12/13: Fix switchBackDelayNs failure problem in class of AutoClusterFailover (#14442)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 04c258e1daaa86e1e993cdd5b38f03d57c8f75c8
Author: lin chen <15...@qq.com>
AuthorDate: Fri Feb 25 14:00:54 2022 +0800

    Fix switchBackDelayNs failure problem in class of  AutoClusterFailover (#14442)
    
    (cherry picked from commit 0ef7baa5131f0dee4a57e61eae1d7686f7f60f1e)
---
 .../pulsar/client/impl/AutoClusterFailover.java    |  6 ++-
 .../client/impl/AutoClusterFailoverTest.java       | 43 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 61324b1..baccc29 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -104,8 +104,10 @@ public class AutoClusterFailover implements ServiceUrlProvider {
                 probeAndUpdateServiceUrl(primary, primaryAuthentication, primaryTlsTrustCertsFilePath,
                         primaryTlsTrustStorePath, primaryTlsTrustStorePassword);
                 // secondary cluster is up, check whether need to switch back to primary or not
-                probeAndCheckSwitchBack(primary, primaryAuthentication, primaryTlsTrustCertsFilePath,
-                        primaryTlsTrustStorePath, primaryTlsTrustStorePassword);
+                if (!currentPulsarServiceUrl.equals(primary)) {
+                    probeAndCheckSwitchBack(primary, primaryAuthentication, primaryTlsTrustCertsFilePath,
+                            primaryTlsTrustStorePath, primaryTlsTrustStorePassword);
+                }
             }
         }), intervalMs, intervalMs, TimeUnit.MILLISECONDS);
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
index 6310d49..0469bd4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/AutoClusterFailoverTest.java
@@ -33,6 +33,7 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
 @Test(groups = "broker-impl")
@@ -99,6 +100,48 @@ public class AutoClusterFailoverTest {
     }
 
     @Test
+    public void testInitialize() {
+        String primary = "pulsar://localhost:6650";
+        String secondary = "pulsar://localhost:6651";
+        long failoverDelay = 10;
+        long switchBackDelay = 10;
+        long checkInterval = 1_000;
+
+        ClientConfigurationData configurationData = new ClientConfigurationData();
+
+        ServiceUrlProvider provider = AutoClusterFailover.builder()
+                .primary(primary)
+                .secondary(Collections.singletonList(secondary))
+                .failoverDelay(failoverDelay, TimeUnit.MILLISECONDS)
+                .switchBackDelay(switchBackDelay, TimeUnit.MILLISECONDS)
+                .checkInterval(checkInterval, TimeUnit.MILLISECONDS)
+                .build();
+
+        AutoClusterFailover autoClusterFailover = Mockito.spy((AutoClusterFailover) provider);
+        PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+        Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
+        Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(secondary);
+        Mockito.doReturn(configurationData).when(pulsarClient).getConfiguration();
+
+        autoClusterFailover.initialize(pulsarClient);
+
+        for (int i = 0; i < 2; i++) {
+            Awaitility.await().untilAsserted(() ->
+                    Assert.assertEquals(secondary, autoClusterFailover.getServiceUrl()));
+            assertEquals(-1, autoClusterFailover.getFailedTimestamp());
+
+            // primary cluster came back
+            Mockito.doReturn(true).when(autoClusterFailover).probeAvailable(primary);
+            Awaitility.await().untilAsserted(() ->
+                    Assert.assertEquals(primary, autoClusterFailover.getServiceUrl()));
+            assertEquals(-1, autoClusterFailover.getRecoverTimestamp());
+            assertEquals(-1, autoClusterFailover.getFailedTimestamp());
+
+            Mockito.doReturn(false).when(autoClusterFailover).probeAvailable(primary);
+        }
+    }
+
+    @Test
     public void testAutoClusterFailoverSwitchWithoutAuthentication() {
         String primary = "pulsar://localhost:6650";
         String secondary = "pulsar://localhost:6651";

[pulsar] 06/13: Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8d1d78e1eecc082fe016da7452f5235b55126555
Author: 萧易客 <km...@live.com>
AuthorDate: Tue Feb 22 17:17:47 2022 +0800

    Fix send to deadLetterTopic not working when reach maxRedeliverCount (#14317)
    
    If a message reached maxRedeliverCount, it will send to deadLetterTopic, since 2.8.0, this mechanism is broken, it was introduced in #9970
    
    (cherry picked from commit 16beb9d97fdc64092c8f3fe6959d6bf20dd0aa13)
---
 .../apache/pulsar/client/impl/schema/AbstractSchema.java    |  7 +++----
 .../java/org/apache/pulsar/client/impl/MessageTest.java     | 13 ++++++++++++-
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index f1e6a2d..c9fd683 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -74,14 +74,13 @@ public abstract class AbstractSchema<T> implements Schema<T> {
      * @param schemaVersion the version
      * @return the schema at that specific version
      * @throws SchemaSerializationException in case of unknown schema version
-     * @throws NullPointerException in case of null schemaVersion
+     * @throws NullPointerException in case of null schemaVersion and supportSchemaVersioning is true
      */
     public Schema<?> atSchemaVersion(byte[] schemaVersion) throws SchemaSerializationException {
-        Objects.requireNonNull(schemaVersion);
         if (!supportSchemaVersioning()) {
             return this;
-        } else {
-            throw new SchemaSerializationException("Not implemented for " + this.getClass());
         }
+        Objects.requireNonNull(schemaVersion);
+        throw new SchemaSerializationException("Not implemented for " + this.getClass());
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
index 6d633e7..13cf4f6 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
@@ -22,8 +22,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
-
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
@@ -81,4 +81,15 @@ public class MessageTest {
         assertFalse(topicMessage.isReplicated());
         assertNull(topicMessage.getReplicatedFrom());
     }
+
+    @Test
+    public void testMessageImplGetReaderSchema() {
+        MessageMetadata builder = new MessageMetadata();
+        builder.hasSchemaVersion();
+        ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+        Message<byte[]> msg = MessageImpl.create(builder, payload, Schema.BYTES, null);
+
+        Optional<Schema<?>> readerSchema = msg.getReaderSchema();
+        assertTrue(readerSchema.isPresent());
+    }
 }

[pulsar] 11/13: fix NPE in method of probeAvailable (#14454)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c14c9496936e171ead47d61a1d751af2d1923daf
Author: lin chen <15...@qq.com>
AuthorDate: Fri Feb 25 10:19:56 2022 +0800

    fix NPE in method of probeAvailable (#14454)
    
    ### Motivation
    
    fix NPE in method of probeAvailable.  uri.getHost() may return null:
    https://github.com/apache/pulsar/blob/ced57866700aaeae163bcc6670d9a8eb1ffe8c50/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java#L129-L131
    
    (cherry picked from commit 67160a512d7d18a7f42619719c83dc40e53e5132)
---
 .../java/org/apache/pulsar/client/impl/AutoClusterFailover.java  | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 726f9b1..61324b1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -22,9 +22,7 @@ import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowabl
 import com.google.common.base.Strings;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -39,7 +37,6 @@ import org.apache.pulsar.client.api.AutoClusterFailoverBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.ServiceUrlProvider;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-import org.apache.pulsar.common.net.ServiceURI;
 
 @Slf4j
 @Data
@@ -64,6 +61,7 @@ public class AutoClusterFailover implements ServiceUrlProvider {
     private long failedTimestamp;
     private final long intervalMs;
     private static final int TIMEOUT = 30_000;
+    private final PulsarServiceNameResolver resolver;
 
     private AutoClusterFailover(AutoClusterFailoverBuilderImpl builder) {
         this.primary = builder.primary;
@@ -79,6 +77,7 @@ public class AutoClusterFailover implements ServiceUrlProvider {
         this.recoverTimestamp = -1;
         this.failedTimestamp = -1;
         this.intervalMs = builder.checkIntervalMs;
+        this.resolver = new PulsarServiceNameResolver();
         this.executor = Executors.newSingleThreadScheduledExecutor(
                 new DefaultThreadFactory("pulsar-service-provider"));
     }
@@ -124,9 +123,9 @@ public class AutoClusterFailover implements ServiceUrlProvider {
 
     boolean probeAvailable(String url) {
         try {
-            URI uri = ServiceURI.create(url).getUri();
+            resolver.updateServiceUrl(url);
             Socket socket = new Socket();
-            socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), TIMEOUT);
+            socket.connect(resolver.resolveHost(), TIMEOUT);
             socket.close();
             return true;
         } catch (Exception e) {

[pulsar] 07/13: [C++] Fix GCC compilation failure caused by warning macro (#14402)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 177b797bffe52b82b46179528983a0e95032ec79
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Feb 23 05:41:28 2022 +0800

    [C++] Fix GCC compilation failure caused by warning macro (#14402)
    
    ### Motivation
    
    When I tried to build the C++ client with GCC 7.3 and when warnings are
    printed (because `BOOST_ARCH_X86_64` is not defined), the compilation
    failed with:
    
    ```
    #warning “BOOST_ARCH_X86_64 is not defined, CRC32C SSE4.2 will be disabled”
      ^~~~~~~
    cc1plus: error: unrecognized command line option '-Wno-stringop-truncation' [-Werror]
    cc1plus: all warnings being treated as errors
    ```
    
    It seems to be a bug before GCC 8.1. I added
    `-DCMAKE_VERBOSE_MAKEFILE=ON` to CMake command and see the full compile
    command:
    
    ```
    -Wno-error -Wall -Wformat-security -Wvla -Werror -Wno-sign-compare -Wno-deprecated-declarations -Wno-error=cpp -Wno-stringop-truncation
    ```
    
    See
    https://github.com/apache/pulsar/blob/b829a4ce121268f55748bbdd6f19ac36129e7dab/pulsar-client-cpp/CMakeLists.txt#L105-L106
    
    For GCC > 4.9, `-Wno-stringop-truncation` option was added. However,
    when a message was printed by `#warning` macro, it would fail with the
    strange error message.
    
    The simplest way to reproduce the bug is compiling following code:
    
    ```c++
    #warnings "hello"
    
    int main() {}
    ```
    
    You can paste the code above to https://godbolt.org/, select any GCC
    compiler whose version is lower than 8.0, then add the following
    options:
    
    ```
    -Werror -Wno-error=cpp -Wno-stringop-truncation
    ```
    
    The compilation failed for x86-64 gcc 7.5 while it succeeded for 8.1.
    
    ### Modifications
    
    Only add the `-Wno-stringop-truncation` option for GCC >= 8.1.
    
    (cherry picked from commit 958fc7820106c9b4da33f1b720d1dcce8ff772b1)
---
 pulsar-client-cpp/CMakeLists.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 3c43aa5..24db7da 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -102,7 +102,7 @@ else() # GCC or Clang are mostly compatible:
     # Options unique to Clang or GCC:
     if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
         add_compile_options(-Qunused-arguments) 
-    elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 4.9))
+    elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8.1))
         add_compile_options(-Wno-stringop-truncation)
     endif()
 endif()

[pulsar] 01/13: [CI]Upgrade Windows runner os to windows-2022 and generator to Visual Studio 17 2022 (#14368)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b20b6f24aaf825f7a9c144c4d4143ffa3350fcf9
Author: Lishen Yao <ya...@gmail.com>
AuthorDate: Fri Feb 18 18:37:27 2022 +0800

    [CI]Upgrade Windows runner os to windows-2022 and generator to Visual Studio 17 2022 (#14368)
    
    ### Motivation
    
    As github windows runner latest upgrade to [2022](https://github.com/actions/virtual-environments/issues/4856), the `ci-cpp-build-windows.yaml` workflow should be changed otherwise the action would be failed such as [this one](https://github.com/apache/pulsar/actions/runs/1860147632)
    
    ### Modifications
    
    - Upgrade os version to 2022
    - Change generator to `Visual Studio 17 2022`
    
    (cherry picked from commit 0facd24e8eec2df56f6f241ce2cf87eb98590a7e)
---
 .github/workflows/ci-cpp-build-windows.yaml | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/ci-cpp-build-windows.yaml b/.github/workflows/ci-cpp-build-windows.yaml
index a287a4c..c622b65 100644
--- a/.github/workflows/ci-cpp-build-windows.yaml
+++ b/.github/workflows/ci-cpp-build-windows.yaml
@@ -46,18 +46,18 @@ jobs:
       matrix:
         include:
           - name: 'Windows x64'
-            os: windows-latest
+            os: windows-2022
             triplet: x64-windows
             vcpkg_dir: 'C:\vcpkg'
             suffix: 'windows-win64'
-            generator: 'Visual Studio 16 2019'
+            generator: 'Visual Studio 17 2022'
             arch: '-A x64'
           - name: 'Windows x86'
-            os: windows-latest
+            os: windows-2022
             triplet: x86-windows
             vcpkg_dir: 'C:\vcpkg'
             suffix: 'windows-win32'
-            generator: 'Visual Studio 16 2019'
+            generator: 'Visual Studio 17 2022'
             arch: '-A Win32'
 
     steps:
@@ -65,7 +65,7 @@ jobs:
         uses: actions/checkout@v2
 
       - name: Detect changed files
-        id:   changes
+        id: changes
         uses: apache/pulsar-test-infra/paths-filter@master
         with:
           filters: .github/changes-filter.yaml

[pulsar] 02/13: Fix adding message to list potential issue (#14377)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 40a44c5cd50671fb8cf747ac305ac48aaa87884a
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Feb 19 01:38:24 2022 +0800

    Fix adding message to list potential issue (#14377)
    
    (cherry picked from commit b22445f961da5cf2e7baaac4b3847007f4c6ed59)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e09a934..b5599c8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1907,18 +1907,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
         List<MessageIdData> data = new ArrayList<>(messageIds.size());
-        List<CompletableFuture<Boolean>> futures = new ArrayList<>(messageIds.size());
+        List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
         messageIds.forEach(messageId -> {
             CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
-            futures.add(future);
-            future.thenAccept(sendToDLQ -> {
+            futures.add(future.thenAccept(sendToDLQ -> {
                 if (!sendToDLQ) {
                     data.add(new MessageIdData()
                             .setPartition(messageId.getPartitionIndex())
                             .setLedgerId(messageId.getLedgerId())
                             .setEntryId(messageId.getEntryId()));
                 }
-            });
+            }));
         });
         return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
     }

[pulsar] 09/13: [Transaction] Adopt single thread pool in TC (#14238)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit acd934106182a8c250155f09842d3119c85e02d6
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Feb 24 21:57:38 2022 +0800

    [Transaction] Adopt single thread pool in TC (#14238)
    
    ### Motivation
    Optimize code and improve maintainability.
    ### Modification
    * Option 1 (the way I use)
    Create a thread pool at peer TC.
      * advantage
      Each TC has a single thread pool to perform its own tasks, and will not be blocked due to sharing a single  thread with other TCs
      * disadvantage
      Too many thread pools may be created
    * Option 2
    Create an ExecuteProvider in the TC service.  It create some single-threaded pools when the TC Service is created, and  then assign a single-threaded pool to TC when the TC is created
       * The advantages and disadvantages are opposite to the option one
    
    (cherry picked from commit ced57866700aaeae163bcc6670d9a8eb1ffe8c50)
---
 .../impl/MLTransactionMetadataStore.java           | 328 ++++++++++++---------
 1 file changed, 188 insertions(+), 140 deletions(-)

diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index a71d203..f109ec4 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.transaction.coordinator.impl;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
@@ -69,6 +72,7 @@ public class MLTransactionMetadataStore
     private final LongAdder transactionTimeoutCount;
     private final LongAdder appendLogCount;
     private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
+    private final ExecutorService internalPinnedExecutor;
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
@@ -87,12 +91,16 @@ public class MLTransactionMetadataStore
         this.abortedTransactionCount = new LongAdder();
         this.transactionTimeoutCount = new LongAdder();
         this.appendLogCount = new LongAdder();
+        DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_"
+                + tcID.toString() + "thread_factory");
+        this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
 
         if (!changeToInitializingState()) {
             log.error("Managed ledger transaction metadata store change state error when init it");
             return;
         }
-        new Thread(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
+
+        internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
 
             @Override
             public void replayComplete() {
@@ -125,7 +133,8 @@ public class MLTransactionMetadataStore
                                 long timeoutAt = transactionMetadataEntry.getTimeoutMs();
                                 txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
                                         openTimestamp, timeoutAt), positions));
-                                recoverTracker.handleOpenStatusTransaction(txnSequenceId, timeoutAt + openTimestamp);
+                                recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+                                        timeoutAt + openTimestamp);
                             }
                             break;
                         case ADD_PARTITION:
@@ -174,7 +183,7 @@ public class MLTransactionMetadataStore
                     log.error(e.getMessage(), e);
                 }
             }
-        })).start();
+        }));
     }
 
     @Override
@@ -195,167 +204,206 @@ public class MLTransactionMetadataStore
     }
 
     @Override
-    public synchronized CompletableFuture<TxnID> newTransaction(long timeOut) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException
-                            .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
-        }
+    public CompletableFuture<TxnID> newTransaction(long timeOut) {
+        CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
+                return;
+            }
 
-        long mostSigBits = tcID.getId();
-        long leastSigBits = sequenceIdGenerator.generateSequenceId();
-        TxnID txnID = new TxnID(mostSigBits, leastSigBits);
-        long currentTimeMillis = System.currentTimeMillis();
-        TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                .setTxnidMostBits(mostSigBits)
-                .setTxnidLeastBits(leastSigBits)
-                .setStartTime(currentTimeMillis)
-                .setTimeoutMs(timeOut)
-                .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
-                .setLastModificationTime(currentTimeMillis)
-                .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
-        return transactionLog.append(transactionMetadataEntry)
-                .thenCompose(position -> {
-                    appendLogCount.increment();
-                    TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
-                    List<Position> positions = new ArrayList<>();
-                    positions.add(position);
-                    Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
-                    txnMetaMap.put(leastSigBits, pair);
-                    this.timeoutTracker.addTransaction(leastSigBits, timeOut);
-                    createdTransactionCount.increment();
-                    return CompletableFuture.completedFuture(txnID);
-                });
+            long mostSigBits = tcID.getId();
+            long leastSigBits = sequenceIdGenerator.generateSequenceId();
+            TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+            long currentTimeMillis = System.currentTimeMillis();
+            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                    .setTxnidMostBits(mostSigBits)
+                    .setTxnidLeastBits(leastSigBits)
+                    .setStartTime(currentTimeMillis)
+                    .setTimeoutMs(timeOut)
+                    .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+                    .setLastModificationTime(currentTimeMillis)
+                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+            transactionLog.append(transactionMetadataEntry)
+                    .whenComplete((position, throwable) -> {
+                        if (throwable != null) {
+                            completableFuture.completeExceptionally(throwable);
+                        } else {
+                            appendLogCount.increment();
+                            TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
+                            List<Position> positions = new ArrayList<>();
+                            positions.add(position);
+                            Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
+                            txnMetaMap.put(leastSigBits, pair);
+                            this.timeoutTracker.addTransaction(leastSigBits, timeOut);
+                            createdTransactionCount.increment();
+                            completableFuture.complete(txnID);
+                        }
+                    });
+        });
+        return completableFuture;
     }
 
     @Override
-    public synchronized CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                            State.Ready, getState(), "add produced partition"));
-        }
-        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(txnID.getMostSigBits())
-                    .setTxnidLeastBits(txnID.getLeastSigBits())
-                    .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
-                    .addAllPartitions(partitions)
-                    .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture
+                        .completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID,
+                        State.Ready, getState(), "add produced partition"));
+                return;
+            }
+            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(txnID.getMostSigBits())
+                        .setTxnidLeastBits(txnID.getLeastSigBits())
+                        .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
+                        .addAllPartitions(partitions)
+                        .setLastModificationTime(System.currentTimeMillis())
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-            return transactionLog.append(transactionMetadataEntry)
-                    .thenCompose(position -> {
-                        appendLogCount.increment();
-                        try {
-                            synchronized (txnMetaListPair.getLeft()) {
-                                txnMetaListPair.getLeft().addProducedPartitions(partitions);
-                                txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                transactionLog.append(transactionMetadataEntry)
+                        .whenComplete((position, exception) -> {
+                            if (exception != null) {
+                                completableFuture.completeExceptionally(exception);
+                                return;
                             }
-                            return CompletableFuture.completedFuture(null);
-                        } catch (InvalidTxnStatusException e) {
-                            transactionLog.deletePosition(Collections.singletonList(position));
-                            log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                                    + " add produced partition error with TxnStatus : "
-                                    + txnMetaListPair.getLeft().status().name(), e);
-                            return FutureUtil.failedFuture(e);
-                        }
-                    });
+                            appendLogCount.increment();
+                            try {
+                                synchronized (txnMetaListPair.getLeft()) {
+                                    txnMetaListPair.getLeft().addProducedPartitions(partitions);
+                                    txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                                }
+                                completableFuture.complete(null);
+                            } catch (InvalidTxnStatusException e) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                                log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                        + " add produced partition error with TxnStatus : "
+                                        + txnMetaListPair.getLeft().status().name(), e);
+                                completableFuture.completeExceptionally(e);
+                            }
+                        });
+            });
         });
+        return completableFuture;
     }
 
     @Override
-    public synchronized CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
+    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
                                                           List<TransactionSubscription> txnSubscriptions) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                            State.Ready, getState(), "add acked partition"));
-        }
-        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(txnID.getMostSigBits())
-                    .setTxnidLeastBits(txnID.getLeastSigBits())
-                    .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
-                    .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
-                    .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition"));
+                return;
+            }
+            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(txnID.getMostSigBits())
+                        .setTxnidLeastBits(txnID.getLeastSigBits())
+                        .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
+                        .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
+                        .setLastModificationTime(System.currentTimeMillis())
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-            return transactionLog.append(transactionMetadataEntry)
-                    .thenCompose(position -> {
-                        appendLogCount.increment();
-                        try {
-                            synchronized (txnMetaListPair.getLeft()) {
-                                txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
-                                txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                transactionLog.append(transactionMetadataEntry)
+                        .whenComplete((position, exception) -> {
+                            if (exception != null) {
+                                completableFuture.completeExceptionally(exception);
+                                return;
                             }
-                            return CompletableFuture.completedFuture(null);
-                        } catch (InvalidTxnStatusException e) {
-                            transactionLog.deletePosition(Collections.singletonList(position));
-                            log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                                    + " add acked subscription error with TxnStatus : "
-                                    + txnMetaListPair.getLeft().status().name(), e);
-                            return FutureUtil.failedFuture(e);
-                        }
-                    });
+                            appendLogCount.increment();
+                            try {
+                                synchronized (txnMetaListPair.getLeft()) {
+                                    txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
+                                    txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
+                                }
+                                completableFuture.complete(null);
+                            } catch (InvalidTxnStatusException e) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                                log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                        + " add acked subscription error with TxnStatus : "
+                                        + txnMetaListPair.getLeft().status().name(), e);
+                                completableFuture.completeExceptionally(e);
+                            }
+                        });
+            });
         });
+        return completableFuture;
     }
 
     @Override
-    public synchronized CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
+    public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
                                                                 TxnStatus expectedStatus, boolean isTimeout) {
-        if (!checkIfReady()) {
-            return FutureUtil.failedFuture(
-                    new CoordinatorException.TransactionMetadataStoreStateException(tcID,
-                            State.Ready, getState(), "update transaction status"));
-        }
-        return getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
-            if (txnMetaListPair.getLeft().status() == newStatus) {
-                return CompletableFuture.completedFuture(null);
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        internalPinnedExecutor.execute(() -> {
+            if (!checkIfReady()) {
+                completableFuture.completeExceptionally(new CoordinatorException
+                        .TransactionMetadataStoreStateException(tcID,
+                        State.Ready, getState(), "update transaction status"));
+                return;
             }
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(txnID.getMostSigBits())
-                    .setTxnidLeastBits(txnID.getLeastSigBits())
-                    .setExpectedStatus(expectedStatus)
-                    .setMetadataOp(TransactionMetadataOp.UPDATE)
-                    .setLastModificationTime(System.currentTimeMillis())
-                    .setNewStatus(newStatus)
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+                if (txnMetaListPair.getLeft().status() == newStatus) {
+                    completableFuture.complete(null);
+                    return;
+                }
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(txnID.getMostSigBits())
+                        .setTxnidLeastBits(txnID.getLeastSigBits())
+                        .setExpectedStatus(expectedStatus)
+                        .setMetadataOp(TransactionMetadataOp.UPDATE)
+                        .setLastModificationTime(System.currentTimeMillis())
+                        .setNewStatus(newStatus)
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-            return transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
-                appendLogCount.increment();
-                try {
-                    synchronized (txnMetaListPair.getLeft()) {
-                        txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
-                        txnMetaListPair.getRight().add(position);
+                transactionLog.append(transactionMetadataEntry).whenComplete((position, throwable) -> {
+                    if (throwable != null) {
+                        completableFuture.completeExceptionally(throwable);
+                        return;
                     }
-                    if (newStatus == TxnStatus.ABORTING && isTimeout) {
-                        this.transactionTimeoutCount.increment();
-                    }
-                    if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
-                        return transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> {
-                            this.transactionMetadataStoreStats
-                                    .addTransactionExecutionLatencySample(System.currentTimeMillis()
-                                            - txnMetaListPair.getLeft().getOpenTimestamp());
-                            if (newStatus == TxnStatus.COMMITTED) {
-                                committedTransactionCount.increment();
-                            } else {
-                                abortedTransactionCount.increment();
-                            }
-                            txnMetaMap.remove(txnID.getLeastSigBits());
-                            return CompletableFuture.completedFuture(null);
-                        });
+                    appendLogCount.increment();
+                    try {
+                        synchronized (txnMetaListPair.getLeft()) {
+                            txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
+                            txnMetaListPair.getRight().add(position);
+                        }
+                        if (newStatus == TxnStatus.ABORTING && isTimeout) {
+                            this.transactionTimeoutCount.increment();
+                        }
+                        if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+                            transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, exception) -> {
+                                if (exception != null) {
+                                    completableFuture.completeExceptionally(exception);
+                                    return;
+                                }
+                                this.transactionMetadataStoreStats
+                                        .addTransactionExecutionLatencySample(System.currentTimeMillis()
+                                                - txnMetaListPair.getLeft().getOpenTimestamp());
+                                if (newStatus == TxnStatus.COMMITTED) {
+                                    committedTransactionCount.increment();
+                                } else {
+                                    abortedTransactionCount.increment();
+                                }
+                                txnMetaMap.remove(txnID.getLeastSigBits());
+                                completableFuture.complete(null);
+                            });
+                        }
+                        completableFuture.complete(null);
+                    } catch (InvalidTxnStatusException e) {
+                        transactionLog.deletePosition(Collections.singletonList(position));
+                        log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                + " add update txn status error with TxnStatus : "
+                                + txnMetaListPair.getLeft().status().name(), e);
+                        completableFuture.completeExceptionally(e);
                     }
-                    return CompletableFuture.completedFuture(null);
-                } catch (InvalidTxnStatusException e) {
-                    transactionLog.deletePosition(Collections.singletonList(position));
-                    log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                            + " add update txn status error with TxnStatus : "
-                            + txnMetaListPair.getLeft().status().name(), e);
-                    return FutureUtil.failedFuture(e);
-                }
+                });
             });
         });
+       return completableFuture;
     }
 
     @Override