You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/24 04:34:56 UTC
[incubator-pulsar] branch master updated: move subscriptionInitialPosition from parameter into ConfigurationData (#1434)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6424b6e move subscriptionInitialPosition from parameter into ConfigurationData (#1434)
6424b6e is described below
commit 6424b6e4f988c5cfc6922d6cf61b234d6625c17d
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Fri Mar 23 21:34:54 2018 -0700
move subscriptionInitialPosition from parameter into ConfigurationData (#1434)
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 3 +-
.../client/api/SimpleProducerConsumerTest.java | 41 ++++++++++++++++++++++
.../client/api/SubscriptionInitialPosition.java | 15 ++++++--
.../apache/pulsar/client/impl/ConsumerImpl.java | 7 ++--
.../org/apache/pulsar/client/impl/ReaderImpl.java | 3 +-
.../org/apache/pulsar/common/api/Commands.java | 6 ++--
6 files changed, 64 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 3dc4e56..61a1fc2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -103,7 +102,7 @@ public class RawReaderImpl implements RawReader {
RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
CompletableFuture<Consumer<byte[]>> consumerFuture) {
super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
- consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY, SubscriptionInitialPosition.Earliest);
+ consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2cc4cf9..f378d16 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2333,4 +2333,45 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+ @Test
+ public void testConsumerSubscriptionInitialize() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ String topicName = "persistent://my-property/use/my-ns/test-subscription-initialize-topic";
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .create();
+
+ // 1, produce 5 messages
+ for (int i = 0; i < 5; i++) {
+ final String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ // 2, create consumer
+ Consumer<byte[]> defaultConsumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("test-subscription-default").subscribe();
+ Consumer<byte[]> latestConsumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("test-subscription-latest").subscriptionInitialPosition(SubscriptionInitialPosition.Latest).subscribe();
+ Consumer<byte[]> earliestConsumer = pulsarClient.newConsumer().topic(topicName)
+ .subscriptionName("test-subscription-earliest").subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+
+ // 3, produce 5 messages more
+ for (int i = 5; i < 10; i++) {
+ final String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ // 4, verify each consumer gets the right message.
+ assertEquals(defaultConsumer.receive().getData(), "my-message-5".getBytes());
+ assertEquals(latestConsumer.receive().getData(), "my-message-5".getBytes());
+ assertEquals(earliestConsumer.receive().getData(), "my-message-0".getBytes());
+
+ defaultConsumer.close();
+ latestConsumer.close();
+ earliestConsumer.close();
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
index 05deee3..e28953f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/SubscriptionInitialPosition.java
@@ -27,10 +27,21 @@ public enum SubscriptionInitialPosition {
/**
* the latest position which means the start consuming position will be the last message
*/
- Latest,
+ Latest(0),
/**
* the earliest position which means the start consuming position will be the first message
*/
- Earliest,
+ Earliest(1),
+ ;
+
+
+ private final int value;
+
+ SubscriptionInitialPosition(int value) {
+ this.value = value;
+ }
+
+ public final int getValue() { return value; }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 067a677..46709d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -138,12 +139,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
- this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema, SubscriptionInitialPosition.Latest);
+ this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema);
}
ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, SubscriptionInitialPosition subscriptionInitialPosition) {
+ SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema) {
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
@@ -581,7 +582,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
- consumerName, isDurable, startMessageIdData, metadata, readCompacted);
+ consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()));
if (startMessageIdData != null) {
startMessageIdData.recycle();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 881a69c..ed374f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -32,7 +32,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -83,7 +82,7 @@ public class ReaderImpl<T> implements Reader<T> {
}
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
- -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, SubscriptionInitialPosition.Latest);
+ -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema);
}
@Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 59d6780..dcea2b2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendError;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
@@ -301,12 +302,12 @@ public class Commands {
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName) {
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
- true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false);
+ true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest);
}
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
- Map<String, String> metadata, boolean readCompacted) {
+ Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition) {
CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
subscribeBuilder.setTopic(topic);
subscribeBuilder.setSubscription(subscription);
@@ -317,6 +318,7 @@ public class Commands {
subscribeBuilder.setPriorityLevel(priorityLevel);
subscribeBuilder.setDurable(isDurable);
subscribeBuilder.setReadCompacted(readCompacted);
+ subscribeBuilder.setInitialPosition(subscriptionInitialPosition);
if (startMessageId != null) {
subscribeBuilder.setStartMessageId(startMessageId);
}
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.