You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/04 12:33:11 UTC
[pulsar] 02/15: [Broker] Optimize RawReader#create when using Compactor (#14447)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d9edbef8122ca1d33185394822bf3143e50a16e1
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Thu Feb 24 23:14:31 2022 +0800
[Broker] Optimize RawReader#create when using Compactor (#14447)
(cherry picked from commit 992dfbd81f3dbdc648c30ff73e37b6c9ae2b276e)
---
pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java | 2 +-
.../src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java | 2 ++
2 files changed, 3 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index f74157a9389..fe068f2f943 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -34,7 +34,7 @@ public interface RawReader {
static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
RawReader r = new RawReaderImpl((PulsarClientImpl) client, topic, subscription, future);
- return future.thenCompose(x -> x.seekAsync(MessageId.earliest)).thenApply(__ -> r);
+ return future.thenApply(__ -> r);
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 217dd5ccc85..6b032370898 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
@@ -56,6 +57,7 @@ public class RawReaderImpl implements RawReader {
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
consumerConfiguration.setReadCompacted(true);
+ consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture);