You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/01/24 02:48:50 UTC

[incubator-pulsar.wiki] branch master updated: Created PIP-13: Subscribe to topics represented by regular expressions (markdown)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new 701e706  Created PIP-13: Subscribe to topics represented by regular expressions (markdown)
701e706 is described below

commit 701e706db5c8288b46c6867166b96c4c63e4a7c3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 23 18:48:48 2018 -0800

    Created PIP-13: Subscribe to topics represented by regular expressions (markdown)
---
 ...to-topics-represented-by-regular-expressions.md | 87 ++++++++++++++++++++++
 1 file changed, 87 insertions(+)

diff --git a/PIP-13:-Subscribe-to-topics-represented-by-regular-expressions.md b/PIP-13:-Subscribe-to-topics-represented-by-regular-expressions.md
new file mode 100644
index 0000000..92b288a
--- /dev/null
+++ b/PIP-13:-Subscribe-to-topics-represented-by-regular-expressions.md
@@ -0,0 +1,87 @@
+ * **Status**: Proposal
+ * **Author**: Jia Zhai - Streamlio
+ * **Pull Request**: []
+ * **Mailing List discussion**: 
+
+
+## Motivation
+The consumer needs to handle subscription to topics represented by regular expressions. The scope is `namespace` in first stage, all topics/patten should be targeted in same namespace, This will make easy authentication and authorization control.
+
+At last, we should add and implementation a serials of new methods in `PulsarClient.java`
+```java
+Consumer subscribe(Collection<String> topics, String subscription);
+Consumer subscribe(Pattern topicsPattern, String subscription);
+```
+
+The goals the should be achieved are these below, we could achieve it one by one:
+- support subscription to multiple topics in the same namespace (no guarantee on ordering between topics)
+- support regex based subscription
+- auto-discover topic addition/deletion
+
+## Design
+
+### support subscription to multiple topics
+This will need a new implementation of `ConsumerBase` which wrapper over multiple single-topic-consumers, let’s name it as `TopicsConsumerImpl`. 
+When user call new method
+`Consumer subscribe(Collection<String> topics, String subscription);`
+It will iteratively new a `ConsumerImpl` for each topic, and return a `TopicsConsumerImpl`. The main work is:
+ 
+1. This `TopicsConsumerImpl` class should provide implementation of abstract methods in `ConsumerBase`, Should also provide some specific methods such as:
+```java
+// maintain a map for all the <Topic, Consumer>, after we subscribe all the topics.
+private final ConcurrentMap<String, ConsumerImpl> consumers = new ConcurrentHashMap<>();
+// get topics
+Set<String> getTopics();
+// get consumers
+List<ConsumerImpl> getConsumers();
+
+// subscribe a topic
+void subscribeTopic(String topic);
+// unSubscribe a topic
+void unSubscribeTopic(String topic);
+```
+
+2. While Message receive/ack, the message identify is needed. In the implementation, we need handle Message identify(MessageId) differently for some of the abstract methods in `ConsumerBase`, because we have to add `MessageId` with additional `String topic` or `consumer id`, Or we may need to change `MessageIdData` in `PulsarApi.proto`.
+
+
+
+### support regex based subscription.
+As mentioned before, the scope is `namespace`. The main work is:
+
+1. In above `TopicsConsumerImpl` class, need to keep the `Pattern`, which was passed in from api for subscription.
+2. leverage currently pulsar admin API of `getList` to get a list of Topics.
+In `interface PersistentTopics `:
+```java
+List<String> getList(String namespace) throws PulsarAdminException;
+List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException;
+```
+
+3. The process of new method `Consumer subscribe(String namespace, Pattern topicsPattern, String subscription)` should be like this:
+- call method `List<String> getList(String namespace)` to get all the topics;
+- Use `topicsPattern` to filter out the matched sub-topics-list.
+- construct the `TopicsConsumerImpl` with the the sub-topics-list.
+
+### auto-discover topic addition/deletion
+The main work is:
+1. provide a listener, which based on topics changes, to do subscribe and unsubscribe on individual topic when target topic been changed(remove/add).
+```java
+Interface TopicsChangeListener {
+	// unsubscribe and delete ConsumerImpl in the `consumers` map in `TopicsConsumerImpl` based on added topics.
+	void onTopicsRemoved(Collection<String> topics);
+	// subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `TopicsConsumerImpl`.
+	void onTopicsAdded(Collection<String> topics);
+}
+```
+Add a method `void registerListener(TopicsChangeListener listener)` to `TopicsConsumerImpl`
+
+2. Based on above work, using a timer, periodically call `List<String> getList(String namespace)`. And comparing the filtered fresh sub-topics-list with current topics holden in `TopicsConsumerImpl`, try to get 2 lists: `newAddedTopicsList` and  `removedTopicsList`.
+3. If the 2 lists not empty, call `TopicsChangeListener.onTopicsAdded(newAddedTopicsList)`, and `TopicsChangeListener.onTopicsRemoved(removedTopicsList)` to do subscribe and unsubscribe, and update `consumers` map in `TopicsConsumerImpl`.
+
+# Changes
+The changes will be mostly on the surface and on client side:
+1. add and implementation a serials of new methods in `org.apache.pulsar.client.api.PulsarClient.java`
+```java
+Consumer subscribe(Collection<String> topics, String subscription);
+Consumer subscribe(Pattern topicsPattern, String subscription);
+```
+2. add and implenentation of new `Consumer`, which is `TopicsConsumerImpl` , returned by above `subscribe` method
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.