You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 01:42:42 UTC

[pulsar] branch branch-2.11 updated (be9cbc3b286 -> aff59a7ed19)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from be9cbc3b286 [fix][broker]Consumer can't consume messages because there has two sames topics in one broker (#17526)
     new e333cf97a70 [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728)
     new aff59a7ed19 [fix][broker] Extract additional servlets to the default directory by… (#17477)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../web/plugin/servlet/AdditionalServlets.java     |  3 +-
 .../web/plugin/servlet/AdditionalServletsTest.java | 82 ++++++++++++++++++++++
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 24 +++++++
 .../apache/pulsar/client/impl/TableViewTest.java   | 60 ++++++++++++++++
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  8 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  | 24 ++++---
 6 files changed, 189 insertions(+), 12 deletions(-)
 create mode 100644 pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletsTest.java


[pulsar] 02/02: [fix][broker] Extract additional servlets to the default directory by… (#17477)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit aff59a7ed19d87825103f285cf4b8e561e15bc5a
Author: Andras Beni <an...@streamnative.io>
AuthorDate: Thu Sep 22 13:27:09 2022 +0200

    [fix][broker] Extract additional servlets to the default directory by… (#17477)
---
 .../web/plugin/servlet/AdditionalServlets.java     |  3 +-
 .../web/plugin/servlet/AdditionalServletsTest.java | 82 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java
index 8a7bce8e332..4feef4c7a6f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Map;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 import org.apache.pulsar.common.nar.NarClassLoader;
 
@@ -70,7 +71,7 @@ public class AdditionalServlets implements AutoCloseable {
         }
 
         String narExtractionDirectory = conf.getProperties().getProperty(NAR_EXTRACTION_DIRECTORY);
-        if (narExtractionDirectory == null) {
+        if (StringUtils.isBlank(narExtractionDirectory)) {
             narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
         }
 
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletsTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletsTest.java
new file mode 100644
index 00000000000..0d5e6f5262b
--- /dev/null
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServletsTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web.plugin.servlet;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public class AdditionalServletsTest {
+
+
+    @Test
+    public void testEmptyStringAsExtractionDirectory() throws IOException {
+        Properties p = new Properties();
+        p.put("narExtractionDirectory", "");
+        p.put("additionalServlets", "AS1,AS2");
+        p.put("additionalServletDirectory", "/additionalServletDirectory");
+
+        PulsarConfiguration config = mock(PulsarConfiguration.class);
+        Mockito.when(config.getProperties()).thenReturn(p);
+
+        AdditionalServletMetadata asm1 = additionalServletMetadata(1);
+        AdditionalServletMetadata asm2 = additionalServletMetadata(2);
+
+        AdditionalServletDefinitions definitions = new AdditionalServletDefinitions();
+        definitions.servlets().put("AS1", asm1);
+        definitions.servlets().put("AS2", asm2);
+
+        AdditionalServletWithClassLoader as1 = mock(AdditionalServletWithClassLoader.class);
+        AdditionalServletWithClassLoader as2 = mock(AdditionalServletWithClassLoader.class);
+
+        String originalTmpDirectory = System.getProperty("java.io.tmpdir");
+        try (MockedStatic<AdditionalServletUtils> utils = mockStatic(AdditionalServletUtils.class)) {
+            String tmpDirectory = "/my/tmp/directory";
+            System.setProperty("java.io.tmpdir", tmpDirectory);
+            utils.when(() -> AdditionalServletUtils.searchForServlets(
+                    "/additionalServletDirectory", tmpDirectory)).thenReturn(definitions);
+            utils.when(() -> AdditionalServletUtils.load(asm1, tmpDirectory)).thenReturn(as1);
+            utils.when(() -> AdditionalServletUtils.load(asm2, tmpDirectory)).thenReturn(as2);
+
+            AdditionalServlets servlets = AdditionalServlets.load(config);
+
+            Assert.assertEquals(servlets.getServlets().get("AS1"), as1);
+            Assert.assertEquals(servlets.getServlets().get("AS2"), as2);
+        } finally {
+            System.setProperty("java.io.tmpdir", originalTmpDirectory);
+        }
+    }
+
+    private AdditionalServletMetadata additionalServletMetadata(int index) {
+        AdditionalServletMetadata as = new AdditionalServletMetadata();
+        as.setArchivePath(Paths.get("/additionalServletDirectory/" + index));
+        as.setDefinition(new AdditionalServletDefinition());
+        as.getDefinition().setName("as" + index);
+        as.getDefinition().setAdditionalServletClass("com.example.AS" + index);
+        as.getDefinition().setDescription("Additional Servlet " +index);
+        return as;
+    }
+}


[pulsar] 01/02: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e333cf97a702e3e17f2467c1b68db667d3aea131
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Thu Sep 22 03:29:14 2022 -0700

    [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728)
---
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 24 +++++++++
 .../apache/pulsar/client/impl/TableViewTest.java   | 60 ++++++++++++++++++++++
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  8 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  | 24 +++++----
 4 files changed, 105 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index 6b6bf959483..edb5f0cd88d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -625,6 +626,29 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
 
     }
 
+    @Test
+    void shouldSupportCancellingReadNextAsync() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-reader-topic" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 3);
+        MultiTopicsReaderImpl<byte[]> reader = (MultiTopicsReaderImpl<byte[]>) pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readerName(subscription)
+                .create();
+        // given
+        CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+        Awaitility.await().untilAsserted(() -> {
+            AssertJUnit.assertTrue(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+        });
+
+        // when
+        future.cancel(false);
+
+        // then
+        AssertJUnit.assertFalse(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+    }
+
+
     private void testReadMessages(String topic, boolean enableBatch) throws Exception {
         int numKeys = 9;
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 20f510e97e2..8722f649212 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
@@ -29,7 +33,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -42,6 +48,7 @@ import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -217,4 +224,57 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         assertEquals(tv1.size(), 1);
         assertEquals(tv.get("key2"), "value2");
     }
+
+    @DataProvider(name = "partitionedTopic")
+    public static Object[][] partitioned() {
+        return new Object[][] {{true}, {false}};
+    }
+
+    @Test(timeOut = 30 * 1000, dataProvider = "partitionedTopic")
+    public void testAck(boolean partitionedTopic) throws Exception {
+        String topic = null;
+        if (partitionedTopic) {
+            topic = "persistent://public/default/tableview-ack-test";
+            admin.topics().createPartitionedTopic(topic, 3);
+        } else {
+            topic = "persistent://public/default/tableview-no-partition-ack-test";
+            admin.topics().createNonPartitionedTopic(topic);
+        }
+
+        @Cleanup
+        TableView<String> tv1 = pulsarClient.newTableViewBuilder(Schema.STRING)
+                .topic(topic)
+                .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+                .create();
+
+        ConsumerBase consumerBase;
+        if (partitionedTopic) {
+            MultiTopicsReaderImpl<String> reader =
+                    ((CompletableFuture<MultiTopicsReaderImpl<String>>) FieldUtils
+                            .readDeclaredField(tv1, "reader", true)).get();
+            consumerBase = spy(reader.getMultiTopicsConsumer());
+            FieldUtils.writeDeclaredField(reader, "multiTopicsConsumer", consumerBase, true);
+        } else {
+            ReaderImpl<String> reader = ((CompletableFuture<ReaderImpl<String>>) FieldUtils
+                    .readDeclaredField(tv1, "reader", true)).get();
+            consumerBase = spy(reader.getConsumer());
+            FieldUtils.writeDeclaredField(reader, "consumer", consumerBase, true);
+        }
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+        int msgCount = 20;
+        for (int i = 0; i < msgCount; i++) {
+            producer.newMessage().key("key:" + i).value("value" + i).send();
+        }
+
+        Awaitility.await()
+                .pollInterval(1, TimeUnit.SECONDS)
+                .atMost(Duration.ofMillis(5000))
+                .untilAsserted(()
+                        -> verify(consumerBase, times(msgCount)).acknowledgeCumulativeAsync(any(MessageId.class)));
+
+
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 3ec95386cb8..cbb921ed9d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 
 @Slf4j
 public class MultiTopicsReaderImpl<T> implements Reader<T> {
@@ -146,7 +147,8 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        return multiTopicsConsumer.receiveAsync().thenApply(msg -> {
+        CompletableFuture<Message<T>> originalFuture = multiTopicsConsumer.receiveAsync();
+        CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
             multiTopicsConsumer.acknowledgeCumulativeAsync(msg)
                     .exceptionally(ex -> {
                         log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
@@ -155,6 +157,10 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
                     });
             return msg;
         });
+        CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+        handler.attachToFuture(result);
+        handler.setCancelAction(() -> originalFuture.cancel(false));
+        return result;
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index c4b5263736e..04f8706f21c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 
 @Slf4j
 public class ReaderImpl<T> implements Reader<T> {
@@ -177,17 +178,20 @@ public class ReaderImpl<T> implements Reader<T> {
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
-        receiveFuture.whenComplete((msg, t) -> {
-           if (msg != null) {
-               consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
-                   log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
-                           getConsumer().getSubscription(), msg.getMessageId(), ex);
-                   return null;
-               });
-           }
+        CompletableFuture<Message<T>> originalFuture = consumer.receiveAsync();
+        CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
+            consumer.acknowledgeCumulativeAsync(msg)
+                    .exceptionally(ex -> {
+                        log.error("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
+                                getConsumer().getSubscription(), msg.getMessageId(), ex);
+                        return null;
+                    });
+            return msg;
         });
-        return receiveFuture;
+        CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+        handler.attachToFuture(result);
+        handler.setCancelAction(() -> originalFuture.cancel(false));
+        return result;
     }
 
     @Override