You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/24 04:34:56 UTC

[GitHub] merlimat closed pull request #1434: Move config subscriptionInitialPosition from parameter into ConfigurationData

merlimat closed pull request #1434: Move config subscriptionInitialPosition from parameter into ConfigurationData
URL: https://github.com/apache/incubator-pulsar/pull/1434
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3dc4e5612..61a1fc267 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -31,7 +31,6 @@
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -103,7 +102,7 @@ public String toString() {
         RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
                 CompletableFuture<Consumer<byte[]>> consumerFuture) {
             super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
-                    consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY, SubscriptionInitialPosition.Earliest);
+                    consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2cc4cf93c..f378d1647 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2333,4 +2333,45 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testConsumerSubscriptionInitialize() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topicName = "persistent://my-property/use/my-ns/test-subscription-initialize-topic";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topicName)
+            .create();
+
+        // 1, produce 5 messages
+        for (int i = 0; i < 5; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // 2, create consumer
+        Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName)
+            .subscriptionName("test-subscription-default").subscribe();
+        Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName)
+            .subscriptionName("test-subscription-latest").subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe();
+        Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName)
+            .subscriptionName("test-subscription-earliest").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+
+        // 3, produce 5 messages more
+        for (int i = 5; i < 10; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // 4, verify consumer get right message.
+        assertEquals(defaultConsumer.receive().getData(), "my-message-5".getBytes());
+        assertEquals(latestConsumer.receive().getData(), "my-message-5".getBytes());
+        assertEquals(earliestConsumer.receive().getData(), "my-message-0".getBytes());
+
+        defaultConsumer.close();
+        latestConsumer.close();
+        earliestConsumer.close();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
index 05deee3b0..e28953f65 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
@@ -27,10 +27,21 @@
     /**
      * the latest position which means the start consuming position will be the last message
      */
-    Latest,
+    Latest(0),
 
     /**
      * the earliest position which means the start consuming position will be the first message
      */
-    Earliest,
+    Earliest(1),
+    ;
+
+
+    private final int value;
+
+    SubscriptionInitialPosition(int value) {
+        this.value = value;
+    }
+
+    public final int getValue() { return value; }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 067a67770..46709d20a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -63,6 +63,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -138,12 +139,12 @@
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema, SubscriptionInitialPosition.Latest);
+        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema);
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
-                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, SubscriptionInitialPosition subscriptionInitialPosition) {
+                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema) {
         super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
@@ -581,7 +582,7 @@ public void connectionOpened(final ClientCnx cnx) {
         }
 
         ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, readCompacted);
+                consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()));
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 881a69c26..ed374f663 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -32,7 +32,6 @@
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -83,7 +82,7 @@ public void reachedEndOfTopic(Consumer<T> consumer) {
         }
 
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, SubscriptionInitialPosition.Latest);
+                -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema);
     }
 
     @Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 59d678077..dcea2b2c8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -66,6 +66,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
@@ -301,12 +302,12 @@ public static ByteBufPair newSend(long producerId, long sequenceId, int numMessa
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName) {
         return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
-                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false);
+                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
-            Map<String, String> metadata, boolean readCompacted) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition) {
         CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
         subscribeBuilder.setTopic(topic);
         subscribeBuilder.setSubscription(subscription);
@@ -317,6 +318,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
         subscribeBuilder.setPriorityLevel(priorityLevel);
         subscribeBuilder.setDurable(isDurable);
         subscribeBuilder.setReadCompacted(readCompacted);
+        subscribeBuilder.setInitialPosition(subscriptionInitialPosition);
         if (startMessageId != null) {
             subscribeBuilder.setStartMessageId(startMessageId);
         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services