You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/26 17:50:04 UTC

[pulsar] branch master updated: Support Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to config max number of message will return in a single poll. (#3887)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b42c1e5  Support Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to config max number of message will return in a single poll. (#3887)
b42c1e5 is described below

commit b42c1e555b0091de93f65ea9903058e314eb84d3
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Tue Mar 26 10:49:57 2019 -0700

    Support Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to configure the maximum number of messages returned in a single poll. (#3887)
    
    Also update the doc to reflect that we already support the earliest and latest strategies for ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
---
 .../apache/kafka/clients/consumer/PulsarKafkaConsumer.java  | 13 ++++++++++---
 site2/docs/adaptors-kafka.md                                |  4 ++--
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 5b3c53c..0592a4b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -88,6 +88,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
     private volatile boolean closed = false;
 
+    private final int maxRecordsInSinglePoll;
+
     private final Properties properties;
 
     private static class QueueItem {
@@ -153,6 +155,13 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
 
         String serviceUrl = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
 
+        // If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
+        if(config.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
+            maxRecordsInSinglePoll = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
+        } else {
+            maxRecordsInSinglePoll = 1000;
+        }
+
         this.properties = new Properties();
         config.originals().forEach((k, v) -> properties.put(k, v));
         ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
@@ -304,8 +313,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         });
     }
 
-    private static final int MAX_RECORDS_IN_SINGLE_POLL = 1000;
-
     @SuppressWarnings("unchecked")
     @Override
     public ConsumerRecords<K, V> poll(long timeoutMillis) {
@@ -354,7 +361,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 lastReceivedOffset.put(tp, offset);
                 unpolledPartitions.remove(tp);
 
-                if (++numberOfRecords >= MAX_RECORDS_IN_SINGLE_POLL) {
+                if (++numberOfRecords >= maxRecordsInSinglePoll) {
                     break;
                 }
 
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index cb97fe2..104f062 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -198,7 +198,7 @@ Properties:
 | Config property                 | Supported | Notes                                                 |
 |:--------------------------------|:----------|:------------------------------------------------------|
 | `group.id`                      | Yes       | Maps to a Pulsar subscription name                    |
-| `max.poll.records`              | Ignored   |                                                       |
+| `max.poll.records`              | Yes       |                                                       |
 | `max.poll.interval.ms`          | Ignored   | Messages are "pushed" from broker                     |
 | `session.timeout.ms`            | Ignored   |                                                       |
 | `heartbeat.interval.ms`         | Ignored   |                                                       |
@@ -206,7 +206,7 @@ Properties:
 | `enable.auto.commit`            | Yes       |                                                       |
 | `auto.commit.interval.ms`       | Ignored   | With auto-commit, acks are sent immediately to broker |
 | `partition.assignment.strategy` | Ignored   |                                                       |
-| `auto.offset.reset`             | Ignored   |                                                       |
+| `auto.offset.reset`             | Yes       | Only support earliest and latest.                     |
 | `fetch.min.bytes`               | Ignored   |                                                       |
 | `fetch.max.bytes`               | Ignored   |                                                       |
 | `fetch.max.wait.ms`             | Ignored   |                                                       |