You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 00:56:46 UTC

[pulsar] branch branch-2.9 updated (e35b561 -> 626b1d3)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from e35b561  Remove unnecessary boxing unboxing (#12790)
     new 5277984  [Pulsar SQL] Handle message null schema version in PulsarRecordCursor (#12809)
     new bd0a40c  Fix znode leakage caused by deleting tenant (#12711)
     new 67d210d  Improve exception info for invaild time-related option (#12828)
     new b9629cd  The problem of two exception handling (#12744)
     new cd2c4c1  Fix TopicPoliciesCacheNotInitException issue. (#12773)
     new 012e1b1  [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)
     new 28dcdc3  JavaInstanceTest should be AssertEquals (#12836)
     new 1d58b1f  Add error log when new jetty client (#12840)
     new 1869864  Remove unused listeners if it have no listeners. (#12654)
     new a0db36f  Fix deleting tenants with active namespaces with 500. (#12848)
     new 68d5af2  [Authorization] Support GET_BACKLOG_SIZE topic op after enable auth (#12850)
     new 3d7fe33  Remove readerCaches and close reader when exception occurs in SystemTopicBasedTopicPoliciesService. (#12873)
     new f3bdaec  [Transaction]stop TC replaying with exception (#12705)
     new 33ff93f  Handle exception double (#12881)
     new 626b1d3  [pulsar-perf]Support listenerThreads configuration. (#12892)

The 15 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf                                   |   4 +
 conf/standalone.conf                               |   4 +
 .../bookkeeper/mledger/impl/EntryCacheImpl.java    |  59 ++++--------
 .../bookkeeper/mledger/impl/EntryCacheManager.java |  16 ++-
 .../apache/pulsar/broker/ServiceConfiguration.java |   8 ++
 .../authorization/PulsarAuthorizationProvider.java |   1 +
 .../pulsar/broker/resources/BaseResources.java     |  72 ++++++++++----
 .../broker/resources/LocalPoliciesResources.java   |   4 +
 .../broker/resources/NamespaceResources.java       |  70 ++++++++++++--
 .../pulsar/broker/resources/TenantResources.java   |   4 +-
 .../broker/TransactionMetadataStoreService.java    |   5 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 100 +++++++------------
 .../pulsar/broker/admin/impl/TenantsBase.java      |  73 ++++++--------
 .../pulsar/broker/service/AbstractTopic.java       |  38 +++++---
 .../SystemTopicBasedTopicPoliciesService.java      |  65 +++++++++----
 .../pulsar/utils/auth/tokens/TokensCliUtils.java   |  10 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   8 ++
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  90 +++++++++++++++++
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  76 ++++++++++++++-
 .../pulsar/broker/transaction/TransactionTest.java |  81 +++++++++++++++-
 .../api/AuthorizationProducerConsumerTest.java     |  11 ++-
 .../SchemaCompatibilityCheckTest.java              |  81 ++++++++++++++++
 .../pulsar/common/policies/data/Policies.java      |   2 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  39 +++++++-
 .../pulsar/admin/cli/CmdPersistentTopics.java      |  10 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  32 +++++-
 .../apache/pulsar/admin/cli/CmdTransactions.java   |   9 +-
 .../functions/instance/JavaInstanceTest.java       |   4 +-
 .../pulsar/proxy/server/AdminProxyHandler.java     |   7 +-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  42 ++++++--
 .../sql/presto/PulsarSqlSchemaInfoProvider.java    |   9 +-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  | 107 ++++++++++++++++++++-
 .../pulsar/testclient/PerformanceConsumer.java     |   5 +
 .../pulsar/testclient/PerformanceReader.java       |   5 +
 .../coordinator/impl/MLTransactionLogImpl.java     |  18 +++-
 site2/docs/reference-cli-tools.md                  |   2 +
 site2/website-next/docs/reference-cli-tools.md     |   2 +
 37 files changed, 910 insertions(+), 263 deletions(-)

[pulsar] 03/15: Improve exception info for invaild time-related option (#12828)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 67d210d2594db69a471905b5d70e23f814ff99f3
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Wed Nov 17 16:52:05 2021 +0800

    Improve exception info for invaild time-related option (#12828)
    
    (cherry picked from commit 60f5475740f129d02d3896b31bddf001b9ea6704)
---
 .../pulsar/utils/auth/tokens/TokensCliUtils.java   | 10 ++++--
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 39 +++++++++++++++++++---
 .../pulsar/admin/cli/CmdPersistentTopics.java      | 10 ++++--
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 32 +++++++++++++++---
 .../apache/pulsar/admin/cli/CmdTransactions.java   |  9 +++--
 5 files changed, 84 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
index c089fa0..47364bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/auth/tokens/TokensCliUtils.java
@@ -22,6 +22,7 @@ import com.beust.jcommander.DefaultUsageFormatter;
 import com.beust.jcommander.IUsageFormatter;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.google.common.base.Charsets;
 import io.jsonwebtoken.Claims;
@@ -155,8 +156,13 @@ public class TokensCliUtils {
 
             Optional<Date> optExpiryTime = Optional.empty();
             if (expiryTime != null) {
-                long relativeTimeMillis = TimeUnit.SECONDS
-                        .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(expiryTime));
+                long relativeTimeMillis;
+                try {
+                    relativeTimeMillis = TimeUnit.SECONDS.toMillis(
+                            RelativeTimeUtil.parseRelativeTimeInSeconds(expiryTime));
+                } catch (IllegalArgumentException exception) {
+                    throw new ParameterException(exception.getMessage());
+                }
                 optExpiryTime = Optional.of(new Date(System.currentTimeMillis() + relativeTimeMillis));
             }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index a326f82..f9f94c3 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -697,7 +697,12 @@ public class CmdNamespaces extends CmdBase {
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
             long sizeLimit = validateSizeString(limitStr);
-            long retentionTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(retentionTimeStr);
+            long retentionTimeInSec;
+            try {
+                retentionTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(retentionTimeStr);
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
 
             final int retentionTimeInMin;
             if (retentionTimeInSec != -1) {
@@ -1392,7 +1397,13 @@ public class CmdNamespaces extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            long maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+            long maxInactiveDurationInSeconds;
+            try {
+                maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(
+                        RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
 
             if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
                 throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive");
@@ -1425,7 +1436,13 @@ public class CmdNamespaces extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            long delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
+            long delayedDeliveryTimeInMills;
+            try {
+                delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(
+                        RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
 
             if (enable == disable) {
                 throw new ParameterException("Need to specify either --enable or --disable");
@@ -1795,7 +1812,13 @@ public class CmdNamespaces extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            getAdmin().namespaces().setOffloadDeleteLag(namespace, RelativeTimeUtil.parseRelativeTimeInSeconds(lag),
+            long lagInSec;
+            try {
+                lagInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(lag);
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
+            getAdmin().namespaces().setOffloadDeleteLag(namespace, lagInSec,
                     TimeUnit.SECONDS);
         }
     }
@@ -2120,7 +2143,13 @@ public class CmdNamespaces extends CmdBase {
 
             Long offloadAfterElapsedInMillis = OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS;
             if (StringUtils.isNotEmpty(offloadAfterElapsedStr)) {
-                Long offloadAfterElapsed = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr));
+                Long offloadAfterElapsed;
+                try {
+                    offloadAfterElapsed = TimeUnit.SECONDS.toMillis(
+                            RelativeTimeUtil.parseRelativeTimeInSeconds(offloadAfterElapsedStr));
+                } catch (IllegalArgumentException exception) {
+                    throw new ParameterException(exception.getMessage());
+                }
                 if (positiveCheck("OffloadAfterElapsed", offloadAfterElapsed)
                         && maxValueCheck("OffloadAfterElapsed", offloadAfterElapsed, Long.MAX_VALUE)) {
                     offloadAfterElapsedInMillis = offloadAfterElapsed;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 759b602..611c1b0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.admin.cli;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.CommaParameterSplitter;
 import com.google.gson.Gson;
@@ -528,8 +529,13 @@ public class CmdPersistentTopics extends CmdBase {
                 MessageId messageId = validateMessageIdString(resetMessageIdStr);
                 getPersistentTopics().resetCursor(persistentTopic, subName, messageId);
             } else if (isNotBlank(resetTimeStr)) {
-                long resetTimeInMillis = TimeUnit.SECONDS
-                        .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
+                long resetTimeInMillis;
+                try {
+                    resetTimeInMillis = TimeUnit.SECONDS.toMillis(
+                            RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
+                } catch (IllegalArgumentException exception) {
+                    throw new ParameterException(exception.getMessage());
+                }
                 // now - go back time
                 long timestamp = System.currentTimeMillis() - resetTimeInMillis;
                 getPersistentTopics().resetCursor(persistentTopic, subName, timestamp);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index d0ef10c..e37954d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -792,8 +792,13 @@ public class CmdTopics extends CmdBase {
                     getTopics().resetCursor(persistentTopic, subName, messageId);
                 }
             } else if (isNotBlank(resetTimeStr)) {
-                long resetTimeInMillis = TimeUnit.SECONDS
-                        .toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
+                long resetTimeInMillis;
+                try {
+                    resetTimeInMillis = TimeUnit.SECONDS.toMillis(
+                            RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
+                } catch (IllegalArgumentException exception) {
+                    throw new ParameterException(exception.getMessage());
+                }
                 // now - go back time
                 long timestamp = System.currentTimeMillis() - resetTimeInMillis;
                 getTopics().resetCursor(persistentTopic, subName, timestamp);
@@ -1280,7 +1285,13 @@ public class CmdTopics extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             String topicName = validateTopicName(params);
-            long delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
+            long delayedDeliveryTimeInMills;
+            try {
+                delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(
+                        RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
 
             if (enable == disable) {
                 throw new ParameterException("Need to specify either --enable or --disable");
@@ -1430,7 +1441,12 @@ public class CmdTopics extends CmdBase {
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
             long sizeLimit = validateSizeString(limitStr);
-            long retentionTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(retentionTimeStr);
+            long retentionTimeInSec;
+            try {
+                retentionTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(retentionTimeStr);
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
 
             final int retentionTimeInMin;
             if (retentionTimeInSec != -1) {
@@ -2287,7 +2303,13 @@ public class CmdTopics extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            long maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+            long maxInactiveDurationInSeconds;
+            try {
+                maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(
+                        RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
 
             if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
                 throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index 7faf006..e695381 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.admin.cli;
 
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -133,8 +134,12 @@ public class CmdTransactions extends CmdBase {
 
         @Override
         void run() throws Exception {
-            long timeout =
-                    TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(timeoutStr));
+            long timeout;
+            try {
+                timeout = TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(timeoutStr));
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
             if (coordinatorId != null) {
                 print(getAdmin().transactions().getSlowTransactionsByCoordinatorId(coordinatorId,
                         timeout, TimeUnit.MILLISECONDS));

[pulsar] 05/15: Fix TopicPoliciesCacheNotInitException issue. (#12773)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cd2c4c1de2dcaea74b6e48bfe9b408a57e9be5e8
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Nov 18 13:44:42 2021 +0800

    Fix TopicPoliciesCacheNotInitException issue. (#12773)
    
    ### Motivation
    
    Sometimes, we may get `TopicPoliciesCacheNotInitException` with below stack trace:
    ```
    15:45:47.020 [pulsar-web-41-3] INFO  org.eclipse.jetty.server.RequestLog - 10.0.0.42 - - [10/Nov/2021:15:45:47 +0000] "GET /status.html HTTP/1.1" 200 2 "-" "kube-probe/1.19+" 1
    15:45:51.221 [pulsar-2-15] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to perform getRetention on topic persistent://public/default/UpdateNodeCharts
    java.lang.RuntimeException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
    	at org.apache.pulsar.broker.service.TopicPoliciesService.lambda$getTopicPoliciesAsyncWithRetry$0(TopicPoliciesService.java:84) ~[io.streamnative-pulsar-broker-2.8.1.21.jar:2.8.1.21]
    	at org.apache.pulsar.client.util.RetryUtil.executeWithRetry(RetryUtil.java:50) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
    	at org.apache.pulsar.client.util.RetryUtil.lambda$executeWithRetry$1(RetryUtil.java:63) ~[io.streamnative-pulsar-client-original-2.8.1.21.jar:2.8.1.21]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.68.Final.jar:4.1.68.Final]
    	at java.lang.Thread.run(Thread.java:829) [?:?]
    ```
    
    This is because : https://github.com/apache/pulsar/blob/c3da1452a444c9599cb85562a3faa82ddfdecec8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L298-L312
    
    when `reader.readNextAsync()` throws exceptions, the msg will be null which will throw NPE without any catch block.
    
    (cherry picked from commit 11298144ac118cda951deffa092ab17110d254b7)
---
 .../broker/TransactionMetadataStoreService.java    |  5 ++-
 .../SystemTopicBasedTopicPoliciesService.java      | 42 ++++++++++++++--------
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 40 +++++++++++++++++++++
 .../coordinator/impl/MLTransactionLogImpl.java     |  8 +++--
 4 files changed, 76 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 607f05e..240c6c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker;
 
+import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.getMLTransactionLogName;
 import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.ABORTING;
 import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTING;
 import com.google.common.annotations.VisibleForTesting;
@@ -63,7 +64,6 @@ import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -230,8 +230,7 @@ public class TransactionMetadataStoreService {
 
     public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
         return pulsarService.getBrokerService()
-                .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl
-                        .TRANSACTION_LOG_PREFIX + tcId)).thenCompose(v -> {
+                .getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> {
                             TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
                             TransactionRecoverTracker recoverTracker =
                                     new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 70556826..4dbffec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -160,6 +160,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     @Override
     public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException {
+        if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) {
+            NamespaceName namespace = topicName.getNamespaceObject();
+            prepareInitPoliciesCache(namespace, new CompletableFuture<>());
+        }
         if (policyCacheInitMap.containsKey(topicName.getNamespaceObject())
                 && !policyCacheInitMap.get(topicName.getNamespaceObject())) {
             throw new TopicPoliciesCacheNotInitException();
@@ -198,24 +202,29 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 result.complete(null);
             } else {
                 ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
-                policyCacheInitMap.put(namespace, false);
-                CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
-                        creatSystemTopicClientWithRetry(namespace);
-                readerCaches.put(namespace, readerCompletableFuture);
-                readerCompletableFuture.whenComplete((reader, ex) -> {
-                    if (ex != null) {
-                        log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
-                        result.completeExceptionally(ex);
-                    } else {
-                        initPolicesCache(reader, result);
-                        result.thenRun(() -> readMorePolicies(reader));
-                    }
-                });
+                prepareInitPoliciesCache(namespace, result);
             }
         }
         return result;
     }
 
+    private void prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) {
+        if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
+            CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
+                    creatSystemTopicClientWithRetry(namespace);
+            readerCaches.put(namespace, readerCompletableFuture);
+            readerCompletableFuture.whenComplete((reader, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
+                    result.completeExceptionally(ex);
+                } else {
+                    initPolicesCache(reader, result);
+                    result.thenRun(() -> readMorePolicies(reader));
+                }
+            });
+        }
+    }
+
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry(
             NamespaceName namespace) {
         SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
@@ -284,6 +293,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                         reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
                 readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                reader.closeAsync();
+                return;
             }
             if (hasMore) {
                 reader.readNextAsync().whenComplete((msg, e) -> {
@@ -292,6 +304,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                                 reader.getSystemTopic().getTopicName(), ex);
                         future.completeExceptionally(e);
                         readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                        policyCacheInitMap.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
+                        reader.closeAsync();
+                        return;
                     }
                     refreshTopicPoliciesCache(msg);
                     if (log.isDebugEnabled()) {
@@ -306,7 +321,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 }
                 policyCacheInitMap.computeIfPresent(
                         reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
-
                 // replay policy message
                 policiesCache.forEach(((topicName, topicPolicies) -> {
                     if (listeners.get(topicName) != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 49d8699..80f3dc9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -32,8 +32,12 @@ import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
@@ -279,4 +283,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
 
         assertEquals(reader1, reader);
     }
+
+    @Test
+    public void testGetTopicPoliciesWithRetry() throws Exception {
+        Field initMapField = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("policyCacheInitMap");
+        initMapField.setAccessible(true);
+        Map<NamespaceName, Boolean> initMap = (Map)initMapField.get(systemTopicBasedTopicPoliciesService);
+        initMap.remove(NamespaceName.get(NAMESPACE1));
+        Field readerCaches = SystemTopicBasedTopicPoliciesService.class.getDeclaredField("readerCaches");
+        readerCaches.setAccessible(true);
+        Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readers = (Map)readerCaches.get(systemTopicBasedTopicPoliciesService);
+        readers.remove(NamespaceName.get(NAMESPACE1));
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(500, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
+                .setMax(1000, TimeUnit.MILLISECONDS)
+                .create();
+        TopicPolicies initPolicy = TopicPolicies.builder()
+                .maxConsumerPerTopic(10)
+                .build();
+        ScheduledExecutorService executors = Executors.newScheduledThreadPool(1);
+        executors.schedule(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
+                } catch (Exception ignore) {}
+            }
+        }, 2000, TimeUnit.MILLISECONDS);
+        Awaitility.await().untilAsserted(() -> {
+            Optional<TopicPolicies> topicPolicies = systemTopicBasedTopicPoliciesService.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor()).get();
+            Assert.assertTrue(topicPolicies.isPresent());
+            if (topicPolicies.isPresent()) {
+                Assert.assertEquals(topicPolicies.get(), initPolicy);
+            }
+        });
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index f2324af..c044275 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -73,8 +73,7 @@ public class MLTransactionLogImpl implements TransactionLog {
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
                                 ManagedLedgerConfig managedLedgerConfig) {
-        this.topicName = TopicName.get(TopicDomain.persistent.value(),
-                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId());
+        this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
         this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
         managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
@@ -83,6 +82,11 @@ public class MLTransactionLogImpl implements TransactionLog {
         this.entryQueue = new SpscArrayQueue<>(2000);
     }
 
+    public static TopicName getMLTransactionLogName(TransactionCoordinatorID tcID) {
+        return TopicName.get(TopicDomain.persistent.value(),
+                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX + tcID.getId());
+    }
+
     @Override
     public CompletableFuture<Void> initialize() {
         CompletableFuture<Void> future = new CompletableFuture<>();

[pulsar] 11/15: [Authorization] Support GET_BACKLOG_SIZE topic op after enable auth (#12850)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 68d5af20befbff5b87772efe3041cd2192778b94
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Fri Nov 19 22:55:57 2021 +0800

    [Authorization] Support GET_BACKLOG_SIZE topic op after enable auth (#12850)
    
    (cherry picked from commit 5eb43eac8e9d238963f29be4184ac90e95a04523)
---
 .../broker/authorization/PulsarAuthorizationProvider.java     |  1 +
 .../pulsar/client/api/AuthorizationProducerConsumerTest.java  | 11 ++++++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index b50d7de..61d77a7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -581,6 +581,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
             case EXPIRE_MESSAGES:
             case PEEK_MESSAGES:
             case RESET_CURSOR:
+            case GET_BACKLOG_SIZE:
             case SET_REPLICATED_SUBSCRIPTION_STATUS:
                 isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription());
                 break;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 5eeac53..c9d76b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -218,6 +218,13 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
             assertTrue(e.getMessage().startsWith(
                     "Unauthorized to validateTopicOperation for operation [GET_STATS]"));
         }
+        try {
+            sub1Admin.topics().getBacklogSizeByMessageId(topicName, MessageId.earliest);
+            fail("should have failed with authorization exception");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().startsWith(
+                    "Unauthorized to validateTopicOperation for operation"));
+        }
 
         // grant topic consume authorization to the subscriptionRole
         tenantAdmin.topics().grantPermission(topicName, subscriptionRole,
@@ -239,8 +246,10 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         assertEquals(subscriptions.size(), 2);
 
         // now, subscriptionRole have consume authorization on topic, so it will successfully get topic internal stats
-        PersistentTopicInternalStats internalStats = superAdmin.topics().getInternalStats(topicName, true);
+        PersistentTopicInternalStats internalStats = sub1Admin.topics().getInternalStats(topicName, true);
         assertNotNull(internalStats);
+        Long backlogSize = sub1Admin.topics().getBacklogSizeByMessageId(topicName, MessageId.earliest);
+        assertEquals(backlogSize.longValue(), 0);
 
         // verify tenant is able to perform all subscription-admin api
         tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);

[pulsar] 12/15: Remove readerCaches and close reader when exception occurs in SystemTopicBasedTopicPoliciesService. (#12873)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3d7fe33d7d52446185d1e928cb158918119a11a6
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Nov 19 07:35:47 2021 +0800

    Remove readerCaches and close reader when exception occurs in SystemTopicBasedTopicPoliciesService. (#12873)
    
    (cherry picked from commit bcc219b5308379fb354b32eb858b4f54868721d5)
---
 .../pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java     | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 902784e..12ba88e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -218,6 +218,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 if (ex != null) {
                     log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
                     result.completeExceptionally(ex);
+                    readerCaches.remove(namespace);
+                    reader.closeAsync();
                 } else {
                     initPolicesCache(reader, result);
                     result.thenRun(() -> readMorePolicies(reader));

[pulsar] 10/15: Fix deleting tenants with active namespaces with 500. (#12848)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a0db36fd5c54e05180709f19317c0dfcdbb31063
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Nov 19 22:56:42 2021 +0800

    Fix deleting tenants with active namespaces with 500. (#12848)
    
    (cherry picked from commit 1646be2e29ed1109cc76b11081e829f30809362d)
---
 .../java/org/apache/pulsar/broker/resources/TenantResources.java  | 4 ++--
 .../java/org/apache/pulsar/broker/admin/impl/TenantsBase.java     | 6 +++++-
 .../test/java/org/apache/pulsar/broker/admin/AdminApiTest.java    | 8 ++++++++
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
index 9582057..36c88cf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TenantResources.java
@@ -134,7 +134,7 @@ public class TenantResources extends BaseResources<TenantInfo> {
                             }
                             if (children != null && !children.isEmpty()) {
                                 checkNs.completeExceptionally(
-                                        new IllegalStateException("Tenant has active namespace"));
+                                        new IllegalStateException("The tenant still has active namespaces"));
                                 return;
                             }
                             String namespace = NamespaceName.get(tenant, clusterOrNamespace).toString();
@@ -145,7 +145,7 @@ public class TenantResources extends BaseResources<TenantInfo> {
                             getAsync(joinPath(BASE_POLICIES_PATH, namespace)).thenApply(data -> {
                                 if (data.isPresent()) {
                                     checkNs.completeExceptionally(new IllegalStateException(
-                                            "Tenant has active namespace"));
+                                            "The tenant still has active namespaces"));
                                 } else {
                                     checkNs.complete(null);
                                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index c5f5402..209a532 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -268,7 +268,11 @@ public class TenantsBase extends PulsarWebResource {
                     .whenComplete((ignore, ex) -> {
                         if (ex != null) {
                             log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, ex);
-                            asyncResponse.resume(new RestException(ex));
+                            if (ex.getCause() instanceof IllegalStateException) {
+                                asyncResponse.resume(new RestException(Status.CONFLICT, ex.getCause()));
+                            } else {
+                                asyncResponse.resume(new RestException(ex));
+                            }
                         } else {
                             log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
                             asyncResponse.resume(Response.noContent().build());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 2c3f704..3e87f13 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -670,6 +670,14 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(admin.tenants().getTenantInfo("prop-xyz"), newTenantAdmin);
 
+        try {
+            admin.tenants().deleteTenant("prop-xyz");
+            fail("should have failed");
+        } catch (PulsarAdminException e) {
+            assertTrue(e instanceof ConflictException);
+            assertEquals(e.getStatusCode(), 409);
+            assertEquals(e.getMessage(), "The tenant still has active namespaces");
+        }
         admin.namespaces().deleteNamespace("prop-xyz/ns1");
         admin.tenants().deleteTenant("prop-xyz");
         assertEquals(admin.tenants().getTenants(), Lists.newArrayList());

[pulsar] 08/15: Add error log when new jetty client (#12840)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1d58b1fd86d79408e543d5d8df3996fd6e1156d7
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Thu Nov 18 16:05:23 2021 +0800

    Add error log when new jetty client (#12840)
    
    (cherry picked from commit c18063f44684052ef07f0259084743a7a45e5656)
---
 .../java/org/apache/pulsar/proxy/server/AdminProxyHandler.java     | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 13f9372..7d3c658 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -256,7 +256,7 @@ class AdminProxyHandler extends ProxyServlet {
 
             if (config.isTlsEnabledWithBroker()) {
                 try {
-                    X509Certificate trustCertificates[] = SecurityUtility
+                    X509Certificate[] trustCertificates = SecurityUtility
                         .loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());
 
                     SSLContext sslCtx;
@@ -281,6 +281,7 @@ class AdminProxyHandler extends ProxyServlet {
 
                     return new JettyHttpClient(contextFactory);
                 } catch (Exception e) {
+                    LOG.error("new jetty http client exception ", e);
                     try {
                         auth.close();
                     } catch (IOException ioe) {
@@ -303,7 +304,7 @@ class AdminProxyHandler extends ProxyServlet {
 
         boolean isFunctionsRestRequest = false;
         String requestUri = request.getRequestURI();
-        for (String routePrefix: functionRoutes) {
+        for (String routePrefix : functionRoutes) {
             if (requestUri.startsWith(routePrefix)) {
                 isFunctionsRestRequest = true;
                 break;
@@ -324,7 +325,7 @@ class AdminProxyHandler extends ProxyServlet {
 
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("[{}:{}] Selected active broker is {}", request.getRemoteAddr(), request.getRemotePort(),
-                            url.toString());
+                            url);
                 }
             } catch (Exception e) {
                 LOG.warn("[{}:{}] Failed to get next active broker {}", request.getRemoteAddr(),

[pulsar] 02/15: Fix znode leakage caused by deleting tenant (#12711)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bd0a40ce06fb4077a9f0e45ad59a9f489d4ddc1e
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Wed Nov 17 17:00:24 2021 +0800

    Fix znode leakage caused by deleting tenant (#12711)
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    Fixes #12710
    
    ### Motivation
    
    1. According to the previous code logic, if the tenant resource is not forcibly deleted, the zk-node resource will be leaked, because under the condition of **!force**, the `clearTenantPersistence` code logic will not be called.
    
    ```
        protected void internalDeleteTenant(AsyncResponse asyncResponse, String tenant, boolean force) {
            if (force) {
                internalDeleteTenantForcefully(asyncResponse, tenant);
            } else {
                internalDeleteTenant(asyncResponse, tenant);
            }
        }
    ```
    2. Clear  resource of `/namespace/xxx` z-node
    
    (cherry picked from commit f68bec4d2a921de00730fce21dd6a78121f7dd3d)
---
 .../pulsar/broker/resources/BaseResources.java     |  72 +++++++++++----
 .../broker/resources/LocalPoliciesResources.java   |   4 +
 .../broker/resources/NamespaceResources.java       |  70 +++++++++++++--
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 100 +++++++--------------
 .../pulsar/broker/admin/impl/TenantsBase.java      |  69 ++++++--------
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  90 +++++++++++++++++++
 6 files changed, 273 insertions(+), 132 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 1e463fa..a6972cd 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -181,37 +182,70 @@ public class BaseResources<T> {
         return sb.toString();
     }
 
+    protected static CompletableFuture<Void> deleteRecursiveAsync(BaseResources resources, final String pathRoot) {
+        PathUtils.validatePath(pathRoot);
 
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        listSubTreeBFSAsync(resources, pathRoot).whenComplete((tree, ex) -> {
+            if (ex == null) {
+                log.debug("Deleting {} with size {}", tree, tree.size());
+
+                final List<CompletableFuture<Void>> futures = new ArrayList<>();
+                for (int i = tree.size() - 1; i >= 0; --i) {
+                    // Delete the leaves first and eventually get rid of the root
+                    futures.add(resources.deleteAsync(tree.get(i)));
+                }
+
+                FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                    if (exception != null) {
+                        log.error("Failed to remove partitioned topics", exception);
+                        return completableFuture.completeExceptionally(exception.getCause());
+                    }
+                    return completableFuture.complete(null);
+                });
+            } else {
+                log.warn("Failed to delete partitioned topics z-node [{}]", pathRoot, ex.getCause());
+            }
+        });
 
-    protected static void deleteRecursive(BaseResources resources, final String pathRoot) throws MetadataStoreException {
-        PathUtils.validatePath(pathRoot);
-        List<String> tree = listSubTreeBFS(resources, pathRoot);
-        log.debug("Deleting {} with size {}", tree, tree.size());
-        log.debug("Deleting " + tree.size() + " subnodes ");
-        for (int i = tree.size() - 1; i >= 0; --i) {
-            // Delete the leaves first and eventually get rid of the root
-            resources.delete(tree.get(i));
-        }
+        return completableFuture;
     }
 
-    protected static List<String> listSubTreeBFS(BaseResources resources, final String pathRoot)
-            throws MetadataStoreException {
+    protected static CompletableFuture<List<String>> listSubTreeBFSAsync(BaseResources resources,
+            final String pathRoot) {
         Deque<String> queue = new LinkedList<>();
         List<String> tree = new ArrayList<>();
         queue.add(pathRoot);
         tree.add(pathRoot);
-        while (true) {
+        CompletableFuture<List<String>> completableFuture = new CompletableFuture<>();
+        final List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (int i = 0; i < queue.size(); i++) {
             String node = queue.pollFirst();
             if (node == null) {
                 break;
             }
-            List<String> children = resources.getChildren(node);
-            for (final String child : children) {
-                final String childPath = node + "/" + child;
-                queue.add(childPath);
-                tree.add(childPath);
-            }
+            futures.add(resources.getChildrenAsync(node)
+                    .whenComplete((children, ex) -> {
+                        if (ex == null) {
+                            for (final String child : (List<String>) children) {
+                                final String childPath = node + "/" + child;
+                                queue.add(childPath);
+                                tree.add(childPath);
+                            }
+                        } else {
+                            log.warn("Failed to get data error from z-node [{}]", node);
+                        }
+                    }));
         }
-        return tree;
+
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            if (exception != null) {
+                log.error("Failed to get partitioned topics", exception);
+                return completableFuture.completeExceptionally(exception.getCause());
+            }
+            return completableFuture.complete(tree);
+        });
+
+        return completableFuture;
     }
 }
\ No newline at end of file
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
index 0023e5f..29a6a46 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java
@@ -79,6 +79,10 @@ public class LocalPoliciesResources extends BaseResources<LocalPolicies> {
         delete(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
     }
 
+    public CompletableFuture<Void> deleteLocalPoliciesAsync(NamespaceName ns) {
+        return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
+    }
+
     public static boolean isLocalPoliciesPath(String path) {
         return path.startsWith(LOCAL_POLICIES_ROOT);
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index dd33ee8..190d251 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -26,7 +26,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.Getter;
-
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -39,9 +38,14 @@ import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Getter
 public class NamespaceResources extends BaseResources<Policies> {
+    private static final Logger log = LoggerFactory.getLogger(NamespaceResources.class);
+
     private final IsolationPolicyResources isolationPolicies;
     private final PartitionedTopicResources partitionedTopicResources;
     private final MetadataStore configurationStore;
@@ -49,6 +53,7 @@ public class NamespaceResources extends BaseResources<Policies> {
     private final MetadataCache<LocalPolicies> localPoliciesCache;
 
     private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
+    private static final String NAMESPACE_BASE_PATH = "/namespace";
 
     public NamespaceResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) {
         super(configurationStore, Policies.class, operationTimeoutSec);
@@ -97,6 +102,10 @@ public class NamespaceResources extends BaseResources<Policies> {
         delete(joinPath(BASE_POLICIES_PATH, ns.toString()));
     }
 
+    public CompletableFuture<Void> deletePoliciesAsync(NamespaceName ns){
+        return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
+    }
+
     public Optional<Policies> getPolicies(NamespaceName ns) throws MetadataStoreException{
         return get(joinPath(BASE_POLICIES_PATH, ns.toString()));
     }
@@ -122,6 +131,40 @@ public class NamespaceResources extends BaseResources<Policies> {
                 && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
     }
 
+    // clear resource of `/namespace/{namespaceName}` for zk-node
+    public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
+        final String namespacePath = joinPath(NAMESPACE_BASE_PATH, ns.toString());
+        CompletableFuture<Void> future = new CompletableFuture<Void>();
+        deleteAsync(namespacePath).whenComplete((ignore, ex) -> {
+            if (ex != null && ex.getCause().getCause() instanceof KeeperException.NoNodeException) {
+                future.complete(null);
+            } else if (ex != null) {
+                future.completeExceptionally(ex);
+            } else {
+                future.complete(null);
+            }
+        });
+
+        return future;
+    }
+
+    // clear resource of `/namespace/{tenant}` for zk-node
+    public CompletableFuture<Void> deleteTenantAsync(String tenant) {
+        final String tenantPath = joinPath(NAMESPACE_BASE_PATH, tenant);
+        CompletableFuture<Void> future = new CompletableFuture<Void>();
+        deleteAsync(tenantPath).whenComplete((ignore, ex) -> {
+            if (ex != null && ex.getCause().getCause() instanceof KeeperException.NoNodeException) {
+                future.complete(null);
+            } else if (ex != null) {
+                future.completeExceptionally(ex);
+            } else {
+                future.complete(null);
+            }
+        });
+
+        return future;
+    }
+
     public static NamespaceName namespaceFromPath(String path) {
         return NamespaceName.get(path.substring(BASE_POLICIES_PATH.length() + 1));
     }
@@ -215,12 +258,27 @@ public class NamespaceResources extends BaseResources<Policies> {
                     tn.getEncodedLocalName()));
         }
 
-        public void clearPartitionedTopicMetadata(NamespaceName namespaceName) throws MetadataStoreException {
+        public CompletableFuture<Void> clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
             final String globalPartitionedPath = joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
-            // check whether partitioned topics metadata node exist
-            if (exists(globalPartitionedPath)) {
-                deleteRecursive(this, globalPartitionedPath);
-            }
+
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+
+            deleteRecursiveAsync(this, globalPartitionedPath)
+                    .thenAccept(ignore -> {
+                        log.info("Clear partitioned topic metadata [{}] success.", namespaceName);
+                        completableFuture.complete(null);
+                    }).exceptionally(ex -> {
+                if (ex.getCause().getCause() instanceof KeeperException.NoNodeException) {
+                    completableFuture.complete(null);
+                } else {
+                    log.error("Clear partitioned topic metadata failed.");
+                    completableFuture.completeExceptionally(ex.getCause());
+                    return null;
+                }
+                return null;
+            });
+
+            return completableFuture;
         }
     }
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index dd02f67..87c091a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -298,46 +298,41 @@ public abstract class NamespacesBase extends AdminResource {
                 }
             }
 
-            try {
-                namespaceResources().getPartitionedTopicResources().clearPartitionedTopicMetadata(namespaceName);
-
-                try {
-                    pulsar().getPulsarResources().getTopicResources()
-                            .clearDomainPersistence(namespaceName).get();
-                    pulsar().getPulsarResources().getTopicResources()
-                            .clearNamespacePersistence(namespaceName).get();
-                } catch (ExecutionException | InterruptedException e) {
-                    // warn level log here since this failure has no side effect besides left a un-used metadata
-                    // and also will not affect the re-creation of namespace
-                    log.warn("[{}] Failed to remove managed-ledger for {}", clientAppId(), namespaceName, e);
-                }
-
-                // we have successfully removed all the ownership for the namespace, the policies znode can be deleted
-                // now
-                namespaceResources().deletePolicies(namespaceName);
-
-                try {
-                    namespaceResources().deletePolicies(namespaceName);
-                } catch (NotFoundException e) {
-                    // If the node with the modified information is not there anymore, we're already good
-                }
-
-                try {
-                    getLocalPolicies().deleteLocalPolicies(namespaceName);
-                } catch (NotFoundException nne) {
-                    // If the z-node with the modified information is not there anymore, we're already good
-                }
-            } catch (Exception e) {
-                log.error("[{}] Failed to remove owned namespace {} from metadata", clientAppId(), namespaceName, e);
-                asyncResponse.resume(new RestException(e));
-                return null;
-            }
+            internalClearZkSources(asyncResponse);
 
-            asyncResponse.resume(Response.noContent().build());
             return null;
         });
     }
 
+    // clear zk-node resources for deleting namespace
+    protected void internalClearZkSources(AsyncResponse asyncResponse) {
+        // clear resource of `/namespace/{namespaceName}` for zk-node
+        namespaceResources().deleteNamespaceAsync(namespaceName)
+                .thenCompose(ignore -> namespaceResources().getPartitionedTopicResources()
+                        .clearPartitionedTopicMetadataAsync(namespaceName))
+                // clear resource for manager-ledger z-node
+                .thenCompose(ignore -> pulsar().getPulsarResources().getTopicResources()
+                        .clearDomainPersistence(namespaceName))
+                .thenCompose(ignore -> pulsar().getPulsarResources().getTopicResources()
+                        .clearNamespacePersistence(namespaceName))
+                // we have successfully removed all the ownership for the namespace, the policies
+                // z-node can be deleted now
+                .thenCompose(ignore -> namespaceResources().deletePoliciesAsync(namespaceName))
+                // clear z-node of local policies
+                .thenCompose(ignore -> getLocalPolicies().deleteLocalPoliciesAsync(namespaceName))
+                .whenComplete((ignore, ex) -> {
+                    if (ex != null) {
+                        log.warn("[{}] Failed to remove namespace or managed-ledger for {}",
+                                clientAppId(), namespaceName, ex);
+                        asyncResponse.resume(new RestException(ex));
+                    } else {
+                        log.info("[{}] Remove namespace or managed-ledger successfully {}",
+                                clientAppId(), namespaceName);
+                        asyncResponse.resume(Response.noContent().build());
+                    }
+                });
+    }
+
     @SuppressWarnings("deprecation")
     protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, boolean authoritative) {
         validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
@@ -474,7 +469,7 @@ public abstract class NamespacesBase extends AdminResource {
                 }
             }
         } catch (Exception e) {
-            log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e);
+            log.error("[{}] Failed to remove forcefully owned namespace {}", clientAppId(), namespaceName, e);
             asyncResponse.resume(new RestException(e));
             return;
         }
@@ -485,42 +480,15 @@ public abstract class NamespacesBase extends AdminResource {
                     asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
                     return null;
                 } else {
-                    log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, exception);
+                    log.error("[{}] Failed to remove forcefully owned namespace {}",
+                            clientAppId(), namespaceName, exception);
                     asyncResponse.resume(new RestException(exception.getCause()));
                     return null;
                 }
             }
 
-            try {
-                // remove partitioned topics znode
-                pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                        .clearPartitionedTopicMetadata(namespaceName);
-
-                try {
-                    pulsar().getPulsarResources().getTopicResources().clearDomainPersistence(namespaceName).get();
-                    pulsar().getPulsarResources().getTopicResources().clearNamespacePersistence(namespaceName).get();
-                } catch (ExecutionException | InterruptedException e) {
-                    // warn level log here since this failure has no side effect besides left a un-used metadata
-                    // and also will not affect the re-creation of namespace
-                    log.warn("[{}] Failed to remove managed-ledger for {}", clientAppId(), namespaceName, e);
-                }
-
-                // we have successfully removed all the ownership for the namespace, the policies znode can be deleted
-                // now
-                namespaceResources().deletePolicies(namespaceName);
+            internalClearZkSources(asyncResponse);
 
-                try {
-                    getLocalPolicies().deleteLocalPolicies(namespaceName);
-                } catch (NotFoundException nne) {
-                    // If the z-node with the modified information is not there anymore, we're already good
-                }
-            } catch (Exception e) {
-                log.error("[{}] Failed to remove owned namespace {} from ZK", clientAppId(), namespaceName, e);
-                asyncResponse.resume(new RestException(e));
-                return null;
-            }
-
-            asyncResponse.resume(Response.noContent().build());
             return null;
         });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index 978e59e..c5f5402 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.broker.admin.impl;
 
 import com.google.common.collect.Lists;
@@ -28,7 +29,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -60,8 +60,8 @@ public class TenantsBase extends PulsarWebResource {
 
     @GET
     @ApiOperation(value = "Get the list of existing tenants.", response = String.class, responseContainer = "List")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "Tenant doesn't exist") })
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "Tenant doesn't exist")})
     public void getTenants(@Suspended final AsyncResponse asyncResponse) {
         final String clientAppId = clientAppId();
         try {
@@ -86,8 +86,8 @@ public class TenantsBase extends PulsarWebResource {
     @GET
     @Path("/{tenant}")
     @ApiOperation(value = "Get the admin configuration for a given tenant.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "Tenant does not exist") })
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "Tenant does not exist")})
     public void getTenantAdmin(@Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant) {
         final String clientAppId = clientAppId();
@@ -112,11 +112,11 @@ public class TenantsBase extends PulsarWebResource {
     @PUT
     @Path("/{tenant}")
     @ApiOperation(value = "Create a new tenant.", notes = "This operation requires Pulsar super-user privileges.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 409, message = "Tenant already exists"),
             @ApiResponse(code = 412, message = "Tenant name is not valid"),
             @ApiResponse(code = 412, message = "Clusters can not be empty"),
-            @ApiResponse(code = 412, message = "Clusters do not exist") })
+            @ApiResponse(code = 412, message = "Clusters do not exist")})
     public void createTenant(@Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant,
             @ApiParam(value = "TenantInfo") TenantInfoImpl tenantInfo) {
@@ -153,7 +153,7 @@ public class TenantsBase extends PulsarWebResource {
                     return;
                 }
             }
-            tenantResources().tenantExistsAsync(tenant).thenAccept(exist ->{
+            tenantResources().tenantExistsAsync(tenant).thenAccept(exist -> {
                 if (exist) {
                     asyncResponse.resume(new RestException(Status.CONFLICT, "Tenant already exist"));
                     return;
@@ -177,12 +177,12 @@ public class TenantsBase extends PulsarWebResource {
     @POST
     @Path("/{tenant}")
     @ApiOperation(value = "Update the admins for a tenant.",
-    notes = "This operation requires Pulsar super-user privileges.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            notes = "This operation requires Pulsar super-user privileges.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
             @ApiResponse(code = 409, message = "Tenant already exists"),
             @ApiResponse(code = 412, message = "Clusters can not be empty"),
-            @ApiResponse(code = 412, message = "Clusters do not exist") })
+            @ApiResponse(code = 412, message = "Clusters do not exist")})
     public void updateTenant(@Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "The tenant name") @PathParam("tenant") String tenant,
             @ApiParam(value = "TenantInfo") TenantInfoImpl newTenantAdmin) {
@@ -227,10 +227,10 @@ public class TenantsBase extends PulsarWebResource {
     @DELETE
     @Path("/{tenant}")
     @ApiOperation(value = "Delete a tenant and all namespaces and topics under it.")
-    @ApiResponses(value = { @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 404, message = "Tenant does not exist"),
             @ApiResponse(code = 405, message = "Broker doesn't allow forced deletion of tenants"),
-            @ApiResponse(code = 409, message = "The tenant still has active namespaces") })
+            @ApiResponse(code = 409, message = "The tenant still has active namespaces")})
     public void deleteTenant(@Suspended final AsyncResponse asyncResponse,
             @PathParam("tenant") @ApiParam(value = "The tenant name") String tenant,
             @QueryParam("force") @DefaultValue("false") boolean force) {
@@ -258,27 +258,22 @@ public class TenantsBase extends PulsarWebResource {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, "Tenant doesn't exist"));
                 return null;
             }
-            return hasActiveNamespace(tenant).thenAccept(ns -> {
-                try {
-                    tenantResources().deleteTenantAsync(tenant)
-                            .thenAccept(t -> {
-                                log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
-                                asyncResponse.resume(Response.noContent().build());
-                            }).exceptionally(ex -> {
-                        log.error("Failed to delete tenant {}", tenant, ex.getCause());
-                        asyncResponse.resume(new RestException(ex));
-                        return null;
+
+            return hasActiveNamespace(tenant)
+                    .thenCompose(ignore -> tenantResources().deleteTenantAsync(tenant))
+                    .thenCompose(ignore -> pulsar().getPulsarResources().getTopicResources()
+                            .clearTenantPersistence(tenant))
+                    .thenCompose(ignore -> pulsar().getPulsarResources().getNamespaceResources()
+                            .deleteTenantAsync(tenant))
+                    .whenComplete((ignore, ex) -> {
+                        if (ex != null) {
+                            log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, ex);
+                            asyncResponse.resume(new RestException(ex));
+                        } else {
+                            log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
+                            asyncResponse.resume(Response.noContent().build());
+                        }
                     });
-                    log.info("[{}] Deleted tenant {}", clientAppId(), tenant);
-                } catch (Exception e) {
-                    log.error("[{}] Failed to delete tenant {}", clientAppId(), tenant, e);
-                    asyncResponse.resume(new RestException(e));
-                }
-            }).exceptionally(ex -> {
-                log.error("Failed to delete tenant due to active namespace {}", tenant, ex.getCause());
-                asyncResponse.resume(new RestException(ex));
-                return null;
-            });
         });
     }
 
@@ -319,14 +314,6 @@ public class TenantsBase extends PulsarWebResource {
                 return null;
             }
 
-
-            try {
-                pulsar().getPulsarResources().getTopicResources().clearTenantPersistence(tenant).get();
-            } catch (ExecutionException | InterruptedException e) {
-                // warn level log here since this failure has no side effect besides left a un-used metadata
-                // and also will not affect the re-creation of tenant
-                log.warn("[{}] Failed to remove managed-ledger for {}", clientAppId(), tenant, e);
-            }
             // delete tenant normally
             internalDeleteTenant(asyncResponse, tenant);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 33efa2b..bc1ab25 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -1267,6 +1267,96 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 9);
     }
 
+    @Test
+    public void testDeleteTenant() throws Exception {
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+
+        String tenant = "test-tenant-1";
+        assertFalse(admin.tenants().getTenants().contains(tenant));
+
+        // create tenant
+        admin.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")));
+        assertTrue(admin.tenants().getTenants().contains(tenant));
+
+        // create namespace
+        String namespace = tenant + "/test-ns-1";
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        assertEquals(admin.namespaces().getNamespaces(tenant), Lists.newArrayList(namespace));
+
+        // create topic
+        String topic = namespace + "/test-topic-1";
+        admin.topics().createPartitionedTopic(topic, 10);
+        assertFalse(admin.topics().getList(namespace).isEmpty());
+
+        try {
+            admin.namespaces().deleteNamespace(namespace, false);
+            fail("should have failed due to namespace not empty");
+        } catch (PulsarAdminException e) {
+            // Expected: cannot delete non-empty tenant
+        }
+
+        // delete topic
+        admin.topics().deletePartitionedTopic(topic);
+        assertTrue(admin.topics().getList(namespace).isEmpty());
+
+        // delete namespace
+        admin.namespaces().deleteNamespace(namespace, false);
+        assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
+        assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
+
+        // delete tenant
+        admin.tenants().deleteTenant(tenant);
+        assertFalse(admin.tenants().getTenants().contains(tenant));
+
+        final String managedLedgersPath = "/managed-ledgers/" + tenant;
+        assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
+    }
+
+    @Test
+    public void testDeleteNamespace() throws Exception {
+        pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+
+        String tenant = "test-tenant";
+        assertFalse(admin.tenants().getTenants().contains(tenant));
+
+        // create tenant
+        admin.tenants().createTenant(tenant,
+                new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")));
+        assertTrue(admin.tenants().getTenants().contains(tenant));
+
+        // create namespace
+        String namespace = tenant + "/test-ns";
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        assertEquals(admin.namespaces().getNamespaces(tenant), Lists.newArrayList(namespace));
+
+        // create topic
+        String topic = namespace + "/test-topic";
+        admin.topics().createPartitionedTopic(topic, 10);
+        assertFalse(admin.topics().getList(namespace).isEmpty());
+
+        try {
+            admin.namespaces().deleteNamespace(namespace, false);
+            fail("should have failed due to namespace not empty");
+        } catch (PulsarAdminException e) {
+            // Expected: cannot delete non-empty tenant
+        }
+
+        // delete topic
+        admin.topics().deletePartitionedTopic(topic);
+        assertTrue(admin.topics().getList(namespace).isEmpty());
+
+        // delete namespace
+        admin.namespaces().deleteNamespace(namespace, false);
+        assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
+        assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
+
+
+        final String managedLedgersPath = "/managed-ledgers/" + namespace;
+        assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
+    }
+
+
     @Test(timeOut = 30000)
     public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminException, InterruptedException {
         final String topic = "persistent://prop-xyz/ns1/precise-back-log-no-delayed-" + UUID.randomUUID().toString();

[pulsar] 09/15: Remove unused listeners if it have no listeners. (#12654)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 18698648d0f62e89262a7cc525f2d6e380afde62
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Fri Nov 19 18:38:45 2021 +0800

    Remove unused listeners if it have no listeners. (#12654)
    
    (cherry picked from commit d74af88a6aed5a7da3139a4228ae29f793ec72b2)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 21 +++++++++++--
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 36 ++++++++++++++++++++--
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 4dbffec..902784e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -69,7 +69,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     @VisibleForTesting
     final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
 
-    private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
 
     public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
         this.pulsarService = pulsarService;
@@ -472,12 +473,26 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     @Override
     public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
-        listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).add(listener);
+        listeners.compute(topicName, (k, topicListeners) -> {
+            if (topicListeners == null) {
+                topicListeners = Lists.newCopyOnWriteArrayList();
+            }
+            topicListeners.add(listener);
+            return topicListeners;
+        });
     }
 
     @Override
     public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
-        listeners.computeIfAbsent(topicName, k -> Lists.newCopyOnWriteArrayList()).remove(listener);
+        listeners.compute(topicName, (k, topicListeners) -> {
+            if (topicListeners != null){
+                topicListeners.remove(listener);
+                if (topicListeners.isEmpty()) {
+                    topicListeners = null;
+                }
+            }
+            return topicListeners;
+        });
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 80f3dc9..be52f9a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -76,7 +76,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
     private static final TopicName TOPIC5 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-1");
     private static final TopicName TOPIC6 = TopicName.get("persistent", NamespaceName.get(NAMESPACE3), "topic-2");
 
-    private NamespaceEventsSystemTopicFactory systemTopicFactory;
     private SystemTopicBasedTopicPoliciesService systemTopicBasedTopicPoliciesService;
 
     @BeforeMethod(alwaysRun = true)
@@ -95,6 +94,40 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
     }
 
     @Test
+    public void testConcurrentlyRegisterUnregisterListeners() throws ExecutionException, InterruptedException {
+        TopicName topicName = TopicName.get("test");
+        class TopicPolicyListenerImpl implements TopicPolicyListener<TopicPolicies> {
+
+            @Override
+            public void onUpdate(TopicPolicies data) {
+                //no op.
+            }
+        }
+
+        CompletableFuture<Void> f = CompletableFuture.completedFuture(null).thenRunAsync(() -> {
+            for (int i = 0; i < 100; i++) {
+                TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl();
+                systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+                Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
+                Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1);
+                systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
+            }
+        });
+
+        for (int i = 0; i < 100; i++) {
+            TopicPolicyListener<TopicPolicies> listener = new TopicPolicyListenerImpl();
+            systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+            Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
+            Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1);
+            systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
+        }
+
+        f.get();
+        //Some system topics will be added to the listeners. Just check if it contains topicName.
+        Assert.assertFalse(systemTopicBasedTopicPoliciesService.listeners.containsKey(topicName));
+    }
+
+    @Test
     public void testGetPolicy() throws ExecutionException, InterruptedException, TopicPoliciesCacheNotInitException {
 
         // Init topic policies
@@ -239,7 +272,6 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         admin.lookups().lookupTopic(TOPIC4.toString());
         admin.lookups().lookupTopic(TOPIC5.toString());
         admin.lookups().lookupTopic(TOPIC6.toString());
-        systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
         systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
     }
 

[pulsar] 13/15: [Transaction]stop TC replaying with exception (#12705)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f3bdaec1f38142d1e7e624424a924769b7382594
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat Nov 20 00:15:52 2021 +0800

    [Transaction]stop TC replaying with exception (#12705)
    
    When MLTransactionLogImpl replaying, if any ledger was deleted from bookkeeper, or ManagerLedger was fenced, MLTransactionLogImpl will not stop recovering and continue to report the exception.
    
    End replaying when there is no ledger to read or the managerLedger is fenced.
    
    (cherry picked from commit 06f1a91c05d1e11cd9ce8c85e042224e57495390)
---
 .../pulsar/broker/transaction/TransactionTest.java | 81 ++++++++++++++++++++--
 .../coordinator/impl/MLTransactionLogImpl.java     | 10 ++-
 2 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index a237314..e4975d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -21,6 +21,12 @@ package org.apache.pulsar.broker.transaction;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -28,6 +34,8 @@ import static org.testng.Assert.fail;
 
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -37,6 +45,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -73,6 +83,12 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -411,11 +427,11 @@ public class TransactionTest extends TransactionTestBase {
     }
 
     @Test
-    public void testMaxReadPositionForNormalPublish() throws Exception{
+    public void testMaxReadPositionForNormalPublish() throws Exception {
         String topic = "persistent://" + NAMESPACE1 + "/NormalPublish";
         admin.topics().createNonPartitionedTopic(topic);
         PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
-                  .getTopic(topic, false).get().get();
+                .getTopic(topic, false).get().get();
 
         TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
         PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false)
@@ -443,7 +459,7 @@ public class TransactionTest extends TransactionTestBase {
                 .sendTimeout(0, TimeUnit.SECONDS)
                 .create();
 
-        Awaitility.await().untilAsserted(() ->Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
         //test publishing txn messages will not change maxReadPosition if don`t commit or abort.
         Transaction transaction = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
@@ -483,5 +499,62 @@ public class TransactionTest extends TransactionTestBase {
         Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
         Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());
 
-        }
+    }
+
+    @Test
+    public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
+        String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable";
+        admin.topics().createNonPartitionedTopic(topic);
+
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(topic, false).get().get();
+        persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
+        Map<String, String> map = new HashMap<>();
+        map.put(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID, "1");
+        persistentTopic.getManagedLedger().setProperties(map);
+
+        ManagedCursor managedCursor = mock(ManagedCursor.class);
+        doReturn(true).when(managedCursor).hasMoreEntries();
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"),
+                    null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        MLTransactionLogImpl mlTransactionLog =
+                new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
+                        persistentTopic.getManagedLedger().getConfig());
+        Class<MLTransactionLogImpl> mlTransactionLogClass = MLTransactionLogImpl.class;
+        Field field = mlTransactionLogClass.getDeclaredField("cursor");
+        field.setAccessible(true);
+        field.set(mlTransactionLog, managedCursor);
+        field = mlTransactionLogClass.getDeclaredField("managedLedger");
+        field.setAccessible(true);
+        field.set(mlTransactionLog, persistentTopic.getManagedLedger());
+
+        TransactionRecoverTracker transactionRecoverTracker = mock(TransactionRecoverTracker.class);
+        doNothing().when(transactionRecoverTracker).appendOpenTransactionToTimeoutTracker();
+        doNothing().when(transactionRecoverTracker).handleCommittingAndAbortingTransaction();
+        TransactionTimeoutTracker timeoutTracker = mock(TransactionTimeoutTracker.class);
+        doNothing().when(timeoutTracker).start();
+        MLTransactionMetadataStore metadataStore1 =
+                new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
+                        mlTransactionLog, timeoutTracker, transactionRecoverTracker);
+
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
+
+        doAnswer(invocation -> {
+            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
+            callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
+            return null;
+        }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
+
+        MLTransactionMetadataStore metadataStore2 =
+                new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
+                        mlTransactionLog, timeoutTracker, transactionRecoverTracker);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index c044275..2d11d98 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -281,18 +281,19 @@ public class MLTransactionLogImpl implements TransactionLog {
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
         private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
+        private boolean isReadable = true;
 
         boolean fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
                 if (cursor.hasMoreEntries()) {
                     outstandingReadsRequests.incrementAndGet();
                     readAsync(100, this);
-                    return true;
+                    return isReadable;
                 } else {
                     return false;
                 }
             } else {
-                return true;
+                return isReadable;
             }
         }
 
@@ -313,6 +314,11 @@ public class MLTransactionLogImpl implements TransactionLog {
 
         @Override
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+            if (managedLedgerConfig.isAutoSkipNonRecoverableData()
+                    && exception instanceof ManagedLedgerException.NonRecoverableLedgerException
+                    || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+                isReadable = false;
+            }
             log.error("Transaction log init fail error!", exception);
             outstandingReadsRequests.decrementAndGet();
         }

[pulsar] 15/15: [pulsar-perf]Support listenerThreads configuration. (#12892)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 626b1d366d5fd396484cecf220a2eba172c32b94
Author: mingyifei <38...@users.noreply.github.com>
AuthorDate: Sat Nov 20 03:47:47 2021 +0800

    [pulsar-perf]Support listenerThreads configuration. (#12892)
    
    * Support listenerThreads configuration.
    
    * Support listenerThreads configuration.
    
    * Modified to short option.
    
    * Add listenerThreads configuration document.
    
    Co-authored-by: mingyifei <mi...@accesscorporate.com.cn>
    (cherry picked from commit ec715f280d20910fbeefe9e2bdd436799173a008)
---
 .../main/java/org/apache/pulsar/testclient/PerformanceConsumer.java  | 5 +++++
 .../main/java/org/apache/pulsar/testclient/PerformanceReader.java    | 5 +++++
 site2/docs/reference-cli-tools.md                                    | 2 ++
 site2/website-next/docs/reference-cli-tools.md                       | 2 ++
 4 files changed, 14 insertions(+)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index db69778..a220c9f 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -180,6 +180,10 @@ public class PerformanceConsumer {
                 "used for handling connections to brokers, default is 1 thread")
         public int ioThreads = 1;
 
+        @Parameter(names = {"-lt", "--num-listener-threads"}, description = "Set the number of threads"
+                + " to be used for message listeners")
+        public int listenerThreads = 1;
+
         @Parameter(names = {"--batch-index-ack" }, description = "Enable or disable the batch index acknowledgment")
         public boolean batchIndexAck = false;
 
@@ -340,6 +344,7 @@ public class PerformanceConsumer {
                 .connectionsPerBroker(arguments.maxConnections) //
                 .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
                 .ioThreads(arguments.ioThreads) //
+                .listenerThreads(arguments.listenerThreads)
                 .enableBusyWait(arguments.enableBusyWait)
                 .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
         if (isNotBlank(arguments.authPluginClassName)) {
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index a7f66ad..44e5555 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -134,6 +134,10 @@ public class PerformanceReader {
         @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
                 "used for handling connections to brokers, default is 1 thread")
         public int ioThreads = 1;
+
+        @Parameter(names = {"-lt", "--num-listener-threads"}, description = "Set the number of threads"
+                + " to be used for message listeners")
+        public int listenerThreads = 1;
     }
 
     public static void main(String[] args) throws Exception {
@@ -252,6 +256,7 @@ public class PerformanceReader {
                 .connectionsPerBroker(arguments.maxConnections) //
                 .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
                 .ioThreads(arguments.ioThreads) //
+                .listenerThreads(arguments.listenerThreads)
                 .enableTls(arguments.useTls) //
                 .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath);
 
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index 1911fe3..f4ae136 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -447,6 +447,7 @@ Options
 |`-mc`, `--max_chunked_msg`|Max pending chunk messages|0|
 |`-n`, `--num-consumers`|Number of consumers (per topic)|1|
 |`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
+|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
 |`-ns`, `--num-subscriptions`|Number of subscriptions (per topic)|1|
 |`-t`, `--num-topics`|The number of topics|1|
 |`-pm`, `--pool-messages`|Use the pooled message|true|
@@ -541,6 +542,7 @@ Options
 |`-n`, `--num-messages`|Number of messages to consume in total. If the value is equal to or smaller than 0, it keeps consuming messages.|0|
 |`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
 |`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
+|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
 |`-t`, `--num-topics`|The number of topics|1|
 |`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0|
 |`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|
diff --git a/site2/website-next/docs/reference-cli-tools.md b/site2/website-next/docs/reference-cli-tools.md
index cbdf717..a2eda14 100644
--- a/site2/website-next/docs/reference-cli-tools.md
+++ b/site2/website-next/docs/reference-cli-tools.md
@@ -533,6 +533,7 @@ Options
 |`-mc`, `--max_chunked_msg`|Max pending chunk messages|0|
 |`-n`, `--num-consumers`|Number of consumers (per topic)|1|
 |`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
+|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
 |`-ns`, `--num-subscriptions`|Number of subscriptions (per topic)|1|
 |`-t`, `--num-topics`|The number of topics|1|
 |`-pm`, `--pool-messages`|Use the pooled message|true|
@@ -633,6 +634,7 @@ Options
 |`-n`, `--num-messages`|Number of messages to consume in total. If the value is equal to or smaller than 0, it keeps consuming messages.|0|
 |`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
 |`-ioThreads`, `--num-io-threads`|Set the number of threads to be used for handling connections to brokers|1|
+|`-lt`, `--num-listener-threads`|Set the number of threads to be used for message listeners|1|
 |`-t`, `--num-topics`|The number of topics|1|
 |`-r`, `--rate`|Simulate a slow message reader (rate in msg/s)|0|
 |`-q`, `--receiver-queue-size`|Size of the receiver queue|1000|

[pulsar] 07/15: JavaInstanceTest should be AssertEquals (#12836)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 28dcdc380c409e09408079c4d34abd724d51bc25
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Thu Nov 18 16:04:22 2021 +0800

    JavaInstanceTest should be AssertEquals (#12836)
    
    (cherry picked from commit 4a4c1de6174c41ccf630fd17ea12d033adde9dd9)
---
 .../java/org/apache/pulsar/functions/instance/JavaInstanceTest.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 03c0da5..9931ddf 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.JavaInstance.AsyncFuncRequest;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -226,7 +227,7 @@ public class JavaInstanceTest {
 
         for (int i = 0; i < 3; i++) {
             AsyncFuncRequest request = instance.getPendingAsyncRequests().poll();
-            assertNotNull(testString + "-lambda", (String) request.getProcessResult().get());
+            Assert.assertEquals(request.getProcessResult().get(), testString + "-lambda");
         }
 
         long endTime = System.currentTimeMillis();
@@ -235,7 +236,6 @@ public class JavaInstanceTest {
         instance.close();
     }
     
-    @SuppressWarnings("serial")
 	private static class UserException extends Exception {
     	public UserException(String msg) {
     		super(msg);

[pulsar] 01/15: [Pulsar SQL] Handle message null schema version in PulsarRecordCursor (#12809)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5277984b2a42e6ace54b86a70195d2a3729a36a3
Author: ran <ga...@126.com>
AuthorDate: Tue Nov 16 13:16:58 2021 +0800

    [Pulsar SQL] Handle message null schema version in PulsarRecordCursor (#12809)
    
    ### Motivation
    
    Currently, if the schema version of the message is null, the Pulsar SQL will encounter an NPE problem.
    
    ### Modifications
    
    Adjust logic for null schema version in `PulsarRecordCursor`.
    
    1. If the schema type of pulsarSplit is NONE or BYTES, use the BYTES schema.
    2. If the schema type of pulsarSplit is BYTEBUFFER, use the BYTEBUFFER schema.
    3. If the schema version of the message is null, use the latest schema of the topic.
    4. If the schema version of the message is not null, get the specific version schema by PulsarAdmin.
    5. If the final schema is null throw a runtime exception.
    
    (cherry picked from commit e5619cffce702d9f446c27e69927148e45797b28)
---
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  42 ++++++--
 .../sql/presto/PulsarSqlSchemaInfoProvider.java    |   9 +-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  | 107 ++++++++++++++++++++-
 3 files changed, 146 insertions(+), 12 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index f1e2bdb..b1230d3 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.api.raw.RawMessage;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -479,14 +480,7 @@ public class PulsarRecordCursor implements RecordCursor {
         //start time for deseralizing record
         metricsTracker.start_RECORD_DESERIALIZE_TIME();
 
-        SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
-        try {
-            if (schemaInfo == null) {
-                schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
+        SchemaInfo schemaInfo = getSchemaInfo(pulsarSplit);
 
         Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();
 
@@ -600,6 +594,38 @@ public class PulsarRecordCursor implements RecordCursor {
         return true;
     }
 
+    /**
+     * Get the schemaInfo of the message.
+     *
+     * 1. If the schema type of pulsarSplit is NONE or BYTES, use the BYTES schema.
+     * 2. If the schema type of pulsarSplit is BYTEBUFFER, use the BYTEBUFFER schema.
+     * 3. If the schema version of the message is null, use the schema info of pulsarSplit.
+     * 4. If the schema version of the message is not null, get the specific version schema by PulsarAdmin.
+     * 5. If the final schema is null throw a runtime exception.
+     */
+    private SchemaInfo getSchemaInfo(PulsarSplit pulsarSplit) {
+        SchemaInfo schemaInfo = getBytesSchemaInfo(pulsarSplit.getSchemaType(), pulsarSplit.getSchemaName());
+        if (schemaInfo != null) {
+            return schemaInfo;
+        }
+        try {
+            if (this.currentMessage.getSchemaVersion() == null) {
+                schemaInfo = pulsarSplit.getSchemaInfo();
+            } else {
+                schemaInfo =  schemaInfoProvider.getSchemaByVersion(this.currentMessage.getSchemaVersion()).get();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+        if (schemaInfo == null) {
+            String schemaVersion = this.currentMessage.getSchemaVersion() == null
+                    ? "null" : BytesSchemaVersion.of(this.currentMessage.getSchemaVersion()).toString();
+            throw new RuntimeException("The specific version (" + schemaVersion + ") schema of the table "
+                    + pulsarSplit.getTableName() + " is null");
+        }
+        return schemaInfo;
+    }
+
     private SchemaInfo getBytesSchemaInfo(SchemaType schemaType, String schemaName) {
         if (!schemaType.equals(SchemaType.BYTES) && !schemaType.equals(SchemaType.NONE)) {
             return null;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
index 3a9233c..828ceef 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSqlSchemaInfoProvider.java
@@ -102,8 +102,13 @@ public class PulsarSqlSchemaInfoProvider implements SchemaInfoProvider {
         ClassLoader originalContextLoader = Thread.currentThread().getContextClassLoader();
         try {
             Thread.currentThread().setContextClassLoader(InjectionManagerFactory.class.getClassLoader());
-            return pulsarAdmin.schemas()
-                    .getSchemaInfo(topicName.toString(), ByteBuffer.wrap(bytesSchemaVersion.get()).getLong());
+            long version = ByteBuffer.wrap(bytesSchemaVersion.get()).getLong();
+            SchemaInfo schemaInfo = pulsarAdmin.schemas().getSchemaInfo(topicName.toString(), version);
+            if (schemaInfo == null) {
+                throw new RuntimeException(
+                        "The specific version (" + version + ") schema of the topic " + topicName + " is null");
+            }
+            return schemaInfo;
         } finally {
             Thread.currentThread().setContextClassLoader(originalContextLoader);
         }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 610a1c2..d60ff20 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -34,18 +34,31 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -56,6 +69,8 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -65,6 +80,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class TestPulsarRecordCursor extends TestPulsarConnector {
 
@@ -323,9 +339,14 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
 
                                     MessageMetadata messageMetadata =
                                             new MessageMetadata()
-                                                    .setProducerName("test-producer").setSequenceId(positions.get(topic))
+                                                    .setProducerName("test-producer")
+                                                    .setSequenceId(positions.get(topic))
                                                     .setPublishTime(System.currentTimeMillis());
 
+                                    if (i % 2 == 0) {
+                                        messageMetadata.setSchemaVersion(new LongSchemaVersion(1L).bytes());
+                                    }
+
                                     if (KeyValueEncodingType.SEPARATED.equals(schema.getKeyValueEncodingType())) {
                                         messageMetadata
                                                 .setPartitionKey(new String(schema
@@ -380,7 +401,7 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
         PulsarSplit split = new PulsarSplit(0, pulsarConnectorId.toString(),
                 topicName.getNamespace(), topicName.getLocalName(), topicName.getLocalName(),
                 entriesNum,
-                new String(schema.getSchemaInfo().getSchema()),
+                new String(schema.getSchemaInfo().getSchema(),  "ISO8859-1"),
                 schema.getSchemaInfo().getType(),
                 0, entriesNum,
                 0, 0, TupleDomain.all(),
@@ -416,4 +437,86 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
         private Double field3;
     }
 
+    @Test
+    public void testGetSchemaInfo() throws Exception {
+        String topic = "get-schema-test";
+        PulsarSplit pulsarSplit = Mockito.mock(PulsarSplit.class);
+        Mockito.when(pulsarSplit.getTableName()).thenReturn(TopicName.get(topic).getLocalName());
+        Mockito.when(pulsarSplit.getSchemaName()).thenReturn("public/default");
+        PulsarAdmin pulsarAdmin = Mockito.mock(PulsarAdmin.class);
+        Schemas schemas = Mockito.mock(Schemas.class);
+        Mockito.when(pulsarAdmin.schemas()).thenReturn(schemas);
+        PulsarConnectorConfig connectorConfig = spy(new PulsarConnectorConfig());
+        Mockito.when(connectorConfig.getPulsarAdmin()).thenReturn(pulsarAdmin);
+        PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(
+                new ArrayList<>(), pulsarSplit, connectorConfig, Mockito.mock(ManagedLedgerFactory.class),
+                new ManagedLedgerConfig(), null, null));
+
+        Class<PulsarRecordCursor> clazz =  PulsarRecordCursor.class;
+        Method getSchemaInfo = clazz.getDeclaredMethod("getSchemaInfo", PulsarSplit.class);
+        getSchemaInfo.setAccessible(true);
+        Field currentMessage = clazz.getDeclaredField("currentMessage");
+        currentMessage.setAccessible(true);
+        RawMessage rawMessage = Mockito.mock(RawMessage.class);
+        currentMessage.set(pulsarRecordCursor, rawMessage);
+
+        // If the schemaType of pulsarSplit is NONE or BYTES, using bytes schema
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.NONE);
+        SchemaInfo schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.BYTES);
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        Mockito.when(pulsarSplit.getSchemaName()).thenReturn(Schema.BYTEBUFFER.getSchemaInfo().getName());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.BYTES, schemaInfo.getType());
+
+        // If the schemaVersion of the message is not null, try to get the schema.
+        Mockito.when(pulsarSplit.getSchemaType()).thenReturn(SchemaType.AVRO);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(0).bytes());
+        Mockito.when(schemas.getSchemaInfo(anyString(), eq(0L)))
+                .thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(SchemaType.AVRO, schemaInfo.getType());
+
+        String schemaTopic = "persistent://public/default/" + topic;
+
+        // If the schemaVersion of the message is null and the schema of pulsarSplit is null, throw runtime exception.
+        Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(null);
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+            fail("The message schema version is null and the latest schema is null, should fail.");
+        } catch (InvocationTargetException e) {
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the table " + topic + " is null"));
+        }
+
+        // If the schemaVersion of the message is null, try to get the latest schema.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(null);
+        Mockito.when(pulsarSplit.getSchemaInfo()).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+
+        // If the specific version schema is null, throw runtime exception.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(1L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 1)).thenReturn(null);
+        try {
+            schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+            fail("The specific version " + 1 + " schema is null, should fail.");
+        } catch (InvocationTargetException e) {
+            String schemaVersion = BytesSchemaVersion.of(new LongSchemaVersion(1L).bytes()).toString();
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertTrue(e.getCause().getMessage().contains("schema of the topic " + schemaTopic + " is null"));
+        }
+
+        // Get the specific version schema.
+        Mockito.when(rawMessage.getSchemaVersion()).thenReturn(new LongSchemaVersion(2L).bytes());
+        Mockito.when(schemas.getSchemaInfo(schemaTopic, 2)).thenReturn(Schema.AVRO(Foo.class).getSchemaInfo());
+        schemaInfo = (SchemaInfo) getSchemaInfo.invoke(pulsarRecordCursor, pulsarSplit);
+        assertEquals(Schema.AVRO(Foo.class).getSchemaInfo(), schemaInfo);
+    }
+
 }

[pulsar] 06/15: [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 012e1b1a5541d0ca17c65efc1f4bb905ad76ea7b
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Thu Nov 18 20:37:37 2021 +0800

    [Issue 12757][broker] add broker config isAllowAutoUpdateSchema (#12786)
    
    (cherry picked from commit fa7be236efcc6772e0aac05f25f8d5f3cf0ad741)
---
 conf/broker.conf                                   |  4 ++
 conf/standalone.conf                               |  4 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  8 +++
 .../pulsar/broker/service/AbstractTopic.java       | 38 ++++++----
 .../SchemaCompatibilityCheckTest.java              | 81 ++++++++++++++++++++++
 .../pulsar/common/policies/data/Policies.java      |  2 +-
 6 files changed, 121 insertions(+), 16 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 3dd8590..2d7df90 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -270,6 +270,10 @@ brokerMaxConnections=0
 # The maximum number of connections per IP. If it exceeds, new connections are rejected.
 brokerMaxConnectionsPerIp=0
 
+# Allow schema to be auto updated at broker level. User can override this by
+# 'is_allow_auto_update_schema' of namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
 # Enable check for minimum allowed client library version
 clientLibraryVersionCheckEnabled=false
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 878a852..577a6ff 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -176,6 +176,10 @@ defaultNumberOfNamespaceBundles=4
 # Using a value of 0, is disabling maxTopicsPerNamespace-limit check.
 maxTopicsPerNamespace=0
 
+# Allow schema to be auto updated at broker level. User can override this by
+# 'is_allow_auto_update_schema' of namespace policy.
+isAllowAutoUpdateSchemaEnabled=true
+
 # Enable check for minimum allowed client library version
 clientLibraryVersionCheckEnabled=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 69a3d35..0d578f4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -573,6 +573,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private int brokerMaxConnectionsPerIp = 0;
 
     @FieldContext(
+        category = CATEGORY_POLICIES,
+        dynamic = true,
+        doc = "Allow schema to be auto updated at broker level. User can override this by 'is_allow_auto_update_schema'"
+            + " of namespace policy. This is enabled by default."
+    )
+    private boolean isAllowAutoUpdateSchemaEnabled = true;
+
+    @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
         doc = "Enable check for minimum allowed client library version"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 9463be9..30c0fae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -95,7 +95,7 @@ public abstract class AbstractTopic implements Topic {
     protected volatile boolean isEncryptionRequired = false;
     protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
             SchemaCompatibilityStrategy.FULL;
-    protected volatile boolean isAllowAutoUpdateSchema = true;
+    protected volatile Boolean isAllowAutoUpdateSchema;
     // schema validation enforced flag
     protected volatile boolean schemaValidationEnforced = false;
 
@@ -328,20 +328,28 @@ public abstract class AbstractTopic implements Topic {
         String base = TopicName.get(getName()).getPartitionedTopicName();
         String id = TopicName.get(base).getSchemaName();
         SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
-        return isAllowAutoUpdateSchema ? schemaRegistryService
-                .putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
-                : schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
-                schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
-                        .thenCompose(schemaVersion -> {
-                    if (schemaVersion == null) {
-                        return FutureUtil
-                                .failedFuture(
-                                        new IncompatibleSchemaException(
-                                                "Schema not found and schema auto updating is disabled."));
-                    } else {
-                        return CompletableFuture.completedFuture(schemaVersion);
-                    }
-                }));
+
+        if (allowAutoUpdateSchema()) {
+            return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
+        } else {
+            return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
+                    schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
+                            .thenCompose(schemaVersion -> {
+                                if (schemaVersion == null) {
+                                    return FutureUtil.failedFuture(new IncompatibleSchemaException(
+                                            "Schema not found and schema auto updating is disabled."));
+                                } else {
+                                    return CompletableFuture.completedFuture(schemaVersion);
+                                }
+                            }));
+        }
+    }
+
+    private boolean allowAutoUpdateSchema() {
+        if (isAllowAutoUpdateSchema == null) {
+            return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
+        }
+        return isAllowAutoUpdateSchema;
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 293f71d..80168b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -218,6 +218,87 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
+    public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy)
+            throws Exception {
+
+        final String tenant = PUBLIC_TENANT;
+        final String topic = "test-consumer-compatibility";
+        String namespace = "test-namespace-" + randomName(16);
+        String fqtn = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topic
+        ).toString();
+
+        NamespaceName namespaceName = NamespaceName.get(tenant, namespace);
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME)
+        );
+
+        assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()),
+                SchemaCompatibilityStrategy.FULL);
+
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
+        admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
+
+
+        pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+        ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
+                .newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                                (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build()))
+                .topic(fqtn);
+        try {
+            producerThreeBuilder.create();
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
+        }
+
+        pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
+        ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
+                        SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                                        (false).withSupportSchemaVersioning(true).
+                                withPojo(Schemas.PersonTwo.class).build()))
+                .subscriptionName("test")
+                .topic(fqtn);
+
+        Producer<Schemas.PersonTwo> producer = producerThreeBuilder.create();
+        Consumer<Schemas.PersonTwo> consumerTwo = comsumerBuilder.subscribe();
+
+        producer.send(new Schemas.PersonTwo(2, "Lucy"));
+        Message<Schemas.PersonTwo> message = consumerTwo.receive();
+
+        Schemas.PersonTwo personTwo = message.getValue();
+        consumerTwo.acknowledge(message);
+
+        assertEquals(personTwo.getId(), 2);
+        assertEquals(personTwo.getName(), "Lucy");
+
+        producer.close();
+        consumerTwo.close();
+
+        pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
+
+        producer = producerThreeBuilder.create();
+        consumerTwo = comsumerBuilder.subscribe();
+
+        producer.send(new Schemas.PersonTwo(2, "Lucy"));
+        message = consumerTwo.receive();
+
+        personTwo = message.getValue();
+        consumerTwo.acknowledge(message);
+
+        assertEquals(personTwo.getId(), 2);
+        assertEquals(personTwo.getName(), "Lucy");
+
+        consumerTwo.close();
+        producer.close();
+    }
+
     @Test(dataProvider =  "AllCheckSchemaCompatibilityStrategy")
     public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
         final String tenant = PUBLIC_TENANT;
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 631675f..d26c61e 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -108,7 +108,7 @@ public class Policies {
     public SchemaCompatibilityStrategy schema_compatibility_strategy = SchemaCompatibilityStrategy.UNDEFINED;
 
     @SuppressWarnings("checkstyle:MemberName")
-    public boolean is_allow_auto_update_schema = true;
+    public Boolean is_allow_auto_update_schema = null;
 
     @SuppressWarnings("checkstyle:MemberName")
     public boolean schema_validation_enforced = false;

[pulsar] 14/15: Handle exception double (#12881)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 33ff93f401a51f7599f77ffe8d8c6d6851dae317
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Sat Nov 20 00:20:46 2021 +0800

    Handle exception double (#12881)
    
    Fixes https://github.com/apache/pulsar/pull/12744
    
    ### Motivation
    The exception here was handled twice, resulting in a null pointer exception.
    And the position will be updated twice.
    ### Modifications
    Keep the exceptionally() part and convert the whenComplete() into thenAccept().
    
    (cherry picked from commit dbac1217ee6b8247ea970e3a70e928259a389244)
---
 .../bookkeeper/mledger/impl/EntryCacheImpl.java    | 44 ++++++++++------------
 .../bookkeeper/mledger/impl/EntryCacheManager.java | 13 +++----
 2 files changed, 26 insertions(+), 31 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index 6d54abb..085923e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -209,14 +209,8 @@ public class EntryCacheImpl implements EntryCache {
             manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
             callback.readEntryComplete(cachedEntry, ctx);
         } else {
-            lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
-                    (ledgerEntries, exception) -> {
-                        if (exception != null) {
-                            ml.invalidateLedgerHandle(lh);
-                            callback.readEntryFailed(createManagedLedgerException(exception), ctx);
-                            return;
-                        }
-
+            lh.readAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync(
+                    ledgerEntries -> {
                         try {
                             Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
                             if (iterator.hasNext()) {
@@ -234,7 +228,11 @@ public class EntryCacheImpl implements EntryCache {
                         } finally {
                             ledgerEntries.close();
                         }
-                    }, ml.getExecutor().chooseThread(ml.getName()));
+                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+                        ml.invalidateLedgerHandle(lh);
+                        callback.readEntryFailed(createManagedLedgerException(exception), ctx);
+                        return null;
+            });
         }
     }
 
@@ -292,20 +290,8 @@ public class EntryCacheImpl implements EntryCache {
             }
 
             // Read all the entries from bookkeeper
-            lh.readAsync(firstEntry, lastEntry).whenCompleteAsync(
-                    (ledgerEntries, exception) -> {
-                        if (exception != null) {
-                            if (exception instanceof BKException
-                                && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
-                                callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
-                            } else {
-                                ml.invalidateLedgerHandle(lh);
-                                ManagedLedgerException mlException = createManagedLedgerException(exception);
-                                callback.readEntriesFailed(mlException, ctx);
-                            }
-                            return;
-                        }
-
+            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
+                    ledgerEntries -> {
                         checkNotNull(ml.getName());
                         checkNotNull(ml.getExecutor());
 
@@ -328,7 +314,17 @@ public class EntryCacheImpl implements EntryCache {
                         } finally {
                             ledgerEntries.close();
                         }
-                    }, ml.getExecutor().chooseThread(ml.getName()));
+                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+                        if (exception instanceof BKException
+                                && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
+                            callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
+                        } else {
+                            ml.invalidateLedgerHandle(lh);
+                            ManagedLedgerException mlException = createManagedLedgerException(exception);
+                            callback.readEntriesFailed(mlException, ctx);
+                        }
+                        return null;
+            });
         }
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index d360bbd..1c0c288 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -193,12 +193,8 @@ public class EntryCacheManager {
         @Override
         public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
                 final ReadEntriesCallback callback, Object ctx) {
-            lh.readAsync(firstEntry, lastEntry).whenComplete(
-                    (ledgerEntries, exception) -> {
-                        if (exception != null) {
-                            callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
-                            return;
-                        }
+            lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
+                    ledgerEntries -> {
                         List<Entry> entries = Lists.newArrayList();
                         long totalSize = 0;
                         try {
@@ -215,7 +211,10 @@ public class EntryCacheManager {
                         ml.mbean.addReadEntriesSample(entries.size(), totalSize);
 
                         callback.readEntriesComplete(entries, ctx);
-                    });
+                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
+                        callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
+                        return null;
+            });
         }
 
         @Override

[pulsar] 04/15: The problem of two exception handling (#12744)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b9629cd86db4755a445fb8817496a685784c7d03
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Nov 18 15:28:07 2021 +0800

    The problem of two exception handling (#12744)
    
    whenCompleteAsync has handle exception, don't use exceptionally, otherwise it will be handle twice
    
    (cherry picked from commit fc8d50ecf841bd1a4b01fa09720411f6190ee5be)
---
 .../bookkeeper/mledger/impl/EntryCacheImpl.java       | 19 ++-----------------
 .../bookkeeper/mledger/impl/EntryCacheManager.java    |  3 ---
 2 files changed, 2 insertions(+), 20 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index 6a0ac2c..6d54abb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -234,12 +234,7 @@ public class EntryCacheImpl implements EntryCache {
                         } finally {
                             ledgerEntries.close();
                         }
-                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
-                          ml.invalidateLedgerHandle(lh);
-                          callback.readEntryFailed(createManagedLedgerException(exception), ctx);
-                          return null;
-                    }
-                    );
+                    }, ml.getExecutor().chooseThread(ml.getName()));
         }
     }
 
@@ -333,17 +328,7 @@ public class EntryCacheImpl implements EntryCache {
                         } finally {
                             ledgerEntries.close();
                         }
-                    }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception->{
-                    	  if (exception instanceof BKException
-                                  && ((BKException)exception).getCode() == BKException.Code.TooManyRequestsException) {
-                                  callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
-                              } else {
-                                  ml.invalidateLedgerHandle(lh);
-                                  ManagedLedgerException mlException = createManagedLedgerException(exception);
-                                  callback.readEntriesFailed(mlException, ctx);
-                              }
-                    	return null;
-                    });
+                    }, ml.getExecutor().chooseThread(ml.getName()));
         }
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
index c87bcb8..d360bbd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java
@@ -215,9 +215,6 @@ public class EntryCacheManager {
                         ml.mbean.addReadEntriesSample(entries.size(), totalSize);
 
                         callback.readEntriesComplete(entries, ctx);
-                    }).exceptionally(exception -> {
-                    	callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
-                    	return null;
                     });
         }