Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/13 12:47:17 UTC

[pulsar] branch master updated: [feature][doc] Add reader interceptor support for Java clients (#15578)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cd54508b619 [feature][doc] Add reader interceptor support for Java clients (#15578)
cd54508b619 is described below

commit cd54508b61986a03279031ca9c38680e5d576362
Author: momo-jun <60...@users.noreply.github.com>
AuthorDate: Fri May 13 20:47:10 2022 +0800

    [feature][doc] Add reader interceptor support for Java clients (#15578)
---
 site2/docs/assets/reader-interceptor.svg |  1 +
 site2/docs/client-libraries-java.md      | 67 ++++++++++++++++++++++++--------
 2 files changed, 52 insertions(+), 16 deletions(-)

diff --git a/site2/docs/assets/reader-interceptor.svg b/site2/docs/assets/reader-interceptor.svg
new file mode 100644
index 00000000000..30c41593a5e
--- /dev/null
+++ b/site2/docs/assets/reader-interceptor.svg
@@ -0,0 +1 @@
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:lucid="lucid" width="1048" height="236.89"><g transform="translate(-92.00000000000003 -276.9013361034965)" lucid:page-tab-id="0_0"><path d="M0 0h1870.87v1322.83H0z" fill="#fff"/><path d="M112 302.9a6 6 0 0 1 6-6h338.67a6 6 0 0 1 6 6v184.9a6 6 0 0 1-6 6H118a6 6 0 0 1-6-6z" fill="#fff"/><path d="M113.5 302.9c0 .83-.67 1.5-1.5 1.5s-1.5-.67-1.5-1.5.67-1.5 1.5-1.5 1.5.67 1.5 1.5zm5.06-5.92c0 .82-.67 1.5-1 [...]
\ No newline at end of file
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index 13205461f74..0782142453f 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -4,9 +4,9 @@ title: Pulsar Java client
 sidebar_label: Java
 ---
 
-You can use a Pulsar Java client to create the Java [producer](#producer), [consumer](#consumer), [readers](#reader) and [TableView](#tableview) of messages and to perform [administrative tasks](admin-api-overview.md). The current Java client version is **{{pulsar:version}}**.
+You can use a Pulsar Java client to create the Java [producer](#producer), [consumer](#consumer), [reader](#reader) and [TableView](#tableview) of messages and to perform [administrative tasks](admin-api-overview.md). The current Java client version is **{{pulsar:version}}**.
 
-All the methods in [producer](#producer), [consumer](#consumer), [readers](#reader) and [TableView](#tableview) of a Java client are thread-safe.
+All the methods in [producer](#producer), [consumer](#consumer), [reader](#reader) and [TableView](#tableview) of a Java client are thread-safe.
 
 Javadoc for the Pulsar client is divided into two domains by package as follows.
 
@@ -404,7 +404,7 @@ This chapter explains the working process of cluster-level failover. For more im
 <!--DOCUSAURUS_CODE_TABS-->
 <!--Automatic cluster-level failover-->
 
-In automatic failover cluster, the primary cluster and backup cluster are aware of each other's availability. The automatic failover cluster performs the following actions without administrator intervention:
+In an automatic failover cluster, the primary cluster and backup cluster are aware of each other's availability. The automatic failover cluster performs the following actions without administrator intervention:
 
 1. The Pulsar client runs a probe task at intervals defined in `checkInterval`.
    
@@ -412,7 +412,7 @@ In automatic failover cluster, the primary cluster and backup cluster are aware
 
     2a) If there are healthy backup clusters, the Pulsar client switches to a backup cluster in the order defined in `secondary`.
 
-    2b) If there is no healthy backup cluster, the Pulsar client does not perform the switchover, and the probe task continues to look  for an available backup cluster.
+    2b) If there is no healthy backup cluster, the Pulsar client does not perform the switchover, and the probe task continues to look for an available backup cluster.
 
 3. The probe task checks whether the primary cluster functions well or not. 
 
@@ -428,13 +428,13 @@ In automatic failover cluster, the primary cluster and backup cluster are aware
 
 2. The probe task fetches the service URL configuration from the URL provider service, which is configured by `urlProvider`.
 
-    2a) If the service URL configuration is changed, the probe task  switches to the target cluster without checking the health status of the target cluster.
+    2a) If the service URL configuration is changed, the probe task switches to the target cluster without checking the health status of the target cluster.
 
     2b) If the service URL configuration is not changed, the Pulsar client does not perform the switchover.
 
 3. If the Pulsar client switches to the target cluster, the probe task continues to fetch service URL configuration from the URL provider service at intervals defined in `checkInterval`. 
 
-    3a) If the service URL configuration is changed, the probe task  switches to the target cluster without checking the health status of the target cluster.
+    3a) If the service URL configuration is changed, the probe task switches to the target cluster without checking the health status of the target cluster.
 
     3b) If the service URL configuration is not changed, it does not perform the switchover.
 
@@ -553,9 +553,9 @@ You can terminate the builder chain with `sendAsync()` and get a future return.
 
 ### Enable chunking
 
-Message [chunking](concepts-messaging.md#chunking) enables Pulsar to process large payload messages by splitting the message into chunks at the producer side and aggregating chunked messages at the consumer side. 
+Message [chunking](concepts-messaging.md#chunking) enables Pulsar to process large payload messages by splitting the message into chunks at the producer side and aggregating chunked messages on the consumer side. 
 
-The message chunking feature is OFF by default. The following is an example about how to enable message chunking when creating a producer.
+The message chunking feature is OFF by default. The following is an example of how to enable message chunking when creating a producer.
 
 ```java
 Producer<byte[]> producer = client.newProducer()
@@ -581,7 +581,7 @@ Consumer consumer = client.newConsumer()
         .subscribe();
 ```
 
-The `subscribe` method will auto subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a `while` loop. In this example loop, the consumer listens for messages, prints the contents of any received message, and then [acknowledges](reference-terminology.md#acknowledgment-ack) that the message has been processed. If the processing logic fails, you can use [negative acknowledgement](reference-terminology.md#acknowledgment [...]
+The `subscribe` method will auto-subscribe the consumer to the specified topic and subscription. One way to make the consumer listen on the topic is to set up a `while` loop. In this example loop, the consumer listens for messages, prints the contents of any received message, and then [acknowledges](reference-terminology.md#acknowledgment-ack) that the message has been processed. If the processing logic fails, you can use [negative acknowledgement](reference-terminology.md#acknowledgment [...]
 
 ```java
 while (true) {
@@ -698,7 +698,7 @@ consumer.acknowledge(messages)
 >
 > Batch receive policy limits the number and bytes of messages in a single batch. You can specify a timeout to wait for enough messages.
 >
-> The batch receive is completed if any of the following condition is met: enough number of messages, bytes of messages, wait timeout.
+> The batch receive is completed when any of the following conditions is met: the number of messages reaches the limit, the total bytes of messages reach the limit, or the wait timeout expires.
 >
 > ```java
 > Consumer consumer = client.newConsumer()
@@ -874,7 +874,7 @@ Pulsar has various [subscription types](concepts-messaging#subscription-types) t
 
 A subscription is identical with the subscription name; a subscription name can specify only one subscription type at a time. To change the subscription type, you should first stop all consumers of this subscription.
 
-Different subscription types have different message distribution types. This section describes the differences of subscription types and how to use them.
+Different subscription types have different message distribution types. This section describes the differences between subscription types and how to use them.
 
 In order to better describe their differences, assuming you have a topic named "my-topic", and the producer has published 10 messages.
 
@@ -980,7 +980,7 @@ Consumer consumer2 = client.newConsumer()
 //Both consumer1 and consumer 2 is active consumers.
 ```
 
-In Shared subscription type, multiple consumers can attach to the same subscription and messages are delivered in a round robin distribution across consumers.
+In Shared subscription type, multiple consumers can attach to the same subscription and messages are delivered in a round-robin distribution across consumers.
 
 If a broker dispatches only one message at a time, consumer1 receives the following information.
 
@@ -1128,7 +1128,6 @@ Total hash range size is 65536, so the max end of the range should be less than
 Configuring chunking for readers is similar to that for consumers. See [configure chunking for consumers](#configure-chunking) for more information.
 
 The following is an example of how to configure message chunking for a reader.
-
 ```java
 Reader<byte[]> reader = pulsarClient.newReader()
         .topic(topicName)
@@ -1139,13 +1138,51 @@ Reader<byte[]> reader = pulsarClient.newReader()
         .create();
 ```
 
+### Create reader with interceptor
+
+A Pulsar reader interceptor intercepts messages, and possibly mutates them with user-defined processing, before the [Pulsar reader](concepts-clients.md#reader-interface) reads them. With reader interceptors, you can apply unified processing to messages before they are read, such as modifying messages, adding properties, or collecting statistics, without implementing the same mechanism separately for each reader.
+
+![Reader interceptor](assets/reader-interceptor.svg)
+
+A Pulsar reader interceptor works on top of the Pulsar consumer interceptor. The plugin interface `ReaderInterceptor` can be treated as a subset of `ConsumerInterceptor`, and it has two main events:
+* `beforeRead` is triggered before readers read messages. You can modify messages within this event.
+* `onPartitionsChange` is triggered when a change in the partitions of the topic is detected.
+
+To handle these events and apply custom processing, add a `ReaderInterceptor` when creating a `Reader`, as follows.
+```java
+PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
+Reader<byte[]> reader = pulsarClient.newReader()
+        .topic("t1")
+        .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+        .intercept(new ReaderInterceptor<byte[]>() {
+            @Override
+            public void close() {
+            }
+
+            @Override
+            public Message<byte[]> beforeRead(Reader<byte[]> reader, Message<byte[]> message) {
+                // user-defined processing logic
+                return message;
+            }
+
+            @Override
+            public void onPartitionsChange(String topicName, int partitions) {
+                // user-defined processing logic
+            }
+        })
+        .startMessageId(MessageId.earliest)
+        .create();
+```
+
 ## TableView
 
 The TableView interface serves an encapsulated access pattern, providing a continuously updated key-value map view of the compacted topic data. Messages without keys will be ignored.
 
 With TableView, Pulsar clients can fetch all the message updates from a topic and construct a map with the latest values of each key. These values can then be used to build a local cache of data. In addition, you can register consumers with the TableView by specifying a listener to perform a scan of the map and then receive notifications when new messages are received. Consequently, event handling can be triggered to serve use cases, such as event-driven applications and message monitoring.
 
-> **Note:** Each TableView uses one Reader instance per partition, and reads the topic starting from the compacted view by default. It is highly recommended to enable automatic compaction by [configuring the topic compaction policies](cookbooks-compaction.md#configuring-compaction-to-run-automatically) for the given topic or namespace. More frequent compaction results in shorter startup times because less data is replayed to reconstruct the TableView of the topic.
+> **Note**
+>
+> Each TableView uses one Reader instance per partition, and reads the topic starting from the compacted view by default. It is highly recommended to enable automatic compaction by [configuring the topic compaction policies](cookbooks-compaction.md#configuring-compaction-to-run-automatically) for the given topic or namespace. More frequent compaction results in shorter startup times because less data is replayed to reconstruct the TableView of the topic.
 
 The following figure illustrates the dynamic construction of a TableView updated with newer values of each key.
 ![TableView](assets/tableview.png)
@@ -1153,7 +1190,6 @@ The following figure illustrates the dynamic construction of a TableView updated
 ### Configure TableView
  
 The following is an example of how to configure a TableView.
-
 ```java
 TableView<String> tv = client.newTableViewBuilder(Schema.STRING)
   .topic("my-tableview")
@@ -1172,7 +1208,6 @@ You can use the available parameters in the `loadConf` configuration or related
 You can register listeners for both existing messages on a topic and new messages coming into the topic by using `forEachAndListen`, and specify to perform operations for all existing messages by using `forEach`.
 
 The following is an example of how to register listeners with TableView.
-
 ```java
 // Register listeners for all existing and incoming messages
 tv.forEachAndListen((key, value) -> /*operations on all existing and incoming messages*/)
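The interceptor flow described in the new "Create reader with interceptor" section works as follows: `beforeRead` sees each message and may return a modified version before the reader hands it to the application, while `onPartitionsChange` reports partition changes. The dependency-free sketch below illustrates that flow. It deliberately does not use the Pulsar client. `SimpleMessage`, `SimpleReaderInterceptor`, `TaggingInterceptor`, and `readAll` are hypothetical stand-ins that only mirror the shape of `ReaderInterceptor`, so the example compiles and runs on its own (Java 9 or later, for `List.of`/`Map.of`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReaderInterceptorSketch {

    // Hypothetical stand-in for a Pulsar message: a payload plus string properties.
    static final class SimpleMessage {
        final String value;
        final Map<String, String> properties;

        SimpleMessage(String value, Map<String, String> properties) {
            this.value = value;
            this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
        }
    }

    // Hypothetical interface mirroring the two events of ReaderInterceptor (not the Pulsar API).
    interface SimpleReaderInterceptor {
        SimpleMessage beforeRead(SimpleMessage message);

        void onPartitionsChange(String topicName, int partitions);
    }

    // Tags every message with a property and counts how many messages passed through.
    static final class TaggingInterceptor implements SimpleReaderInterceptor {
        private final String source;
        private int seen = 0;
        private int lastKnownPartitions = -1;

        TaggingInterceptor(String source) {
            this.source = source;
        }

        @Override
        public SimpleMessage beforeRead(SimpleMessage message) {
            seen++;
            Map<String, String> props = new HashMap<>(message.properties);
            props.put("intercepted-by", source);
            // Return a new message instead of mutating the incoming one.
            return new SimpleMessage(message.value, props);
        }

        @Override
        public void onPartitionsChange(String topicName, int partitions) {
            lastKnownPartitions = partitions;
        }

        int seen() {
            return seen;
        }

        int lastKnownPartitions() {
            return lastKnownPartitions;
        }
    }

    // Simulates a reader: each message passes through the interceptor before the application sees it.
    static List<SimpleMessage> readAll(List<SimpleMessage> incoming, SimpleReaderInterceptor interceptor) {
        List<SimpleMessage> delivered = new ArrayList<>();
        for (SimpleMessage m : incoming) {
            delivered.add(interceptor.beforeRead(m));
        }
        return delivered;
    }

    public static void main(String[] args) {
        TaggingInterceptor interceptor = new TaggingInterceptor("demo");
        List<SimpleMessage> incoming = List.of(
                new SimpleMessage("a", Map.of()),
                new SimpleMessage("b", Map.of("k", "v")));
        interceptor.onPartitionsChange("t1", 3);
        for (SimpleMessage m : readAll(incoming, interceptor)) {
            System.out.println(m.value + " " + m.properties);
        }
        System.out.println("seen=" + interceptor.seen() + ", partitions=" + interceptor.lastKnownPartitions());
    }
}
```

In a real application, the equivalent logic would live in a class implementing `ReaderInterceptor<T>` and passed to `intercept(...)` on the reader builder, as in the `Reader` example in the diff. Returning a new message rather than mutating the incoming one is a choice made for this sketch. It keeps the original message unchanged for any other code that still holds a reference to it.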