You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/03/15 08:05:49 UTC

(pulsar) branch branch-3.2 updated: [improve][broker] Add createTopicIfDoesNotExist option to RawReader constructor (#22264)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new c94b56b0c16 [improve][broker] Add createTopicIfDoesNotExist option to RawReader constructor (#22264)
c94b56b0c16 is described below

commit c94b56b0c16e47e0964732f1231582393981e35d
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Thu Mar 14 16:45:15 2024 +0800

    [improve][broker] Add createTopicIfDoesNotExist option to RawReader constructor (#22264)
---
 .../org/apache/pulsar/client/api/RawReader.java    |  8 +++++++-
 .../apache/pulsar/client/impl/RawReaderImpl.java   | 10 +++++-----
 .../org/apache/pulsar/compaction/Compactor.java    |  2 +-
 .../apache/pulsar/client/impl/RawReaderTest.java   | 23 ++++++++++++++++++++++
 4 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index 92a2c89f9bc..b7805c36b3b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -32,8 +32,14 @@ public interface RawReader {
      */
 
     static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
+        return create(client, topic, subscription, true);
+    }
+
+    static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription,
+                                               boolean createTopicIfDoesNotExist) {
         CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
-        RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future);
+        RawReader r =
+                new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future, createTopicIfDoesNotExist);
         return future.thenApply(__ -> r);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index f6523241399..3d7ad9f5865 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -51,7 +51,8 @@ public class RawReaderImpl implements RawReader {
     private RawConsumerImpl consumer;
 
     public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
-                         CompletableFuture<Consumer<byte[]>> consumerFuture) {
+                         CompletableFuture<Consumer<byte[]>> consumerFuture,
+                         boolean createTopicIfDoesNotExist) {
         consumerConfiguration = new ConsumerConfigurationData<>();
         consumerConfiguration.getTopicNames().add(topic);
         consumerConfiguration.setSubscriptionName(subscription);
@@ -61,8 +62,7 @@ public class RawReaderImpl implements RawReader {
         consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
         consumerConfiguration.setAckReceiptEnabled(true);
 
-        consumer = new RawConsumerImpl(client, consumerConfiguration,
-                                       consumerFuture);
+        consumer = new RawConsumerImpl(client, consumerConfiguration, consumerFuture, createTopicIfDoesNotExist);
     }
 
     @Override
@@ -111,7 +111,7 @@ public class RawReaderImpl implements RawReader {
         final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
 
         RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf,
-                CompletableFuture<Consumer<byte[]>> consumerFuture) {
+                CompletableFuture<Consumer<byte[]>> consumerFuture, boolean createTopicIfDoesNotExist) {
             super(client,
                     conf.getSingleTopic(),
                     conf,
@@ -123,7 +123,7 @@ public class RawReaderImpl implements RawReader {
                     MessageId.earliest,
                     0 /* startMessageRollbackDurationInSec */,
                     Schema.BYTES, null,
-                    false
+                    createTopicIfDoesNotExist
             );
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
index e93a642c76e..983443432ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
@@ -56,7 +56,7 @@ public abstract class Compactor {
     }
 
     public CompletableFuture<Long> compact(String topic) {
-        return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION).thenComposeAsync(
+        return RawReader.create(pulsar, topic, COMPACTION_SUBSCRIPTION, false).thenComposeAsync(
                 this::compactAndCloseReader, scheduler);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 95d8926c9ff..d3fcc36a546 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -36,15 +37,18 @@ import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -496,4 +500,23 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
             }
         }
     }
+
+    @Test
+    public void testAutoCreateTopic() throws ExecutionException, InterruptedException, PulsarAdminException {
+        // Default overload: creating the reader succeeds and stats can be fetched for the new topic.
+        String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader");
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
+        TopicStats stats = admin.topics().getStats(topic);
+        Assert.assertNotNull(stats);
+        reader.closeAsync().join();
+
+        // createTopicIfDoesNotExist=false must fail; a reader created by mistake is closed before failing.
+        String topic2 = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader");
+        try {
+            RawReader.create(pulsarClient, topic2, subscription, false).get().closeAsync().join();
+            Assert.fail("Expected TopicDoesNotExistException for non-existent topic " + topic2);
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof PulsarClientException.TopicDoesNotExistException);
+        }
+    }
 }