You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/21 15:14:46 UTC

[pulsar] branch master updated: Support to set subscriptionTopicsMode via properties. (#3395)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b9370c3  Support to set subscriptionTopicsMode via properties. (#3395)
b9370c3 is described below

commit b9370c3615b1c16bfb9aaa94f6b5f466f2ef81a3
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Mon Jan 21 23:14:42 2019 +0800

    Support to set subscriptionTopicsMode via properties. (#3395)
---
 .../client/kafka/compat/PulsarConsumerKafkaConfig.java     | 14 ++++++++++++++
 site2/docs/adaptors-kafka.md                               |  1 +
 2 files changed, 15 insertions(+)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index 13eb002..a527827 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.client.kafka.compat;
 
+import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
 
 public class PulsarConsumerKafkaConfig {
 
@@ -31,6 +33,7 @@ public class PulsarConsumerKafkaConfig {
     public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size";
     public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS = "pulsar.consumer.acknowledgments.group.time.millis";
     public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
+    public static final String SUBSCRIPTION_TOPICS_MODE = "pulsar.consumer.subscription.topics.mode";
 
     public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
         ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
@@ -53,6 +56,17 @@ public class PulsarConsumerKafkaConfig {
                     Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
         }
 
+        if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
+            RegexSubscriptionMode mode;
+            try {
+                mode = RegexSubscriptionMode.valueOf(properties.getProperty(SUBSCRIPTION_TOPICS_MODE));
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Illegal subscription mode, valid values are: "
+                    + Arrays.asList(RegexSubscriptionMode.values()));
+            }
+            consumerBuilder.subscriptionTopicsMode(mode);
+        }
+
         return consumerBuilder;
     }
 }
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index f10b7d7..21c9b26 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -260,4 +260,5 @@ You can configure Pulsar authentication provider directly from the Kafka propert
 | [`pulsar.consumer.receiver.queue.size`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setReceiverQueueSize-int-) | 1000 | Set the size of the consumer receiver queue |
 | [`pulsar.consumer.acknowledgments.group.time.millis`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#acknowledgmentGroupTime-long-java.util.concurrent.TimeUnit-) | 100 | Set the max amount of group time for consumers to send out the acknowledgments to the broker |
 | [`pulsar.consumer.total.receiver.queue.size.across.partitions`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setMaxTotalReceiverQueueSizeAcrossPartitions-int-) | 50000 | Set the max total receiver queue size across partitions |
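+| [`pulsar.consumer.subscription.topics.mode`](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#subscriptionTopicsMode-Mode-) | PersistentOnly | Set which kinds of topics a consumer subscribes to when using a regex (pattern) subscription. Valid values are `PersistentOnly`, `NonPersistentOnly` and `AllTopics`; any other value is rejected with an `IllegalArgumentException` |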