You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/24 04:34:56 UTC

[incubator-pulsar] branch master updated: move subscriptionInitialPosition from parameter into ConfigurationData (#1434)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6424b6e  move subscriptionInitialPosition from parameter into ConfigurationData (#1434)
6424b6e is described below

commit 6424b6e4f988c5cfc6922d6cf61b234d6625c17d
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Fri Mar 23 21:34:54 2018 -0700

    move subscriptionInitialPosition from parameter into ConfigurationData (#1434)
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  3 +-
 .../client/api/SimpleProducerConsumerTest.java     | 41 ++++++++++++++++++++++
 .../client/api/SubscriptionInitialPosition.java    | 15 ++++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  7 ++--
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  3 +-
 .../org/apache/pulsar/common/api/Commands.java     |  6 ++--
 6 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3dc4e56..61a1fc2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -103,7 +102,7 @@ public class RawReaderImpl implements RawReader {
         RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
                 CompletableFuture<Consumer<byte[]>> consumerFuture) {
             super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
-                    consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY, SubscriptionInitialPosition.Earliest);
+                    consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2cc4cf9..f378d16 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2333,4 +2333,45 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testConsumerSubscriptionInitialize() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topicName = "persistent://my-property/use/my-ns/test-subscription-initialize-topic";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .create();
+
+        // 1, produce 5 messages
+        for (int i = 0; i < 5; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // 2, create three consumers: default, Latest and Earliest initial position
+        Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName)
+            .subscriptionName("test-subscription-default").subscribe();
+        Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName)
+            .subscriptionName("test-subscription-latest").subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe();
+        Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName)
+            .subscriptionName("test-subscription-earliest").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+
+        // 3, produce 5 more messages
+        for (int i = 5; i < 10; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // 4, verify that each consumer receives the expected first message.
+        assertEquals(defaultConsumer.receive().getData(), "my-message-5".getBytes());
+        assertEquals(latestConsumer.receive().getData(), "my-message-5".getBytes());
+        assertEquals(earliestConsumer.receive().getData(), "my-message-0".getBytes());
+
+        defaultConsumer.close();
+        latestConsumer.close();
+        earliestConsumer.close();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
index 05deee3..e28953f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
@@ -27,10 +27,21 @@ public enum SubscriptionInitialPosition {
     /**
      * the latest position which means the start consuming position will be the last message
      */
-    Latest,
+    Latest(0),
 
     /**
      * the earliest position which means the start consuming position will be the first message
      */
-    Earliest,
+    Earliest(1),
+    ;
+
+
+    private final int value;
+
+    SubscriptionInitialPosition(int value) {
+        this.value = value;
+    }
+
+    public final int getValue() { return value; }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 067a677..46709d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -138,12 +139,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema, SubscriptionInitialPosition.Latest);
+        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema);
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
-                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, SubscriptionInitialPosition subscriptionInitialPosition) {
+                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema) {
         super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
@@ -581,7 +582,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, readCompacted);
+                consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()));
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 881a69c..ed374f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -32,7 +32,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -83,7 +82,7 @@ public class ReaderImpl<T> implements Reader<T> {
         }
 
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, SubscriptionInitialPosition.Latest);
+                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema);
     }
 
     @Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 59d6780..dcea2b2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
@@ -301,12 +302,12 @@ public class Commands {
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName) {
         return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
-                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false);
+                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
-            Map<String, String> metadata, boolean readCompacted) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition) {
         CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
         subscribeBuilder.setTopic(topic);
         subscribeBuilder.setSubscription(subscription);
@@ -317,6 +318,7 @@ public class Commands {
         subscribeBuilder.setPriorityLevel(priorityLevel);
         subscribeBuilder.setDurable(isDurable);
         subscribeBuilder.setReadCompacted(readCompacted);
+        subscribeBuilder.setInitialPosition(subscriptionInitialPosition);
         if (startMessageId != null) {
             subscribeBuilder.setStartMessageId(startMessageId);
         }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.