Posted to commits@pulsar.apache.org by ur...@apache.org on 2022/08/31 12:01:55 UTC
[pulsar-site] branch main updated: Docs sync done from apache/pulsar(#4378856)
This is an automated email from the ASF dual-hosted git repository.
urfree pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git
The following commit(s) were added to refs/heads/main by this push:
new 528bb68988e Docs sync done from apache/pulsar(#4378856)
528bb68988e is described below
commit 528bb68988eb314d80b62e4769a7d752372ba60b
Author: Pulsar Site Updater <de...@pulsar.apache.org>
AuthorDate: Wed Aug 31 12:01:48 2022 +0000
Docs sync done from apache/pulsar(#4378856)
---
site2/website-next/docs/client-libraries-java.md | 101 +++++++++++++++++++++
.../version-2.10.x/client-libraries-java.md | 101 +++++++++++++++++++++
.../version-2.9.x/client-libraries-java.md | 101 +++++++++++++++++++++
3 files changed, 303 insertions(+)
diff --git a/site2/website-next/docs/client-libraries-java.md b/site2/website-next/docs/client-libraries-java.md
index e4978f01037..3c02e4edcf8 100644
--- a/site2/website-next/docs/client-libraries-java.md
+++ b/site2/website-next/docs/client-libraries-java.md
@@ -727,6 +727,47 @@ Producer<byte[]> producer = client.newProducer()
By default, the producer chunks a large message based on the maximum message size (`maxMessageSize`) configured at the broker (e.g., 5 MB). However, the client can also configure the maximum chunk size using the producer configuration `chunkMaxMessageSize`.
> **Note:** To enable chunking, you must also disable batching (`enableBatching`=`false`).
+### Intercept messages
+
+`ProducerInterceptor`s intercept and possibly mutate messages received by the producer before they are published to the brokers.
+
+The interface has three main events:
+* `eligible` checks if the interceptor can be applied to the message.
+* `beforeSend` is triggered before the producer sends the message to the broker. You can modify messages within this event.
+* `onSendAcknowledgement` is triggered when the broker acknowledges the message or when sending fails.
+
+To intercept messages, add one or more `ProducerInterceptor`s when creating a `Producer`, as follows.
+
+```java
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .intercept(new ProducerInterceptor() { // org.apache.pulsar.client.api.interceptor.ProducerInterceptor
+            @Override public void close() {} // required by the interface; release resources here
+
+            @Override
+            public boolean eligible(Message message) {
+                return true; // process all messages
+            }
+
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                return message; // user-defined processing logic; return the (possibly modified) message
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+                // user-defined processing logic
+            }
+        })
+        .create();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
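The ordering rule in the note above can be illustrated without a broker. The following is a minimal, self-contained sketch, not Pulsar's implementation: the nested `Interceptor` interface and the `applyBeforeSend` and `suffixing` helpers are hypothetical stand-ins that model only `eligible` and `beforeSend` on plain strings. It also assumes that eligibility is checked against the message as modified so far, which may differ from what the client actually does.

```java
import java.util.Arrays;
import java.util.List;

final class ProducerInterceptorOrderSketch {

    // Hypothetical, simplified stand-in for ProducerInterceptor; NOT the Pulsar interface.
    interface Interceptor {
        boolean eligible(String message);
        String beforeSend(String message);
    }

    // Runs each eligible interceptor in registration order,
    // feeding every interceptor the result of the previous one.
    static String applyBeforeSend(List<Interceptor> interceptors, String message) {
        String current = message;
        for (Interceptor interceptor : interceptors) {
            if (interceptor.eligible(current)) {
                current = interceptor.beforeSend(current);
            }
        }
        return current;
    }

    // Builds an interceptor that appends a suffix to messages starting with requiredPrefix.
    static Interceptor suffixing(String requiredPrefix, String suffix) {
        return new Interceptor() {
            @Override
            public boolean eligible(String message) {
                return message.startsWith(requiredPrefix);
            }

            @Override
            public String beforeSend(String message) {
                return message + suffix;
            }
        };
    }

    public static void main(String[] args) {
        Interceptor a = suffixing("", "-a");      // eligible for every message
        Interceptor b = suffixing("order", "-b"); // eligible only for "order..." messages

        System.out.println(applyBeforeSend(Arrays.asList(a, b), "order"));
        System.out.println(applyBeforeSend(Arrays.asList(b, a), "order"));
        System.out.println(applyBeforeSend(Arrays.asList(a, b), "ping"));
    }
}
```

Swapping the two interceptors changes the result because each one sees the output of the one registered before it, and an interceptor whose `eligible` check returns `false` is skipped for that message.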
## Consumer
In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new [consumer](reference-terminology.md#consumer) by first instantiating a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
@@ -1298,6 +1339,66 @@ If the message key is not specified, messages without key are dispatched to one
:::
+### Intercept messages
+
+`ConsumerInterceptor`s intercept and possibly mutate messages received by the consumer.
+
+The interface has six main events:
+* `beforeConsume` is triggered before the message is returned by `receive()` or `receiveAsync()`. You can modify messages within this event.
+* `onAcknowledge` is triggered before the consumer sends the acknowledgement to the broker.
+* `onAcknowledgeCumulative` is triggered before the consumer sends the cumulative acknowledgement to the broker.
+* `onNegativeAcksSend` is triggered when a redelivery from a negative acknowledgement occurs.
+* `onAckTimeoutSend` is triggered when a redelivery from an acknowledgement timeout occurs.
+* `onPartitionsChange` is triggered when the partitions of the (partitioned) topic change.
+
+To intercept messages, add one or more `ConsumerInterceptor`s when creating a `Consumer`, as follows.
+
+```java
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .intercept(new ConsumerInterceptor<String>() {
+            @Override public void close() {} // required by the interface; release resources here
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                return message; // user-defined processing logic; return the (possibly modified) message
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onPartitionsChange(String topicName, int partitions) {
+                // user-defined processing logic
+            }
+        })
+        .subscribe();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
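To make the timing of the consumer-side events concrete, here is a minimal, self-contained sketch that models just two of them, `beforeConsume` and `onAcknowledge`, on plain strings. It is not Pulsar's implementation: the nested `Interceptor` interface, the `deliver` and `acknowledge` methods, and the `recording` helper are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ConsumerInterceptorEventsSketch {

    // Hypothetical, simplified stand-in for two ConsumerInterceptor callbacks; NOT the Pulsar interface.
    interface Interceptor {
        String beforeConsume(String message);
        void onAcknowledge(String messageId, Throwable cause);
    }

    private final List<Interceptor> interceptors;
    final List<String> sentAcks = new ArrayList<>(); // stands in for acks sent to the broker

    ConsumerInterceptorEventsSketch(List<Interceptor> interceptors) {
        this.interceptors = interceptors;
    }

    // Models receive(): each interceptor may replace the message before the caller sees it.
    String deliver(String rawMessage) {
        String current = rawMessage;
        for (Interceptor interceptor : interceptors) {
            current = interceptor.beforeConsume(current);
        }
        return current;
    }

    // Models acknowledge(): interceptors are notified first, then the ack is "sent".
    void acknowledge(String messageId) {
        for (Interceptor interceptor : interceptors) {
            interceptor.onAcknowledge(messageId, null);
        }
        sentAcks.add(messageId);
    }

    // Interceptor that trims the payload and records every callback it receives.
    static Interceptor recording(List<String> events) {
        return new Interceptor() {
            @Override
            public String beforeConsume(String message) {
                events.add("beforeConsume:" + message);
                return message.trim();
            }

            @Override
            public void onAcknowledge(String messageId, Throwable cause) {
                events.add("onAcknowledge:" + messageId);
            }
        };
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        ConsumerInterceptorEventsSketch consumer =
                new ConsumerInterceptorEventsSketch(Arrays.asList(recording(events)));

        String message = consumer.deliver("  hello  ");
        consumer.acknowledge("id-1");

        System.out.println(message);
        System.out.println(events);
        System.out.println(consumer.sentAcks);
    }
}
```

In this model the caller only ever sees the message returned by the last interceptor, and each interceptor is notified of an acknowledgement before that acknowledgement is recorded as sent.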
## Reader
With the [reader interface](concepts-clients.md#reader-interface), Pulsar clients can "manually position" themselves within a topic and read all messages from a specified message onward. The Pulsar API for Java enables you to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader} objects by specifying a topic and a {@inject: javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId}.
diff --git a/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md b/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md
index 0b402f1cc45..5721bf31215 100644
--- a/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md
+++ b/site2/website-next/versioned_docs/version-2.10.x/client-libraries-java.md
@@ -652,6 +652,47 @@ Producer<byte[]> producer = client.newProducer()
By default, the producer chunks a large message based on the maximum message size (`maxMessageSize`) configured at the broker (e.g., 5 MB). However, the client can also configure the maximum chunk size using the producer configuration `chunkMaxMessageSize`.
> **Note:** To enable chunking, you must also disable batching (`enableBatching`=`false`).
+### Intercept messages
+
+`ProducerInterceptor`s intercept and possibly mutate messages received by the producer before they are published to the brokers.
+
+The interface has three main events:
+* `eligible` checks if the interceptor can be applied to the message.
+* `beforeSend` is triggered before the producer sends the message to the broker. You can modify messages within this event.
+* `onSendAcknowledgement` is triggered when the broker acknowledges the message or when sending fails.
+
+To intercept messages, add one or more `ProducerInterceptor`s when creating a `Producer`, as follows.
+
+```java
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .intercept(new ProducerInterceptor() { // org.apache.pulsar.client.api.interceptor.ProducerInterceptor
+            @Override public void close() {} // required by the interface; release resources here
+
+            @Override
+            public boolean eligible(Message message) {
+                return true; // process all messages
+            }
+
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                return message; // user-defined processing logic; return the (possibly modified) message
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+                // user-defined processing logic
+            }
+        })
+        .create();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
## Consumer
In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new [consumer](reference-terminology.md#consumer) by first instantiating a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
@@ -1222,6 +1263,66 @@ If the message key is not specified, messages without key are dispatched to one
:::
+### Intercept messages
+
+`ConsumerInterceptor`s intercept and possibly mutate messages received by the consumer.
+
+The interface has six main events:
+* `beforeConsume` is triggered before the message is returned by `receive()` or `receiveAsync()`. You can modify messages within this event.
+* `onAcknowledge` is triggered before the consumer sends the acknowledgement to the broker.
+* `onAcknowledgeCumulative` is triggered before the consumer sends the cumulative acknowledgement to the broker.
+* `onNegativeAcksSend` is triggered when a redelivery from a negative acknowledgement occurs.
+* `onAckTimeoutSend` is triggered when a redelivery from an acknowledgement timeout occurs.
+* `onPartitionsChange` is triggered when the partitions of the (partitioned) topic change.
+
+To intercept messages, add one or more `ConsumerInterceptor`s when creating a `Consumer`, as follows.
+
+```java
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .intercept(new ConsumerInterceptor<String>() {
+            @Override public void close() {} // required by the interface; release resources here
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                return message; // user-defined processing logic; return the (possibly modified) message
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onPartitionsChange(String topicName, int partitions) {
+                // user-defined processing logic
+            }
+        })
+        .subscribe();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
## Reader
With the [reader interface](concepts-clients.md#reader-interface), Pulsar clients can "manually position" themselves within a topic and read all messages from a specified message onward. The Pulsar API for Java enables you to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader} objects by specifying a topic and a {@inject: javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId}.
diff --git a/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md b/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md
index 067a3a10de1..edd60396213 100644
--- a/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md
+++ b/site2/website-next/versioned_docs/version-2.9.x/client-libraries-java.md
@@ -275,6 +275,47 @@ producer.newMessage()
You can terminate the builder chain with `sendAsync()` and get a future return.
+### Intercept messages
+
+`ProducerInterceptor`s intercept and possibly mutate messages received by the producer before they are published to the brokers.
+
+The interface has three main events:
+* `eligible` checks if the interceptor can be applied to the message.
+* `beforeSend` is triggered before the producer sends the message to the broker. You can modify messages within this event.
+* `onSendAcknowledgement` is triggered when the broker acknowledges the message or when sending fails.
+
+To intercept messages, add one or more `ProducerInterceptor`s when creating a `Producer`, as follows.
+
+```java
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .intercept(new ProducerInterceptor() { // org.apache.pulsar.client.api.interceptor.ProducerInterceptor
+            @Override public void close() {} // required by the interface; release resources here
+
+            @Override
+            public boolean eligible(Message message) {
+                return true; // process all messages
+            }
+
+            @Override
+            public Message beforeSend(Producer producer, Message message) {
+                return message; // user-defined processing logic; return the (possibly modified) message
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
+                // user-defined processing logic
+            }
+        })
+        .create();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
## Consumer
In Pulsar, consumers subscribe to topics and handle messages that producers publish to those topics. You can instantiate a new [consumer](reference-terminology.md#consumer) by first instantiating a {@inject: javadoc:PulsarClient:/client/org/apache/pulsar/client/api/PulsarClient} object and passing it a URL for a Pulsar broker (as [above](#client-configuration)).
@@ -765,6 +806,66 @@ If the message key is not specified, messages without key are dispatched to one
:::
+### Intercept messages
+
+`ConsumerInterceptor`s intercept and possibly mutate messages received by the consumer.
+
+The interface has six main events:
+* `beforeConsume` is triggered before the message is returned by `receive()` or `receiveAsync()`. You can modify messages within this event.
+* `onAcknowledge` is triggered before the consumer sends the acknowledgement to the broker.
+* `onAcknowledgeCumulative` is triggered before the consumer sends the cumulative acknowledgement to the broker.
+* `onNegativeAcksSend` is triggered when a redelivery from a negative acknowledgement occurs.
+* `onAckTimeoutSend` is triggered when a redelivery from an acknowledgement timeout occurs.
+* `onPartitionsChange` is triggered when the partitions of the (partitioned) topic change.
+
+To intercept messages, add one or more `ConsumerInterceptor`s when creating a `Consumer`, as follows.
+
+```java
+Consumer<String> consumer = client.newConsumer(Schema.STRING)
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .intercept(new ConsumerInterceptor<String>() {
+            @Override public void close() {} // required by the interface; release resources here
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                return message; // user-defined processing logic; return the (possibly modified) message
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
+                // user-defined processing logic
+            }
+
+            @Override
+            public void onPartitionsChange(String topicName, int partitions) {
+                // user-defined processing logic
+            }
+        })
+        .subscribe();
+```
+
+:::note
+
+If you are using multiple interceptors, they apply in the order they are passed to the `intercept` method.
+
+:::
+
## Reader
With the [reader interface](concepts-clients.md#reader-interface), Pulsar clients can "manually position" themselves within a topic and read all messages from a specified message onward. The Pulsar API for Java enables you to create {@inject: javadoc:Reader:/client/org/apache/pulsar/client/api/Reader} objects by specifying a topic and a {@inject: javadoc:MessageId:/client/org/apache/pulsar/client/api/MessageId}.