You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/25 13:11:33 UTC

[GitHub] XiaoZYang closed pull request #1111: 1. add client configuration item *seekPosition*

XiaoZYang closed pull request #1111: 1. add client configuration item *seekPosition*
URL: https://github.com/apache/incubator-pulsar/pull/1111
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.gitignore.swp b/.gitignore.swp
deleted file mode 100644
index 692019ee7..000000000
Binary files a/.gitignore.swp and /dev/null differ
diff --git a/conf/client.conf b/conf/client.conf
index 5afa987e1..f2fd1556c 100644
--- a/conf/client.conf
+++ b/conf/client.conf
@@ -25,3 +25,4 @@ brokerServiceUrl=pulsar://localhost:6650/
 #useTls=
 #tlsAllowInsecureConnection
 #tlsTrustCertsFilePath
+seekPosition=latest
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 2da67eb0f..1160f25aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -278,4 +278,43 @@
      * @return a future to track the completion of the seek operation
      */
     CompletableFuture<Void> seekAsync(MessageId messageId);
+
+
+    /**
+     * Reset the subscription associated with this consumer to a configured message id.
+     * <p>
+     *
+     * The message id can either be the first or last messages in the topic which is determined by your client configuration.
+     * Corresponding configuration is <b>seekPosition</b>, legal choice has to be one of following:
+     * <p>
+     * <ul>
+     * <li>earliest: Reset the subscription on the earliest message available in the topic
+     * <li>latest: Reset the subscription on the latest message in the topic, it's the default configuration if not given
+     * </ul>
+     *
+     * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+     * the individual partitions.
+     *
+     */
+    void seek() throws PulsarClientException;
+
+    /**
+     * Reset the subscription associated with this consumer to a configured message id.
+     * <p>
+     *
+     * The message id can either be the first or last messages in the topic which is determined by your client configuration.
+     * Corresponding configuration is <b>seekPosition</b>, legal choice has to be one of following:
+     * <p>
+     * <ul>
+     * <li>earliest: Reset the subscription on the earliest message available in the topic
+     * <li>latest: Reset the subscription on the latest message in the topic, it's the default configuration if not given
+     * </ul>
+     *
+     * Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
+     * the individual partitions.
+     *
+     */
+    CompletableFuture<Void> seekAsync();
+
+    public final static String SeekPositonKey = "seekPosition";
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 845ad4467..c5991b193 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -46,6 +46,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.client.util.SeekPosition;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
 import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -106,6 +107,7 @@
 
     private final Map<String, String> metadata;
 
+    private final MessageId seekPosition;
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will retain messages and persist the current
         // position
@@ -165,6 +167,8 @@
             metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
         }
 
+        this.seekPosition = SeekPosition.getPosition(metadata.get(Consumer.SeekPositonKey));
+
         grabCnx();
     }
 
@@ -1204,6 +1208,20 @@ public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
         }
     }
 
+    @Override
+    public void seek() throws PulsarClientException {
+        try {
+            seekAsync().get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync() {
+        return seekAsync(this.seekPosition);
+    }
+
     @Override
     public void seek(MessageId messageId) throws PulsarClientException {
         try {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 2d7830e2a..29f287d1f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -492,6 +492,22 @@ public void seek(MessageId messageId) throws PulsarClientException {
         return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on partitioned topics"));
     }
 
+    @Override
+    public void seek() throws PulsarClientException {
+        try {
+            seekAsync().get();
+        } catch (ExecutionException e) {
+            throw new PulsarClientException(e.getCause());
+        } catch (InterruptedException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync() {
+        return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on partitioned topics"));
+    }
+
     /**
      * helper method that returns current state of data structure used to track acks for batch messages
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/SeekPosition.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/SeekPosition.java
new file mode 100644
index 000000000..1cc70a41b
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/SeekPosition.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+public class SeekPosition {
+    public final static String seekPositionLatest = "latest";
+    public final static String seekPositionEarliest = "earliest";
+    public final static String seekPositionDefault = seekPositionLatest;
+
+    private final static Map<String, MessageId> positions;
+    
+    static {
+        HashMap<String, MessageId> _positions = new HashMap<String, MessageId>();
+        _positions.put(seekPositionLatest, MessageIdImpl.latest);
+        _positions.put(seekPositionEarliest, MessageIdImpl.earliest);
+        positions = Collections.unmodifiableMap(_positions);
+    }
+
+    
+    public static MessageId getPosition(String key){
+        if(positions.containsKey(key)){
+            return positions.get(key);
+        }
+        return positions.get(seekPositionDefault);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SeekPositionTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SeekPositionTest.java
new file mode 100644
index 000000000..623d87b6d
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SeekPositionTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.util.SeekPosition;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class SeekPositionTest {
+    @Test
+    void TestSeekPositionUtil() {
+        MessageId earlist = MessageIdImpl.earliest;
+        MessageId latest = MessageIdImpl.latest;
+        MessageId defaul = SeekPosition.getPosition(SeekPosition.seekPositionDefault);
+        Assert.assertEquals(SeekPosition.getPosition(SeekPosition.seekPositionEarliest), earlist);
+        Assert.assertEquals(SeekPosition.getPosition(SeekPosition.seekPositionLatest), latest);
+        Assert.assertEquals(SeekPosition.getPosition(null), defaul);
+        Assert.assertEquals(SeekPosition.getPosition("123"), defaul);
+        Assert.assertEquals(SeekPosition.getPosition("hello"), defaul);   
+    }
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services