You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/05 03:05:02 UTC

[GitHub] merlimat closed pull request #3273: Adding config auto.offset.reset to PulsarKafkaConsumer

merlimat closed pull request #3273: Adding config auto.offset.reset to PulsarKafkaConsumer
URL: https://github.com/apache/pulsar/pull/3273
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it for pull requests from forks):

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index cf1e716b76..5b5def2aef 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -21,7 +21,9 @@
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -53,6 +55,7 @@
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -80,6 +83,8 @@
 
     private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
     private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<>();
+    private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
+    private final SubscriptionInitialPosition strategy;
 
     private volatile boolean closed = false;
 
@@ -143,6 +148,8 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ
 
         groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
         isAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        strategy = getStrategy(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        log.info("Offset reset strategy has been assigned value {}", strategy);
 
         String serviceUrl = config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
 
@@ -159,6 +166,15 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ
         }
     }
 
+    private SubscriptionInitialPosition getStrategy(final String strategy) {
+        switch(strategy) {
+    	    case "earliest":
+    	        return SubscriptionInitialPosition.Earliest;
+    	    default:
+                return SubscriptionInitialPosition.Latest;
+    	}
+    }
+    
     @Override
     public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
         // Block listener thread if the application is slowing down
@@ -243,7 +259,8 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
                     topicPartitions.add(tp);
                 }
             }
-
+            unpolledPartitions.addAll(topicPartitions);
+            
             // Wait for all consumers to be ready
             futures.forEach(CompletableFuture::join);
 
@@ -311,6 +328,10 @@ public void unsubscribe() {
                 long offset = MessageIdUtils.getOffset(msgId);
 
                 TopicPartition tp = new TopicPartition(topic, partition);
+                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
+                	log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
+                	resetOffsets(tp);
+                }
 
                 K key = getKey(topic, msg);
                 V value = valueDeserializer.deserialize(topic, msg.getData());
@@ -331,6 +352,7 @@ public void unsubscribe() {
 
                 // Update last offset seen by application
                 lastReceivedOffset.put(tp, offset);
+                unpolledPartitions.remove(tp);
 
                 if (++numberOfRecords < MAX_RECORDS_IN_SINGLE_POLL) {
                     break;
@@ -452,7 +474,7 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
         }
         lastCommittedOffset.clear();
         lastReceivedOffset.clear();
-
+        
         for (TopicPartition tp : partitions) {
             org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
             if (c == null) {
@@ -492,9 +514,22 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
     @Override
     public long position(TopicPartition partition) {
         Long offset = lastReceivedOffset.get(partition);
-        return offset != null ? offset : -1l;
+        if (offset == null && !unpolledPartitions.contains(partition)) {
+            return resetOffsets(partition).getValue();
+        }
+        return unpolledPartitions.contains(partition) ? 0 : offset;
     }
 
+    private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
+    	log.info("Resetting partition {} and seeking to {} position", partition, strategy);
+        if (strategy == SubscriptionInitialPosition.Earliest) {
+            seekToBeginning(Collections.singleton(partition));
+        } else {
+            seekToEnd(Collections.singleton(partition));
+        } 
+        return strategy;
+    }
+    
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
         return lastCommittedOffset.get(partition);
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 36aa224eab..49ef3f36b9 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -129,6 +129,7 @@ Properties:
 | Config property                         | Supported | Notes                                                                         |
 |:----------------------------------------|:----------|:------------------------------------------------------------------------------|
 | `acks`                                  | Ignored   | Durability and quorum writes are configured at the namespace level            |
+| `auto.offset.reset`			  | Yes       | Will have a default value of 'latest' if user does not give specific setting. |
 | `batch.size`                            | Ignored   |                                                                               |
 | `block.on.buffer.full`                  | Yes       | If true it will block producer, otherwise give error                          |
 | `bootstrap.servers`                     | Yes       | Needs to point to a single Pulsar service URL                                 |


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services