You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/12/23 14:41:06 UTC

[pulsar] branch branch-2.8 updated (ef4c8e5 -> 1a32edc)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from ef4c8e5  Fix invalid setting of enabled ciphers to fix warning from BoringSSL
     new 3c253ea  Close old compacted ledger when open new. (#13210)
     new c578829  [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)
     new a98c52b  [Broker] Modify return result of NamespacesBase#internalGetPublishRate (#13237)
     new 05d88f1  Fix Version.h not found when CMake binary directory is customized (#13324)
     new d0eac9a  [pulsar-client] Fix multi topic reader has message available behavior (#13332)
     new c941b2f  [Broker] Fix race conditions in closing producers and consumers (#13428)
     new 756ade0  Fix NPE in cmdTopics (#13450)
     new 1a32edc  Fixes the NPE in system topics policies service (#13469)

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 14 ++---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  8 +--
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 15 +++--
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 15 +++--
 .../apache/pulsar/broker/service/ServerCnx.java    | 24 +++++---
 .../SystemTopicBasedTopicPoliciesService.java      | 31 +++++++---
 .../service/persistent/CompactorSubscription.java  |  8 ++-
 .../pulsar/broker/service/PersistentTopicTest.java |  2 +
 .../api/AuthorizationProducerConsumerTest.java     |  2 +
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 67 ++++++++++++++++++++++
 pulsar-client-cpp/CMakeLists.txt                   |  1 +
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  4 ++
 .../client/impl/MultiTopicsConsumerImpl.java       |  3 +
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  2 +-
 14 files changed, 152 insertions(+), 44 deletions(-)

[pulsar] 07/08: Fix NPE in cmdTopics (#13450)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 756ade01b2b9e590b3408a2b82634313ae11014c
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Dec 23 17:25:49 2021 +0800

    Fix NPE in cmdTopics (#13450)
    
    (cherry picked from commit 76f35666deb5a956b7eef9732a3028b246e5294c)
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java          | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 3d522ff..c73e8ad 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -590,6 +590,10 @@ public class CmdTopics extends CmdBase {
         void run() throws PulsarAdminException {
             String topic = validateTopicName(params);
             String internalInfo = getTopics().getInternalInfo(topic);
+            if (internalInfo == null) {
+                System.out.println("Did not find any internal metadata info");
+                return;
+            }
             JsonObject result = JsonParser.parseString(internalInfo).getAsJsonObject();
             Gson gson = new GsonBuilder().setPrettyPrinting().create();
             System.out.println(gson.toJson(result));

[pulsar] 04/08: Fix Version.h not found when CMake binary directory is customized (#13324)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 05d88f198cedce17e19a08e7ca4905a027e5db76
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Dec 15 17:08:09 2021 +0800

    Fix Version.h not found when CMake binary directory is customized (#13324)
    
    ### Motivation
    
    When I build C++ tests on my local env, the following error happened.
    
    ```
    tests/VersionTest.cc:19:10: fatal error: 'pulsar/Version.h' file not found
    #include <pulsar/Version.h>
    ```
    
    It's because I specified another directory as CMake directory.
    
    ```bash
    mkdir _builds && cd _builds && cmake ..
    ```
    
    After https://github.com/apache/pulsar/pull/12769, the `Version.h` is generated under `${CMAKE_BINARY_DIR}/include/pulsar` directory but it's not included in `CMakeLists.txt`. CI works well because it's built in the default CMake directory so that `CMAKE_BINARY_DIR` is the same with `CMAKE_SOURCE_DIR`, which is included.
    
    ### Modifications
    
    Add the `${CMAKE_BINARY_DIR}/include` to `included_directories`.
    
    (cherry picked from commit ca37e67211feda4f7e0984e6414e707f1c1dfd07)
---
 pulsar-client-cpp/CMakeLists.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index a8ca467..22d5605 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -332,6 +332,7 @@ file(MAKE_DIRECTORY ${AUTOGEN_DIR})
 include_directories(
   ${CMAKE_SOURCE_DIR}
   ${CMAKE_SOURCE_DIR}/include
+  ${CMAKE_BINARY_DIR}/include
   ${AUTOGEN_DIR}
   ${Boost_INCLUDE_DIR}
   ${OPENSSL_INCLUDE_DIR}

[pulsar] 03/08: [Broker] Modify return result of NamespacesBase#internalGetPublishRate (#13237)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a98c52bc6841cd48c8ec87f2982f0a89b547789f
Author: Ruguo Yu <ji...@163.com>
AuthorDate: Wed Dec 15 15:54:41 2021 +0800

    [Broker] Modify return result of NamespacesBase#internalGetPublishRate (#13237)
    
    ### Motivation
    It should return `null` instead of `RestException` in method `NamespacesBase#internalGetPublishRate`, because `null` means that the `publish-rate` is not configured.
    It is the same as `internalGetSubscriptionDispatchRate` as below:
    https://github.com/apache/pulsar/blob/6d9d24d50db5418ddbb845d2c7a2be2b9ac72893/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L1303-L1308
    
    (cherry picked from commit 3e55b4f2c40ecd73bb38962cebc4a822bfe5f1ef)
---
 .../apache/pulsar/broker/admin/impl/NamespacesBase.java   |  8 +-------
 .../org/apache/pulsar/broker/admin/v1/Namespaces.java     | 15 ++++++++++-----
 .../org/apache/pulsar/broker/admin/v2/Namespaces.java     | 15 ++++++++++-----
 .../client/api/AuthorizationProducerConsumerTest.java     |  2 ++
 4 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 5d78216..3d67008 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1198,13 +1198,7 @@ public abstract class NamespacesBase extends AdminResource {
         validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
 
         Policies policies = getNamespacePolicies(namespaceName);
-        PublishRate publishRate = policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
-        if (publishRate != null) {
-            return publishRate;
-        } else {
-            throw new RestException(Status.NOT_FOUND,
-                    "Publish-rate is not configured for cluster " + pulsar().getConfiguration().getClusterName());
-        }
+        return policies.publishMaxMessageRate.get(pulsar().getConfiguration().getClusterName());
     }
 
     @SuppressWarnings("deprecation")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 529b8ce..9fec59c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -624,7 +624,8 @@ public class Namespaces extends NamespacesBase {
     @GET
     @Path("/{property}/{cluster}/{namespace}/publishRate")
     @ApiOperation(hidden = true,
-            value = "Get publish-rate configured for the namespace, -1 represents not configured yet")
+            value = "Get publish-rate configured for the namespace, null means publish-rate not configured, "
+                    + "-1 means msg-publish-rate or byte-publish-rate not configured in publish-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public PublishRate getPublishRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@@ -646,7 +647,8 @@ public class Namespaces extends NamespacesBase {
     @GET
     @Path("/{property}/{cluster}/{namespace}/dispatchRate")
     @ApiOperation(hidden = true,
-            value = "Get dispatch-rate configured for the namespace, -1 represents not configured yet")
+            value = "Get dispatch-rate configured for the namespace, null means dispatch-rate not configured, "
+                    + "-1 means msg-dispatch-rate or byte-dispatch-rate not configured in dispatch-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public DispatchRate getDispatchRate(@PathParam("property") String property, @PathParam("cluster") String cluster,
@@ -669,8 +671,9 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{property}/{cluster}/{namespace}/subscriptionDispatchRate")
-    @ApiOperation(value =
-            "Get Subscription dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get subscription dispatch-rate configured for the namespace, null means subscription "
+            + "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured "
+            + "in dispatch-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public DispatchRate getSubscriptionDispatchRate(@PathParam("property") String property,
@@ -695,7 +698,9 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{tenant}/{cluster}/{namespace}/replicatorDispatchRate")
-    @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, null means replicator "
+            + "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured "
+            + "in dispatch-rate yet")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Namespace does not exist") })
     public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index f9515e3..ed5d9e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -565,7 +565,8 @@ public class Namespaces extends NamespacesBase {
     @GET
     @Path("/{property}/{namespace}/publishRate")
     @ApiOperation(hidden = true,
-            value = "Get publish-rate configured for the namespace, -1 represents not configured yet")
+            value = "Get publish-rate configured for the namespace, null means publish-rate not configured, "
+                    + "-1 means msg-publish-rate or byte-publish-rate not configured in publish-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public PublishRate getPublishRate(
@@ -597,7 +598,8 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{tenant}/{namespace}/dispatchRate")
-    @ApiOperation(value = "Get dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get dispatch-rate configured for the namespace, null means dispatch-rate not configured, "
+            + "-1 means msg-dispatch-rate or byte-dispatch-rate not configured in dispatch-rate yet")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
     public DispatchRate getDispatchRate(@PathParam("tenant") String tenant,
@@ -620,8 +622,9 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{tenant}/{namespace}/subscriptionDispatchRate")
-    @ApiOperation(
-            value = "Get Subscription dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get subscription dispatch-rate configured for the namespace, null means subscription "
+            + "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured "
+            + "in dispatch-rate yet")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist")})
     public DispatchRate getSubscriptionDispatchRate(@PathParam("tenant") String tenant,
@@ -695,7 +698,9 @@ public class Namespaces extends NamespacesBase {
 
     @GET
     @Path("/{tenant}/{namespace}/replicatorDispatchRate")
-    @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, -1 represents not configured yet")
+    @ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, null means replicator "
+            + "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured "
+            + "in dispatch-rate yet")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Namespace does not exist") })
     public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 8807f11..475dc8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
@@ -207,6 +208,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         superAdmin.tenants().createTenant("my-property",
                 new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
         superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        assertNull(superAdmin.namespaces().getPublishRate(namespace));
 
         // subscriptionRole doesn't have topic-level authorization, so it will fail to get topic stats-internal info
         try {

[pulsar] 05/08: [pulsar-client] Fix multi topic reader has message available behavior (#13332)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d0eac9a2b157209f80659966649200906086a9ee
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Thu Dec 23 16:36:27 2021 +0800

    [pulsar-client] Fix multi topic reader has message available behavior (#13332)
    
    ### Motivation
    
    When we use a multi-topic reader, the `hasMessageAvailable` method might have the wrong behavior, since the multi-topics consumer receives all messages from the single-topic consumer, the single-topic consumer `hasMessageAvailable` might always be `false` (The lastDequeuedMessageId reach to the end of the queue, all message enqueue to multi-topic consumer's `incomingMessages` queue).
    
    We should check the multi-topics consumer  `incomingMessages` size > 0 when calling `hasMessageAvailable `.
    
    ### Modifications
    
    1. Add a check of `incomingMessages` size > 0
    2. Add units test `testHasMessageAvailableAsync` to verify the behavior.
    
    (cherry picked from commit 6c7dcc0cf877cfcb8bcea18cde7662ebacb01d4c)
---
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 67 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  3 +
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  2 +-
 3 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index a8a6ced..f6230e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -26,15 +26,19 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -43,6 +47,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
@@ -61,6 +66,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "flaky")
+@Slf4j
 public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
 
     private static final String subscription = "reader-multi-topics-sub";
@@ -122,6 +128,67 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test(timeOut = 10000)
+    public void testHasMessageAvailableAsync() throws Exception {
+        String topic = "persistent://my-property/my-ns/testHasMessageAvailableAsync";
+        String content = "my-message-";
+        int msgNum = 10;
+        admin.topics().createPartitionedTopic(topic, 2);
+        // stop retention from cleaning up
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
+
+        try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic).readCompacted(true)
+                .startMessageId(MessageId.earliest).create()) {
+            Assert.assertFalse(reader.hasMessageAvailable());
+            Assert.assertFalse(reader.hasMessageAvailableAsync().get(10, TimeUnit.SECONDS));
+        }
+
+        try (Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic).startMessageId(MessageId.earliest).create()) {
+            try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
+                for (int i = 0; i < msgNum; i++) {
+                    producer.newMessage().key(content + i)
+                            .value((content + i).getBytes(StandardCharsets.UTF_8)).send();
+                }
+            }
+            // Should have message available
+            Assert.assertTrue(reader.hasMessageAvailableAsync().get());
+            try {
+                // Should have message available too
+                Assert.assertTrue(reader.hasMessageAvailable());
+            } catch (PulsarClientException e) {
+                fail("Expect success but failed.", e);
+            }
+            List<Message<byte[]>> msgs = Collections.synchronizedList(new ArrayList<>());
+            CountDownLatch latch = new CountDownLatch(1);
+            readMessageUseAsync(reader, msgs, latch);
+            latch.await();
+            Assert.assertEquals(msgs.size(), msgNum);
+        }
+    }
+
+    private static <T> void readMessageUseAsync(Reader<T> reader, List<Message<T>> msgs, CountDownLatch latch) {
+        reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> {
+            if (hasMessageAvailable) {
+                try {
+                    Message<T> msg = reader.readNext();
+                    msgs.add(msg);
+                } catch (PulsarClientException e) {
+                    log.error("Read message failed.", e);
+                    latch.countDown();
+                    return;
+                }
+                readMessageUseAsync(reader, msgs, latch);
+            } else {
+                latch.countDown();
+            }
+        }).exceptionally(throwable -> {
+            log.error("Read message failed.", throwable);
+            latch.countDown();
+            return null;
+        });
+    }
+
+    @Test(timeOut = 10000)
     public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
         String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive" + UUID.randomUUID();
         int topicNum = 3;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index faa2d65..177cc9f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -754,6 +754,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        if (numMessagesInQueue() > 0) {
+            return CompletableFuture.completedFuture(true);
+        }
         List<CompletableFuture<Void>> futureList = new ArrayList<>();
         final AtomicBoolean hasMessageAvailable = new AtomicBoolean(false);
         for (ConsumerImpl<T> consumer : consumers.values()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index d25d1ba..9cab114 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -141,7 +141,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
 
     @Override
     public boolean hasMessageAvailable() throws PulsarClientException {
-        return multiTopicsConsumer.hasMessageAvailable() || multiTopicsConsumer.numMessagesInQueue() > 0;
+        return multiTopicsConsumer.hasMessageAvailable();
     }
 
     @Override

[pulsar] 08/08: Fixes the NPE in system topics policies service (#13469)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1a32edc69eb017d1a95f19708a029aab9aeddbf8
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Dec 23 19:27:23 2021 +0800

    Fixes the NPE in system topics policies service (#13469)
    
    ---
    
    *Motivation*
    
    The `namespaceEventsSystemTopicFactory` is created when you will
    use it. But the `createSystemTopicFactoryIfNeeded()` may failed
    which will cause the `namespaceEventsSystemTopicFactory` is null
    and throw a NPE error from the method.
    
    *Modifications*
    
    - throw the error and failed the method when there has exceptions in
    `createSystemTopicFactoryIfNeeded()`
    
    (cherry picked from commit 4022b2884f46bb5e1593da419bb226ad1e0fc768)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 1cc71e9..0b305ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -88,9 +88,13 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, ActionType actionType,
                                                          TopicPolicies policies) {
-        createSystemTopicFactoryIfNeeded();
-
         CompletableFuture<Void> result = new CompletableFuture<>();
+        try {
+            createSystemTopicFactoryIfNeeded();
+        } catch (PulsarServerException e) {
+            result.completeExceptionally(e);
+            return result;
+        }
 
         SystemTopicClient<PulsarEvent> systemTopicClient =
                 namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
@@ -186,8 +190,9 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     @Override
     public CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName) {
         CompletableFuture<TopicPolicies> result = new CompletableFuture<>();
-        createSystemTopicFactoryIfNeeded();
-        if (namespaceEventsSystemTopicFactory == null) {
+        try {
+            createSystemTopicFactoryIfNeeded();
+        } catch (PulsarServerException e) {
             result.complete(null);
             return result;
         }
@@ -206,7 +211,6 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             result.complete(null);
             return result;
         }
-        createSystemTopicFactoryIfNeeded();
         synchronized (this) {
             if (readerCaches.get(namespace) != null) {
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
@@ -240,9 +244,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry(
             NamespaceName namespace) {
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
+        try {
+            createSystemTopicFactoryIfNeeded();
+        } catch (PulsarServerException e) {
+            result.completeExceptionally(e);
+            return result;
+        }
         SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
                 .createTopicPoliciesSystemTopicClient(namespace);
-        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>();
         Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
         RetryUtil.retryAsynchronously(() -> {
             try {
@@ -389,6 +399,12 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                     // However, due to compatibility, it is temporarily retained here
                     // and can be deleted in the future.
                     policiesCache.remove(topicName);
+                    try {
+                        createSystemTopicFactoryIfNeeded();
+                    } catch (PulsarServerException e) {
+                        log.error("Failed to create system topic factory");
+                        break;
+                    }
                     SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
                             .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
                     systemTopicClient.newWriterAsync().thenAccept(writer
@@ -408,7 +424,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         }
     }
 
-    private void createSystemTopicFactoryIfNeeded() {
+    private void createSystemTopicFactoryIfNeeded() throws PulsarServerException {
         if (namespaceEventsSystemTopicFactory == null) {
             synchronized (this) {
                 if (namespaceEventsSystemTopicFactory == null) {
@@ -417,6 +433,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                                 new NamespaceEventsSystemTopicFactory(pulsarService.getClient());
                     } catch (PulsarServerException e) {
                         log.error("Create namespace event system topic factory error.", e);
+                        throw e;
                     }
                 }
             }

[pulsar] 01/08: Close old compacted ledger when open new. (#13210)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c253ea52c13763f60bf8d15b32e2eee5c6c5840
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon Dec 13 17:03:20 2021 +0800

    Close old compacted ledger when open new. (#13210)
    
    (cherry picked from commit 07ef9231db8b844586b9217ee2d59237eb9c54b7)
---
 .../pulsar/broker/service/persistent/CompactorSubscription.java   | 8 ++++++--
 .../org/apache/pulsar/broker/service/PersistentTopicTest.java     | 2 ++
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 76e54b4..f7279968 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -47,8 +47,12 @@ public class CompactorSubscription extends PersistentSubscription {
         Map<String, Long> properties = cursor.getProperties();
         if (properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)) {
             long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);
-            compactedTopic.newCompactedLedger(cursor.getMarkDeletedPosition(),
-                                              compactedLedgerId);
+            compactedTopic.newCompactedLedger(cursor.getMarkDeletedPosition(), compactedLedgerId)
+                    .thenAccept(previousContext -> {
+                        if (previousContext != null) {
+                            compactedTopic.deleteCompactedLedger(previousContext.getLedger().getId());
+                        }
+                    });
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 9f21e46..27b5c87 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1877,6 +1877,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
 
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         CompactedTopic compactedTopic = mock(CompactedTopic.class);
+        when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(null));
         new CompactorSubscription(topic, compactedTopic, Compactor.COMPACTION_SUBSCRIPTION, cursorMock);
         verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId);
     }

[pulsar] 06/08: [Broker] Fix race conditions in closing producers and consumers (#13428)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c941b2f742d796210ba482c96bd4fb5f66dd61fa
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Dec 22 07:05:48 2021 +0200

    [Broker] Fix race conditions in closing producers and consumers (#13428)
    
    - closing ServerCnx while producers or consumers are created can lead
      to a producer or consumer never getting removed from the topic's
      list of producers
    
    (cherry picked from commit 3316db5a52cdeee49bc90fe18baac28d5688bfe8)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a8d7f23..9044488 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -283,6 +283,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         // Connection is gone, close the producers immediately
         producers.forEach((__, producerFuture) -> {
+            // prevent race conditions in completing producers
+            if (!producerFuture.isDone()
+                    && producerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
+                return;
+            }
             if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
                 Producer producer = producerFuture.getNow(null);
                 producer.closeNow(true);
@@ -290,17 +295,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         });
 
         consumers.forEach((__, consumerFuture) -> {
-            Consumer consumer;
-            if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
-                consumer = consumerFuture.getNow(null);
-            } else {
+            // prevent race conditions in completing consumers
+            if (!consumerFuture.isDone()
+                    && consumerFuture.completeExceptionally(new IllegalStateException("Connection closed."))) {
                 return;
             }
-
-            try {
-                consumer.close();
-            } catch (BrokerServiceException e) {
-                log.warn("Consumer {} was already closed: {}", consumer, e);
+            if (consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
+                Consumer consumer = consumerFuture.getNow(null);
+                try {
+                    consumer.close();
+                } catch (BrokerServiceException e) {
+                    log.warn("Consumer {} was already closed: {}", consumer, e);
+                }
             }
         });
         this.service.getPulsarStats().recordConnectionClose();

[pulsar] 02/08: [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)

Posted by yo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c57882972e981aa69907c06f84c5896cf82cab34
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Dec 9 18:11:50 2021 -0600

    [Broker] Optimize ManagedLedger Ledger Ownership Check (#13222)
    
    (cherry picked from commit 02b8de039df38fa95656e7ca9b5c0babd25ff021)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java  | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 02ea420..4fe5e9a 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1825,21 +1825,19 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
         }
-        if (!ledgers.containsKey(position.getLedgerId())) {
-            log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
-                    + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
-            callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
-                + "the ledgerId does not belong to this topic or has been deleted"), ctx);
-            return;
-        }
         if (position.getLedgerId() == currentLedger.getId()) {
             asyncReadEntry(currentLedger, position, callback, ctx);
-        } else {
+        } else if (ledgers.containsKey(position.getLedgerId())) {
             getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> asyncReadEntry(ledger, position, callback, ctx)).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
                 callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
                 return null;
             });
+        } else {
+            log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+                    + "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
+            callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+                    + "the ledgerId does not belong to this topic or has been deleted"), ctx);
         }
 
     }