You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/04 12:33:11 UTC

[pulsar] 02/15: [Broker] Optimize RawReader#create when using Compactor (#14447)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d9edbef8122ca1d33185394822bf3143e50a16e1
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Thu Feb 24 23:14:31 2022 +0800

    [Broker] Optimize RawReader#create when using Compactor (#14447)
    
    (cherry picked from commit 992dfbd81f3dbdc648c30ff73e37b6c9ae2b276e)
---
 pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java | 2 +-
 .../src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java      | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index f74157a9389..fe068f2f943 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -34,7 +34,7 @@ public interface RawReader {
     static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
         CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
         RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future);
-        return future.thenCompose(x -> x.seekAsync(MessageId.earliest)).thenApply(__ -> r);
+        return future.thenApply(__ -> r);
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 217dd5ccc85..6b032370898 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -56,6 +57,7 @@ public class RawReaderImpl implements RawReader {
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
         consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
         consumerConfiguration.setReadCompacted(true);
+        consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
 
         consumer = new RawConsumerImpl(client, consumerConfiguration,
                                        consumerFuture);