You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/26 17:50:04 UTC
[pulsar] branch master updated: Support Kafka's
ConsumerConfig.MAX_POLL_RECORDS_CONFIG to configure the maximum number of messages
returned in a single poll. (#3887)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b42c1e5 Support Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to configure the maximum number of messages returned in a single poll. (#3887)
b42c1e5 is described below
commit b42c1e555b0091de93f65ea9903058e314eb84d3
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Tue Mar 26 10:49:57 2019 -0700
Support Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to configure the maximum number of messages returned in a single poll. (#3887)
Also update the docs to reflect that we already support the earliest and latest strategies for ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
---
.../apache/kafka/clients/consumer/PulsarKafkaConsumer.java | 13 ++++++++++---
site2/docs/adaptors-kafka.md | 4 ++--
2 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 5b3c53c..0592a4b 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -88,6 +88,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private volatile boolean closed = false;
+ private final int maxRecordsInSinglePoll;
+
private final Properties properties;
private static class QueueItem {
@@ -153,6 +155,13 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
String serviceUrl = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
+ // If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
+ if(config.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
+ maxRecordsInSinglePoll = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
+ } else {
+ maxRecordsInSinglePoll = 1000;
+ }
+
this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
@@ -304,8 +313,6 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
});
}
- private static final int MAX_RECORDS_IN_SINGLE_POLL = 1000;
-
@SuppressWarnings("unchecked")
@Override
public ConsumerRecords<K, V> poll(long timeoutMillis) {
@@ -354,7 +361,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
lastReceivedOffset.put(tp, offset);
unpolledPartitions.remove(tp);
- if (++numberOfRecords >= MAX_RECORDS_IN_SINGLE_POLL) {
+ if (++numberOfRecords >= maxRecordsInSinglePoll) {
break;
}
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index cb97fe2..104f062 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -198,7 +198,7 @@ Properties:
| Config property | Supported | Notes |
|:--------------------------------|:----------|:------------------------------------------------------|
| `group.id` | Yes | Maps to a Pulsar subscription name |
-| `max.poll.records` | Ignored | |
+| `max.poll.records` | Yes | |
| `max.poll.interval.ms` | Ignored | Messages are "pushed" from broker |
| `session.timeout.ms` | Ignored | |
| `heartbeat.interval.ms` | Ignored | |
@@ -206,7 +206,7 @@ Properties:
| `enable.auto.commit` | Yes | |
| `auto.commit.interval.ms` | Ignored | With auto-commit, acks are sent immediately to broker |
| `partition.assignment.strategy` | Ignored | |
-| `auto.offset.reset` | Ignored | |
+| `auto.offset.reset` | Yes | Only `earliest` and `latest` are supported. |
| `fetch.min.bytes` | Ignored | |
| `fetch.max.bytes` | Ignored | |
| `fetch.max.wait.ms` | Ignored | |