You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/21 15:14:46 UTC
[pulsar] branch master updated: Support to set
subscriptionTopicsMode via properties. (#3395)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b9370c3 Support to set subscriptionTopicsMode via properties. (#3395)
b9370c3 is described below
commit b9370c3615b1c16bfb9aaa94f6b5f466f2ef81a3
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Mon Jan 21 23:14:42 2019 +0800
Support to set subscriptionTopicsMode via properties. (#3395)
---
.../client/kafka/compat/PulsarConsumerKafkaConfig.java | 14 ++++++++++++++
site2/docs/adaptors-kafka.md | 1 +
2 files changed, 15 insertions(+)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index 13eb002..a527827 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -18,11 +18,13 @@
*/
package org.apache.pulsar.client.kafka.compat;
+import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
public class PulsarConsumerKafkaConfig {
@@ -31,6 +33,7 @@ public class PulsarConsumerKafkaConfig {
public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size";
public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS = "pulsar.consumer.acknowledgments.group.time.millis";
public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
+ public static final String SUBSCRIPTION_TOPICS_MODE = "pulsar.consumer.subscription.topics.mode";
public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
@@ -53,6 +56,17 @@ public class PulsarConsumerKafkaConfig {
Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
}
+ if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
+ RegexSubscriptionMode mode;
+ try {
+ mode = RegexSubscriptionMode.valueOf(properties.getProperty(SUBSCRIPTION_TOPICS_MODE));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Illegal subscription mode, valid values are: "
+ + Arrays.asList(RegexSubscriptionMode.values()));
+ }
+ consumerBuilder.subscriptionTopicsMode(mode);
+ }
+
return consumerBuilder;
}
}
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index f10b7d7..21c9b26 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -260,4 +260,5 @@ You can configure Pulsar authentication provider directly from the Kafka propert
| [`pulsar.consumer.receiver.queue.size`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setReceiverQueueSize-int-) | 1000 | Set the size of the consumer receiver queue |
| [`pulsar.consumer.acknowledgments.group.time.millis`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#acknowledgmentGroupTime-long-java.util.concurrent.TimeUnit-) | 100 | Set the max amount of group time for consumers to send out the acknowledgments to the broker |
| [`pulsar.consumer.total.receiver.queue.size.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setMaxTotalReceiverQueueSizeAcrossPartitions-int-) | 50000 | Set the max total receiver queue size across partitions |
+| [`pulsar.consumer.subscription.topics.mode`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#subscriptionTopicsMode-Mode-) | PersistentOnly | Set the subscription topic mode for consumers |