Posted to commits@pulsar.apache.org by ur...@apache.org on 2022/08/31 12:01:55 UTC

[pulsar-site] branch main updated: Docs sync done from apache/pulsar(#4378856)

This is an automated email from the ASF dual-hosted git repository.

urfree pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
     new 528bb68988e Docs sync done from apache/pulsar(#4378856)
528bb68988e is described below

commit 528bb68988eb314d80b62e4769a7d752372ba60b
Author: Pulsar Site Updater <de...@pulsar.apache.org>
AuthorDate: Wed Aug 31 12:01:48 2022 +0000

    Docs sync done from apache/pulsar(#4378856)
---
 site2/website-next/docs/client-libraries-java.md   | 101 +++++++++++++++++++++
 .../version-2.10.x/client-libraries-java.md        | 101 +++++++++++++++++++++
 .../version-2.9.x/client-libraries-java.md         | 101 +++++++++++++++++++++
 3 files changed, 303 insertions(+)

diff --git a/site2/website-next/docs/client-libraries-java.md b/site2/website-next/docs/client-libraries-java.md
index e4978f01037..3c02e4edcf8 100644
--- a/site2/website-next/docs/client-libraries-java.md
+++ b/site2/website-next/docs/client-libraries-java.md
@@ -727,6 +727,47 @@ Producer<byte[]> producer = client.newProducer()
 By default, producer chunks the large message based on max message size (`maxMessageSize`) configured at broker (eg: 5MB). However, client can also configure max chunked size using producer configuration `chunkMaxMessageSize`.
 > **Note:** To enable chunking, you need to disable batching (`enableBatching`=`false`) concurrently.
 
+### Intercept messages
+
+`ProducerInterceptor`s intercept and possibly mutate messages received by the producer before they are published to the brokers.
+
+The interface has three main events:
+* `eligible` checks if the interceptor can be applied to the message.
+* `beforeSend` is triggered before the producer sends the message to the broker. You can modify messages within this event.
+* `onSendAcknowledgement` is triggered when the broker acknowledges the message or the send fails.
+
+To intercept messages, you can add one or more `ProducerInterceptor`s when creating a `Producer` as follows.
+
+```java
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .intercept(new ProducerInterceptor() {
+            @Override
+            public boolean eligible(Message message) {
+                return true;  // process all messages
+            }
+
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                return message;  // apply user-defined processing logic, then return the message
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+                // user-defined processing logic
+            }
+
+            @Override public void close() {}  // nothing to release in this example
+        })
+        .create();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
 ## Consumer
 
 In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new [consumer](reference-terminology.md#consumer) by first instantiating a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
@@ -1298,6 +1339,66 @@ If the message key is not specified, messages without key are dispatched to one
 
 :::
 
+### Intercept messages
+
+`ConsumerInterceptor`s intercept and possibly mutate messages received by the consumer.
+
+The interface has six main events:
+* `beforeConsume` is triggered before the message is returned by `receive()` or `receiveAsync()`. You can modify messages within this event.
+* `onAcknowledge` is triggered before the consumer sends the acknowledgement to the broker.
+* `onAcknowledgeCumulative` is triggered before the consumer sends the cumulative acknowledgement to the broker.
+* `onNegativeAcksSend` is triggered when a redelivery from a negative acknowledgement occurs.
+* `onAckTimeoutSend` is triggered when a redelivery from an acknowledgement timeout occurs.
+* `onPartitionsChange` is triggered when the number of partitions of a partitioned topic changes.
+
+To intercept messages, you can add one or more `ConsumerInterceptor`s when creating a `Consumer` as follows.
+
+```java
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .intercept(new ConsumerInterceptor<String>() {
+              @Override
+              public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                  return message;  // apply user-defined processing logic, then return the message
+              }
+
+              @Override
+              public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onPartitionsChange(String topicName, int partitions) {
+                  // user-defined processing logic
+              }
+
+              @Override public void close() {}  // nothing to release in this example
+        })
+        .subscribe();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
 ## Reader 
 
 With the [reader interface](concepts-clients.md#reader-interface), Pulsar clients can "manually position" themselves within a topic and reading all messages from a specified message onward. The Pulsar API for Java enables you to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader} objects by specifying a topic and a {@inject: javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId}.
diff --git a/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md b/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md
index 0b402f1cc45..5721bf31215 100644
--- a/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md
+++ b/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md
@@ -652,6 +652,47 @@ Producer<byte[]> producer = client.newProducer()
 By default, producer chunks the large message based on max message size (`maxMessageSize`) configured at broker (eg: 5MB). However, client can also configure max chunked size using producer configuration `chunkMaxMessageSize`.
 > **Note:** To enable chunking, you need to disable batching (`enableBatching`=`false`) concurrently.
 
+### Intercept messages
+
+`ProducerInterceptor`s intercept and possibly mutate messages received by the producer before they are published to the brokers.
+
+The interface has three main events:
+* `eligible` checks if the interceptor can be applied to the message.
+* `beforeSend` is triggered before the producer sends the message to the broker. You can modify messages within this event.
+* `onSendAcknowledgement` is triggered when the broker acknowledges the message or the send fails.
+
+To intercept messages, you can add one or more `ProducerInterceptor`s when creating a `Producer` as follows.
+
+```java
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .intercept(new ProducerInterceptor() {
+            @Override
+            public boolean eligible(Message message) {
+                return true;  // process all messages
+            }
+
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                return message;  // apply user-defined processing logic, then return the message
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+                // user-defined processing logic
+            }
+
+            @Override public void close() {}  // nothing to release in this example
+        })
+        .create();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
 ## Consumer
 
 In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new [consumer](reference-terminology.md#consumer) by first instantiating a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
@@ -1222,6 +1263,66 @@ If the message key is not specified, messages without key are dispatched to one
 
 :::
 
+### Intercept messages
+
+`ConsumerInterceptor`s intercept and possibly mutate messages received by the consumer.
+
+The interface has six main events:
+* `beforeConsume` is triggered before the message is returned by `receive()` or `receiveAsync()`. You can modify messages within this event.
+* `onAcknowledge` is triggered before the consumer sends the acknowledgement to the broker.
+* `onAcknowledgeCumulative` is triggered before the consumer sends the cumulative acknowledgement to the broker.
+* `onNegativeAcksSend` is triggered when a redelivery from a negative acknowledgement occurs.
+* `onAckTimeoutSend` is triggered when a redelivery from an acknowledgement timeout occurs.
+* `onPartitionsChange` is triggered when the number of partitions of a partitioned topic changes.
+
+To intercept messages, you can add one or more `ConsumerInterceptor`s when creating a `Consumer` as follows.
+
+```java
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .intercept(new ConsumerInterceptor<String>() {
+              @Override
+              public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                  return message;  // apply user-defined processing logic, then return the message
+              }
+
+              @Override
+              public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onPartitionsChange(String topicName, int partitions) {
+                  // user-defined processing logic
+              }
+
+              @Override public void close() {}  // nothing to release in this example
+        })
+        .subscribe();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
 ## Reader 
 
 With the [reader interface](concepts-clients.md#reader-interface), Pulsar clients can "manually position" themselves within a topic and reading all messages from a specified message onward. The Pulsar API for Java enables you to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader} objects by specifying a topic and a {@inject: javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId}.
diff --git a/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md b/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md
index 067a3a10de1..edd60396213 100644
--- a/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md
+++ b/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md
@@ -275,6 +275,47 @@ producer.newMessage()
 
 You can terminate the builder chain with `sendAsync()` and get a future return.
 
+### Intercept messages
+
+`ProducerInterceptor`s intercept and possibly mutate messages received by the producer before they are published to the brokers.
+
+The interface has three main events:
+* `eligible` checks if the interceptor can be applied to the message.
+* `beforeSend` is triggered before the producer sends the message to the broker. You can modify messages within this event.
+* `onSendAcknowledgement` is triggered when the broker acknowledges the message or the send fails.
+
+To intercept messages, you can add one or more `ProducerInterceptor`s when creating a `Producer` as follows.
+
+```java
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .intercept(new ProducerInterceptor() {
+            @Override
+            public boolean eligible(Message message) {
+                return true;  // process all messages
+            }
+
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                return message;  // apply user-defined processing logic, then return the message
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+                // user-defined processing logic
+            }
+
+            @Override public void close() {}  // nothing to release in this example
+        })
+        .create();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
 ## Consumer
 
 In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new [consumer](reference-terminology.md#consumer) by first instantiating a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
@@ -765,6 +806,66 @@ If the message key is not specified, messages without key are dispatched to one
 
 :::
 
+### Intercept messages
+
+`ConsumerInterceptor`s intercept and possibly mutate messages received by the consumer.
+
+The interface has six main events:
+* `beforeConsume` is triggered before the message is returned by `receive()` or `receiveAsync()`. You can modify messages within this event.
+* `onAcknowledge` is triggered before the consumer sends the acknowledgement to the broker.
+* `onAcknowledgeCumulative` is triggered before the consumer sends the cumulative acknowledgement to the broker.
+* `onNegativeAcksSend` is triggered when a redelivery from a negative acknowledgement occurs.
+* `onAckTimeoutSend` is triggered when a redelivery from an acknowledgement timeout occurs.
+* `onPartitionsChange` is triggered when the number of partitions of a partitioned topic changes.
+
+To intercept messages, you can add one or more `ConsumerInterceptor`s when creating a `Consumer` as follows.
+
+```java
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .intercept(new ConsumerInterceptor<String>() {
+              @Override
+              public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                  return message;  // apply user-defined processing logic, then return the message
+              }
+
+              @Override
+              public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                  // user-defined processing logic
+              }
+
+              @Override
+              public void onPartitionsChange(String topicName, int partitions) {
+                  // user-defined processing logic
+              }
+
+              @Override public void close() {}  // nothing to release in this example
+        })
+        .subscribe();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
 ## Reader 
 
 With the [reader interface](concepts-clients.md#reader-interface), Pulsar clients can "manually position" themselves within a topic and reading all messages from a specified message onward. The Pulsar API for Java enables you to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader} objects by specifying a topic and a {@inject: javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId}.
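
The notes added in this commit say that multiple interceptors apply in the order they are passed to `intercept`, and that `eligible` decides whether an interceptor handles a message. The following is a minimal, library-free sketch of why that order matters. `InterceptorChainSketch`, `SketchInterceptor`, `of`, and `applyInOrder` are hypothetical names invented for this illustration; they are not part of the Pulsar API, and the loop is only assumed to resemble how a client might walk its interceptor list, not a copy of Pulsar's implementation.

```java
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class InterceptorChainSketch {

    /** Hypothetical stand-in for an interceptor; NOT Pulsar's ProducerInterceptor. */
    interface SketchInterceptor {
        boolean eligible(String message);
        String beforeSend(String message);
    }

    /** Builds a SketchInterceptor from an eligibility check and a transformation. */
    static SketchInterceptor of(Predicate<String> check, UnaryOperator<String> transform) {
        return new SketchInterceptor() {
            @Override
            public boolean eligible(String message) {
                return check.test(message);
            }

            @Override
            public String beforeSend(String message) {
                return transform.apply(message);
            }
        };
    }

    /** Applies each eligible interceptor in list order, feeding each result to the next. */
    static String applyInOrder(List<SketchInterceptor> interceptors, String message) {
        String current = message;
        for (SketchInterceptor interceptor : interceptors) {
            if (interceptor.eligible(current)) {
                current = interceptor.beforeSend(current);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        SketchInterceptor addPrefix = of(m -> true, m -> "a:" + m);
        SketchInterceptor upperCase = of(m -> true, m -> m.toUpperCase(Locale.ROOT));

        // Prefix first, then upper-case: the prefix is upper-cased too.
        System.out.println(applyInOrder(List.of(addPrefix, upperCase), "hello")); // prints "A:HELLO"
        // Upper-case first, then prefix: the prefix stays lower-case.
        System.out.println(applyInOrder(List.of(upperCase, addPrefix), "hello")); // prints "a:HELLO"
    }
}
```

Swapping the two interceptors changes the result, which is why the order of the arguments to `intercept` is worth choosing deliberately when one interceptor depends on another's output.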