You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/05 21:25:33 UTC
[incubator-pulsar] branch master updated: PIP-23: Pulsar Java
Client Interceptors. (#2471)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a92111 PIP-23: Pulsar Java Client Interceptors. (#2471)
7a92111 is described below
commit 7a92111fe7824d422d1739abd06d85955b652212
Author: penghui <co...@gmail.com>
AuthorDate: Thu Sep 6 05:25:30 2018 +0800
PIP-23: Pulsar Java Client Interceptors. (#2471)
### Motivation
Support user to add interceptors to producer and consumer.
### Modifications
Add Consumer interceptors.
```java
Message<T> beforeConsume(Message<T> message);
void onAcknowledge(MessageId messageId, Throwable cause);
void onAcknowledgeCumulative(MessageId messageId, Throwable cause);
```
Add Producer interceptors.
```java
Message<T> beforeSend(Message<T> message);
void onSendAcknowledgement(Message<T> message, MessageId msgId, Throwable cause);
```
### Result
Users can using interceptors in multiple scenarios, such as for applications to add
custom logging or processing.
Master Issue: #2476
---
.../apache/pulsar/client/impl/RawReaderImpl.java | 2 +-
.../pulsar/broker/service/PersistentTopicTest.java | 7 +-
.../pulsar/broker/service/ReplicatorTest.java | 3 +-
.../broker/service/v1/V1_ReplicatorTest.java | 3 +-
.../apache/pulsar/client/api/InterceptorsTest.java | 359 +++++++++++++++++++++
.../org/apache/pulsar/client/api/Consumer.java | 6 +
.../apache/pulsar/client/api/ConsumerBuilder.java | 8 +
.../pulsar/client/api/ConsumerInterceptor.java | 99 ++++++
.../apache/pulsar/client/api/ProducerBuilder.java | 8 +
.../pulsar/client/api/ProducerInterceptor.java | 92 ++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 25 +-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 20 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 40 ++-
.../pulsar/client/impl/ConsumerInterceptors.java | 132 ++++++++
.../org/apache/pulsar/client/impl/MessageImpl.java | 4 +-
.../client/impl/MultiTopicsConsumerImpl.java | 12 +-
.../client/impl/PartitionedProducerImpl.java | 6 +-
.../impl/PatternMultiTopicsConsumerImpl.java | 4 +-
.../apache/pulsar/client/impl/ProducerBase.java | 18 +-
.../pulsar/client/impl/ProducerBuilderImpl.java | 18 +-
.../apache/pulsar/client/impl/ProducerImpl.java | 72 +++--
.../pulsar/client/impl/ProducerInterceptors.java | 110 +++++++
.../pulsar/client/impl/PulsarClientImpl.java | 45 +--
.../org/apache/pulsar/client/impl/ReaderImpl.java | 2 +-
.../pulsar/client/impl/TopicMessageImpl.java | 4 +
.../pulsar/functions/instance/ContextImplTest.java | 2 +-
.../MultiConsumersOneOutputTopicProducersTest.java | 27 +-
27 files changed, 1033 insertions(+), 95 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index ae1a4db..99aec93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -111,7 +111,7 @@ public class RawReaderImpl implements RawReader {
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
- Schema.BYTES);
+ Schema.BYTES, null);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f8ac40a..8c175d7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.matches;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
@@ -36,7 +37,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -80,7 +80,6 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -1224,7 +1223,7 @@ public class PersistentTopicTest {
verify(clientImpl)
.createProducerAsync(
any(ProducerConfigurationData.class),
- any(Schema.class)
+ any(Schema.class), eq(null)
);
replicator.disconnect(false);
@@ -1235,7 +1234,7 @@ public class PersistentTopicTest {
verify(clientImpl, Mockito.times(2))
.createProducerAsync(
any(ProducerConfigurationData.class),
- any(Schema.class)
+ any(Schema.class), any(null)
);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index de49925..528ff64 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
@@ -242,7 +243,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
Thread.sleep(3000);
Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
- Mockito.any(Schema.class));
+ Mockito.any(Schema.class), eq(null));
client1.shutdown();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
index 0d6d303..92bb641 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.v1;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
@@ -241,7 +242,7 @@ public class V1_ReplicatorTest extends V1_ReplicatorTestBase {
Thread.sleep(3000);
Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
- Mockito.any(Schema.class));
+ Mockito.any(Schema.class), eq(null));
client1.shutdown();
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
new file mode 100644
index 0000000..d833846
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InterceptorsTest extends ProducerConsumerBase {
+
+ private static final Logger log = LoggerFactory.getLogger(InterceptorsTest.class);
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterMethod
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testProducerInterceptor() throws PulsarClientException {
+ ProducerInterceptor<String> interceptor1 = new ProducerInterceptor<String>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
+ MessageImpl<String> msg = (MessageImpl<String>) message;
+ log.info("Before send message: {}", new String(msg.getData()));
+ java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> properties = msg.getMessageBuilder().getPropertiesList();
+ for (int i = 0; i < properties.size(); i++) {
+ if ("key".equals(properties.get(i).getKey())) {
+ msg.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey("key").setValue("after").build());
+ }
+ }
+ return msg;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, Throwable cause) {
+ message.getProperties();
+ Assert.assertEquals("complete", message.getProperty("key"));
+ log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), msgId, cause);
+ }
+ };
+
+ ProducerInterceptor<String> interceptor2 = new ProducerInterceptor<String>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
+ MessageImpl<String> msg = (MessageImpl<String>) message;
+ log.info("Before send message: {}", new String(msg.getData()));
+ java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> properties = msg.getMessageBuilder().getPropertiesList();
+ for (int i = 0; i < properties.size(); i++) {
+ if ("key".equals(properties.get(i).getKey())) {
+ msg.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey("key").setValue("complete").build());
+ }
+ }
+ return msg;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, Throwable cause) {
+ message.getProperties();
+ Assert.assertEquals("complete", message.getProperty("key"));
+ log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), msgId, cause);
+ }
+ };
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .intercept(interceptor1, interceptor2)
+ .create();
+
+ MessageId messageId = producer.newMessage().property("key", "before").value("Hello Pulsar!").send();
+ log.info("Send result messageId: {}", messageId);
+ producer.close();
+ }
+
+ @Test
+ public void testConsumerInterceptorWithSingleTopicSubscribe() throws PulsarClientException {
+ ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+ MessageImpl<String> msg = (MessageImpl<String>) message;
+ msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+ return msg;
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+ log.info("onAcknowledge messageId: {}", messageId, cause);
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+ log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
+ }
+ };
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .subscriptionType(SubscriptionType.Shared)
+ .intercept(interceptor)
+ .subscriptionName("my-subscription")
+ .subscribe();
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .create();
+
+ producer.newMessage().value("Hello Pulsar!").send();
+
+ Message<String> received = consumer.receive();
+ MessageImpl<String> msg = (MessageImpl<String>) received;
+ boolean haveKey = false;
+ for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
+ if ("beforeConsumer".equals(keyValue.getKey())) {
+ haveKey = true;
+ }
+ }
+ Assert.assertTrue(haveKey);
+ consumer.acknowledge(received);
+ producer.close();
+ consumer.close();
+ }
+
+ @Test
+ public void testConsumerInterceptorWithMultiTopicSubscribe() throws PulsarClientException {
+
+ ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+ MessageImpl<String> msg = (MessageImpl<String>) message;
+ msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+ return msg;
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+ log.info("onAcknowledge messageId: {}", messageId, cause);
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+ log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
+ }
+ };
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .create();
+
+ Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic1")
+ .create();
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1")
+ .subscriptionType(SubscriptionType.Shared)
+ .intercept(interceptor)
+ .subscriptionName("my-subscription")
+ .subscribe();
+
+ producer.newMessage().value("Hello Pulsar!").send();
+ producer1.newMessage().value("Hello Pulsar!").send();
+
+ int keyCount = 0;
+ for (int i = 0; i < 2; i++) {
+ Message<String> received = consumer.receive();
+ MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
+ for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
+ if ("beforeConsumer".equals(keyValue.getKey())) {
+ keyCount++;
+ }
+ }
+ consumer.acknowledge(received);
+ }
+ Assert.assertEquals(2, keyCount);
+ producer.close();
+ producer1.close();
+ consumer.close();
+ }
+
+ @Test
+ public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException {
+
+ ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+ MessageImpl<String> msg = (MessageImpl<String>) message;
+ msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+ return msg;
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+ log.info("onAcknowledge messageId: {}", messageId, cause);
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+ log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
+ }
+ };
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .create();
+
+ Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic1")
+ .create();
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topicsPattern("persistent://my-property/my-ns/my-.*")
+ .subscriptionType(SubscriptionType.Shared)
+ .intercept(interceptor)
+ .subscriptionName("my-subscription")
+ .subscribe();
+
+ producer.newMessage().value("Hello Pulsar!").send();
+ producer1.newMessage().value("Hello Pulsar!").send();
+
+ int keyCount = 0;
+ for (int i = 0; i < 2; i++) {
+ Message<String> received = consumer.receive();
+ MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
+ for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
+ if ("beforeConsumer".equals(keyValue.getKey())) {
+ keyCount++;
+ }
+ }
+ consumer.acknowledge(received);
+ }
+ Assert.assertEquals(2, keyCount);
+ producer.close();
+ producer1.close();
+ consumer.close();
+ }
+
+ @Test
+ public void testConsumerInterceptorForAcknowledgeCumulative() throws PulsarClientException {
+
+ List<MessageId> ackHolder = new ArrayList<>();
+
+ ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+ MessageImpl<String> msg = (MessageImpl<String>) message;
+ msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+ return msg;
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+ log.info("onAcknowledge messageId: {}", messageId, cause);
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+ long acknowledged = ackHolder.stream().filter(m -> (m.compareTo(messageId) <= 0)).count();
+ Assert.assertEquals(acknowledged, 100);
+ ackHolder.clear();
+ log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
+ }
+ };
+
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .subscriptionType(SubscriptionType.Failover)
+ .intercept(interceptor)
+ .subscriptionName("my-subscription")
+ .subscribe();
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("persistent://my-property/my-ns/my-topic")
+ .create();
+
+ for (int i = 0; i < 100; i++) {
+ producer.newMessage().value("Hello Pulsar!").send();
+ }
+
+ int keyCount = 0;
+ for (int i = 0; i < 100; i++) {
+ Message<String> received = consumer.receive();
+ MessageImpl<String> msg = (MessageImpl<String>) received;
+ for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
+ if ("beforeConsumer".equals(keyValue.getKey())) {
+ keyCount++;
+ }
+ }
+ ackHolder.add(received.getMessageId());
+ if (i == 99) {
+ consumer.acknowledgeCumulative(received);
+ }
+ }
+ Assert.assertEquals(100, keyCount);
+ producer.close();
+ consumer.close();
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 69d885f..f5a9266 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -288,4 +288,10 @@ public interface Consumer<T> extends Closeable {
* @return Whether the consumer is connected to the broker
*/
boolean isConnected();
+
+ /**
+ * Get the name of consumer.
+ * @return consumer name.
+ */
+ String getConsumerName();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 8657859..da1cbb5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -330,4 +330,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Set subscriptionInitialPosition for the consumer
*/
ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);
+
+ /**
+ * Intercept {@link Consumer}.
+ *
+ * @param interceptors the list of interceptors to intercept the consumer created by this builder.
+ * @return consumer builder.
+ */
+ ConsumerBuilder<T> intercept(ConsumerInterceptor<T> ...interceptors);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
new file mode 100644
index 0000000..1134d8a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.List;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate)
+ * messages received by the consumer.
+ * <p>
+ * A primary use case is to hook into consumer applications for custom
+ * monitoring, logging, etc.
+ * <p>
+ * Exceptions thrown by interceptor methods will be caught, logged, but
+ * not propagated further.
+ */
+public interface ConsumerInterceptor<T> extends AutoCloseable {
+
+ /**
+ * Close the interceptor.
+ */
+ void close();
+
+ /**
+ * This is called just before the message is returned by
+ * {@link Consumer#receive()}, {@link MessageListener#received(Consumer,
+ * Message)} or the {@link java.util.concurrent.CompletableFuture} returned by
+ * {@link Consumer#receiveAsync()} completes.
+ * <p>
+ * This method is allowed to modify message, in which case the new message
+ * will be returned.
+ * <p>
+ * Any exception thrown by this method will be caught by the caller, logged,
+ * but not propagated to client.
+ * <p>
+ * Since the consumer may run multiple interceptors, a particular
+ * interceptor's
+ * <tt>beforeConsume</tt> callback will be called in the order specified by
+ * {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The first
+ * interceptor in the list gets the consumed message, the following
+ * interceptor will be passed
+ * the message returned by the previous interceptor, and so on. Since
+ * interceptors are allowed to modify message, interceptors may potentially
+ * get the messages already modified by other interceptors. However building a
+ * pipeline of mutable
+ * interceptors that depend on the output of the previous interceptor is
+ * discouraged, because of potential side-effects caused by interceptors
+ * potentially failing to modify the message and throwing an exception.
+ * if one of interceptors in the list throws an exception from
+ * <tt>beforeConsume</tt>, the exception is caught, logged,
+ * and the next interceptor is called with the message returned by the last
+ * successful interceptor in the list, or otherwise the original consumed
+ * message.
+ *
+ * @param consumer the consumer which contains the interceptor
+ * @param message the message to be consumed by the client.
+ * @return message that is either modified by the interceptor or same message
+ * passed into the method.
+ */
+ Message<T> beforeConsume(Consumer<T> consumer, Message<T> message);
+
+ /**
+ * This is called consumer sends the acknowledgment to the broker.
+ *
+ * <p>Any exception thrown by this method will be ignored by the caller.
+ *
+ * @param consumer the consumer which contains the interceptor
+ * @param messageId message to ack, null if acknowledge fail.
+ * @param exception the exception on acknowledge.
+ */
+ void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception);
+
+ /**
+ * This is called consumer send the cumulative acknowledgment to the broker.
+ *
+ * <p>Any exception thrown by this method will be ignored by the caller.
+ *
+ * @param consumer the consumer which contains the interceptor
+ * @param messageId message to ack, null if acknowledge fail.
+ * @param exception the exception on acknowledge.
+ */
+ void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 8256b4a..b3aa720 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -318,4 +318,12 @@ public interface ProducerBuilder<T> extends Cloneable {
* @return
*/
ProducerBuilder<T> properties(Map<String, String> properties);
+
+ /**
+ * Intercept {@link Producer}.
+ *
+ * @param interceptors the list of interceptors to intercept the producer created by this builder.
+ * @return producer builder.
+ */
+ ProducerBuilder<T> intercept(ProducerInterceptor<T> ... interceptors);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java
new file mode 100644
index 0000000..b6a0d77
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) the
+ * messages received by the producer before they are published to the Pulsar
+ * brokers.
+ * <p>
+ * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but
+ * not propagated further.
+ * <p>
+ * ProducerInterceptor callbacks may be called from multiple threads. Interceptor
+ * implementation must ensure thread-safety, if needed.
+ */
+public interface ProducerInterceptor<T> extends AutoCloseable {
+
+ /**
+ * Close the interceptor.
+ */
+ void close();
+
+ /**
+ * This is called from {@link Producer#send(Object)} and {@link
+ * Producer#sendAsync(Object)} methods, before
+ * send the message to the brokers. This method is allowed to modify the
+ * record, in which case, the new record
+ * will be returned.
+ * <p>
+ * Any exception thrown by this method will be caught by the caller and
+ * logged, but not propagated further.
+ * <p>
+ * Since the producer may run multiple interceptors, a particular
+ * interceptor's {@link #beforeSend(Producer, Message)} callback will be called in the
+ * order specified by
+ * {@link ProducerBuilder#intercept(ProducerInterceptor[])}.
+ * <p>
+ * The first interceptor in the list gets the message passed from the client,
+ * the following interceptor will be passed the message returned by the
+ * previous interceptor, and so on. Since interceptors are allowed to modify
+ * messages, interceptors may potentially get the message already modified by
+ * other interceptors. However, building a pipeline of mutable interceptors
+ * that depend on the output of the previous interceptor is discouraged,
+ * because of potential side-effects caused by interceptors potentially
+ * failing to modify the message and throwing an exception. If one of the
+ * interceptors in the list throws an exception from
+ * {@link#beforeSend(Message)}, the exception is caught, logged, and the next
+ * interceptor is called with the message returned by the last successful
+ * interceptor in the list, or otherwise the client.
+ *
+ * @param producer the producer which contains the interceptor.
+ * @param message message to send
+ * @return the intercepted message
+ */
+ Message<T> beforeSend(Producer<T> producer, Message<T> message);
+
+ /**
+ * This method is called when the message sent to the broker has been
+ * acknowledged, or when sending the message fails.
+ * This method is generally called just before the user callback is
+ * called, and in additional cases when an exception on the producer side.
+ * <p>
+ * Any exception thrown by this method will be ignored by the caller.
+ * <p>
+ * This method will generally execute in the background I/O thread, so the
+ * implementation should be reasonably fast. Otherwise, sending of messages
+ * from other threads could be delayed.
+ *
+ * @param producer the producer which contains the interceptor.
+ * @param message the message that application sends
+ * @param msgId the message id that assigned by the broker; null if send failed.
+ * @param exception the exception on sending messages, null indicates send has succeed.
+ */
+ void onSendAcknowledgement(Producer<T> producer, Message<T> message, MessageId msgId, Throwable exception);
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index a6fda6f..d8a0de3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -60,10 +60,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
protected int maxReceiverQueueSize;
protected Schema<T> schema;
+ protected final ConsumerInterceptors<T> interceptors;
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorService listenerExecutor,
- CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
+ CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
super(client, topic);
this.maxReceiverQueueSize = receiverQueueSize;
this.subscription = conf.getSubscriptionName();
@@ -81,6 +82,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
this.listenerExecutor = listenerExecutor;
this.pendingReceives = Queues.newConcurrentLinkedQueue();
this.schema = schema;
+ this.interceptors = interceptors;
}
@Override
@@ -335,6 +337,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
return subscription;
}
+ @Override
public String getConsumerName() {
return this.consumerName;
}
@@ -360,4 +363,24 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
this.maxReceiverQueueSize = newSize;
}
+ protected Message<T> beforeConsume(Message<T> message) {
+ if (interceptors != null) {
+ return interceptors.beforeConsume(this, message);
+ } else {
+ return message;
+ }
+ }
+
+ protected void onAcknowledge(MessageId messageId, Throwable exception) {
+ if (interceptors != null) {
+ interceptors.onAcknowledge(this, messageId, exception);
+ }
+ }
+
+ protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception) {
+ if (interceptors != null) {
+ interceptors.onAcknowledgeCumulative(this, messageId, exception);
+ }
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f0067f7..2095bab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -32,6 +34,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -52,6 +55,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
private final PulsarClientImpl client;
private ConsumerConfigurationData<T> conf;
private final Schema<T> schema;
+ private List<ConsumerInterceptor<T>> interceptorList;
private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
@@ -104,8 +108,9 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return FutureUtil.failedFuture(
new InvalidConfigurationException("Subscription name must be set on the consumer builder"));
}
-
- return client.subscribeAsync(conf, schema);
+ return interceptorList == null || interceptorList.size() == 0 ?
+ client.subscribeAsync(conf, schema, null) :
+ client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
}
@Override
@@ -242,7 +247,16 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return this;
}
- public ConsumerConfigurationData<T> getConf() {
+ @Override
+ public ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... interceptors) {
+ if (interceptorList == null) {
+ interceptorList = new ArrayList<>();
+ }
+ interceptorList.addAll(Arrays.asList(interceptors));
+ return this;
+ }
+
+ public ConsumerConfigurationData<T> getConf() {
return conf;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fe37b69..a0a2319 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -143,14 +143,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
- ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
- this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema);
+ ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
+ this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema, interceptors);
}
ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
- SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema) {
- super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema);
+ SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors interceptors) {
+ super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
@@ -263,8 +263,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
Message<T> message;
try {
message = incomingMessages.take();
- messageProcessed(message);
- return message;
+ Message<T> interceptMsg = beforeConsume(message);
+ messageProcessed(interceptMsg);
+ return interceptMsg;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
stats.incrementNumReceiveFailed();
@@ -293,8 +294,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (message == null && conf.getReceiverQueueSize() == 0) {
sendFlowPermitsToBroker(cnx(), 1);
} else if (message != null) {
- messageProcessed(message);
- result.complete(message);
+ Message<T> interceptMsg = beforeConsume(message);
+ messageProcessed(interceptMsg);
+ result.complete(interceptMsg);
}
return result;
@@ -352,10 +354,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
Message<T> message;
try {
message = incomingMessages.poll(timeout, unit);
- if (message != null) {
- messageProcessed(message);
+ Message<T> interceptMsg = beforeConsume(message);
+ if (interceptMsg != null) {
+ messageProcessed(interceptMsg);
}
- return message;
+ return interceptMsg;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -410,7 +413,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
checkArgument(messageId instanceof MessageIdImpl);
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
- return FutureUtil.failedFuture(new PulsarClientException("Consumer not ready. State: " + getState()));
+ PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
+ if (AckType.Individual.equals(ackType)) {
+ onAcknowledge(messageId, exception);
+ } else if (AckType.Cumulative.equals(ackType)) {
+ onAcknowledgeCumulative(messageId, exception);
+ }
+ return FutureUtil.failedFuture(exception);
}
if (messageId instanceof BatchMessageIdImpl) {
@@ -444,7 +453,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
unAckedMessageTracker.remove(msgId);
stats.incrementNumAcksSent(1);
}
+ onAcknowledge(messageId, null);
} else if (ackType == AckType.Cumulative) {
+ onAcknowledgeCumulative(messageId, null);
stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));
}
@@ -837,9 +848,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
receivedFuture.complete(message);
} else {
// increase permits for available message-queue
- messageProcessed(message);
+ Message<T> interceptMsg = beforeConsume(message);
+ messageProcessed(interceptMsg);
// return message to receivedCallback
- listenerExecutor.execute(() -> receivedFuture.complete(message));
+ listenerExecutor.execute(() -> receivedFuture.complete(interceptMsg));
}
}
} else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
new file mode 100644
index 0000000..a0d30fc
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A container that hold the list {@link org.apache.pulsar.client.api.ConsumerInterceptor} and wraps calls to the chain
+ * of custom interceptors.
+ */
+public class ConsumerInterceptors<T> implements Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
+
+ private final List<ConsumerInterceptor<T>> interceptors;
+
+ public ConsumerInterceptors(List<ConsumerInterceptor<T>> interceptors) {
+ this.interceptors = interceptors;
+ }
+
+ /**
+ * This is called just before the message is returned by {@link Consumer#receive()},
+ * {@link MessageListener#received(Consumer, Message)} or the {@link java.util.concurrent.CompletableFuture}
+ * returned by {@link Consumer#receiveAsync()} completes.
+ * <p>
+ * This method calls {@link ConsumerInterceptor#beforeConsume(Consumer, Message)} for each interceptor. Messages returned
+ * from each interceptor get passed to beforeConsume() of the next interceptor in the chain of interceptors.
+ * <p>
+ * This method does not throw exceptions. If any of the interceptors in the chain throws an exception, it gets
+ * caught and logged, and next interceptor in int the chain is called with 'messages' returned by the previous
+ * successful interceptor beforeConsume call.
+ *
+ * @param consumer the consumer which contains the interceptors
+ * @param message message to be consume by the client.
+ * @return messages that are either modified by interceptors or same as messages passed to this method.
+ */
+ public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
+ Message<T> interceptorMessage = message;
+ for (int i = 0; i < interceptors.size(); i++) {
+ try {
+ interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage);
+ } catch (Exception e) {
+ if (consumer != null) {
+ log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}", consumer.getTopic(), consumer.getConsumerName(), e);
+ } else {
+ log.warn("Error executing interceptor beforeConsume callback", e);
+ }
+ }
+ }
+ return interceptorMessage;
+ }
+
+ /**
+ * This is called when acknowledge request return from the broker.
+ * <p>
+ * This method calls {@link ConsumerInterceptor#onAcknowledge(Consumer, MessageId, Throwable)} method for each interceptor.
+ * <p>
+ * This method does not throw exceptions. Exceptions thrown by any of interceptors in the chain are logged, but not propagated.
+ *
+ * @param consumer the consumer which contains the interceptors
+ * @param messageId message to acknowledge.
+ * @param exception exception returned by broker.
+ */
+ public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
+ for (int i = 0; i < interceptors.size(); i++) {
+ try {
+ interceptors.get(i).onAcknowledge(consumer, messageId, exception);
+ } catch (Exception e) {
+ log.warn("Error executing interceptor onAcknowledge callback ", e);
+ }
+ }
+ }
+
+ /**
+ * This is called when acknowledge cumulative request return from the broker.
+ * <p>
+ * This method calls {@link ConsumerInterceptor#onAcknowledgeCumulative(Consumer, MessageId, Throwable)} (Message, Throwable)} method for each interceptor.
+ * <p>
+ * This method does not throw exceptions. Exceptions thrown by any of interceptors in the chain are logged, but not propagated.
+ *
+ * @param consumer the consumer which contains the interceptors
+ * @param messageId messages to acknowledge.
+ * @param exception exception returned by broker.
+ */
+ public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) {
+ for (int i = 0; i < interceptors.size(); i++) {
+ try {
+ interceptors.get(i).onAcknowledgeCumulative(consumer, messageId, exception);
+ } catch (Exception e) {
+ log.warn("Error executing interceptor onAcknowledgeCumulative callback ", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (int i = 0; i < interceptors.size(); i++) {
+ try {
+ interceptors.get(i).close();
+ } catch (Exception e) {
+ log.error("Fail to close consumer interceptor ", e);
+ }
+ }
+ }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 96b68c1..97e2247 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -241,7 +241,7 @@ public class MessageImpl<T> implements Message<T> {
return null;
}
- ByteBuf getDataBuffer() {
+ public ByteBuf getDataBuffer() {
return payload;
}
@@ -274,7 +274,7 @@ public class MessageImpl<T> implements Message<T> {
return properties.get(name);
}
- MessageMetadata.Builder getMessageBuilder() {
+ public MessageMetadata.Builder getMessageBuilder() {
return msgMetadataBuilder;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f1cb9cf..75fdac6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -85,9 +85,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private final ConsumerConfigurationData<T> internalConfig;
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
- CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
+ CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf,
- Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema);
+ Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema, interceptors);
checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");
@@ -632,7 +632,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ExecutorService listenerExecutor,
CompletableFuture<Consumer<T>> subscribeFuture,
int numPartitions,
- Schema<T> schema) {
+ Schema<T> schema, ConsumerInterceptors<T> interceptors) {
checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 topic for partitioned consumer");
// get topic name, then remove it from conf, so constructor will create a consumer with no topic.
@@ -641,7 +641,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
cloneConf.getTopicNames().remove(topicName);
CompletableFuture<Consumer> future = new CompletableFuture<>();
- MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema);
+ MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema, interceptors);
future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
.thenRun(()-> subscribeFuture.complete(consumer))
@@ -695,7 +695,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, partitionName, configurationData,
- client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema);
+ client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema, interceptors);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
@@ -706,7 +706,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, topicName, internalConfig,
- client.externalExecutorProvider().getExecutor(), 0, subFuture, schema);
+ client.externalExecutorProvider().getExecutor(), 0, subFuture, schema, interceptors);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
futureList = Collections.singletonList(subFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 3e159e8..12ecf2b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -53,8 +53,8 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
private final TopicMetadata topicMetadata;
public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
- CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema) {
- super(client, topic, conf, producerCreatedFuture, schema);
+ CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors<T> interceptors) {
+ super(client, topic, conf, producerCreatedFuture, schema, interceptors);
this.producers = Lists.newArrayListWithCapacity(numPartitions);
this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = getMessageRouter();
@@ -111,7 +111,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
for (int partitionIndex = 0; partitionIndex < topicMetadata.numPartitions(); partitionIndex++) {
String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString();
ProducerImpl<T> producer = new ProducerImpl<>(client, partitionName, conf, new CompletableFuture<>(),
- partitionIndex, schema);
+ partitionIndex, schema, interceptors);
producers.add(producer);
producer.producerCreatedFuture().handle((prod, createException) -> {
if (createException != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index d0b0c60..3f6dfec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -51,8 +51,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor,
CompletableFuture<Consumer<T>> subscribeFuture,
- Schema<T> schema) {
- super(client, conf, listenerExecutor, subscribeFuture, schema);
+ Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+ super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors);
this.topicsPattern = topicsPattern;
if (this.namespaceName == null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index eb02b6b..39e632c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -36,13 +36,15 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
protected final CompletableFuture<Producer<T>> producerCreatedFuture;
protected final ProducerConfigurationData conf;
protected final Schema<T> schema;
+ protected final ProducerInterceptors<T> interceptors;
protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
- CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema) {
+ CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors<T> interceptors) {
super(client, topic);
this.producerCreatedFuture = producerCreatedFuture;
this.conf = conf;
this.schema = schema;
+ this.interceptors = interceptors;
}
@Override
@@ -148,6 +150,20 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
return producerCreatedFuture;
}
+ protected Message<T> beforeSend(Message<T> message) {
+ if (interceptors != null) {
+ return interceptors.beforeSend(this, message);
+ } else {
+ return message;
+ }
+ }
+
+ protected void onSendAcknowledgement(Message<T> message, MessageId msgId, Throwable exception) {
+ if (interceptors != null) {
+ interceptors.onSendAcknowledgement(this, message, msgId, exception);
+ }
+ }
+
@Override
public String toString() {
return "ProducerBase{" + "topic='" + topic + '\'' + '}';
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index ff391be..834e0c0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.client.impl;
import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -33,6 +36,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.ProducerInterceptor;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -46,6 +50,7 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
private final PulsarClientImpl client;
private ProducerConfigurationData conf;
private Schema<T> schema;
+ private List<ProducerInterceptor<T>> interceptorList;
@VisibleForTesting
public ProducerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
@@ -97,7 +102,9 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
.failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
}
- return client.createProducerAsync(conf, schema);
+ return interceptorList == null || interceptorList.size() == 0 ?
+ client.createProducerAsync(conf, schema, null) :
+ client.createProducerAsync(conf, schema, new ProducerInterceptors<>(interceptorList));
}
@Override
@@ -226,4 +233,13 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
conf.getProperties().putAll(properties);
return this;
}
+
+ @Override
+ public ProducerBuilder<T> intercept(ProducerInterceptor<T>... interceptors) {
+ if (interceptorList == null) {
+ interceptorList = new ArrayList<>();
+ }
+ interceptorList.addAll(Arrays.asList(interceptors));
+ return this;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c3428e2..9401e7c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -121,8 +121,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
.newUpdater(ProducerImpl.class, "msgIdGenerator");
public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
- CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema) {
- super(client, topic, conf, producerCreatedFuture, schema);
+ CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
+ ProducerInterceptors<T> interceptors) {
+ super(client, topic, conf, producerCreatedFuture, schema, interceptors);
this.producerId = client.newProducerId();
this.producerName = conf.getProducerName();
this.partitionIndex = partitionIndex;
@@ -205,9 +206,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
@Override
CompletableFuture<MessageId> internalSendAsync(Message<T> message) {
+
CompletableFuture<MessageId> future = new CompletableFuture<>();
- sendAsync(message, new SendCallback() {
+ MessageImpl<T> interceptorMessage = (MessageImpl<T>) beforeSend(message);
+ //Retain the buffer used by interceptors callback to get message. Buffer will release after complete interceptors.
+ interceptorMessage.getDataBuffer().retain();
+ if (interceptors != null) {
+ interceptorMessage.getProperties();
+ }
+ sendAsync(interceptorMessage, new SendCallback() {
SendCallback nextCallback = null;
MessageImpl<?> nextMsg = null;
long createdAt = System.nanoTime();
@@ -229,25 +237,40 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
@Override
public void sendComplete(Exception e) {
- if (e != null) {
- stats.incrementSendFailed();
- future.completeExceptionally(e);
- } else {
- future.complete(message.getMessageId());
- stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
- }
- while (nextCallback != null) {
- SendCallback sendCallback = nextCallback;
- MessageImpl<?> msg = nextMsg;
+ try {
if (e != null) {
stats.incrementSendFailed();
- sendCallback.getFuture().completeExceptionally(e);
+ onSendAcknowledgement(interceptorMessage, null, e);
+ future.completeExceptionally(e);
} else {
- sendCallback.getFuture().complete(msg.getMessageId());
+ onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
+ future.complete(interceptorMessage.getMessageId());
stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
}
- nextMsg = nextCallback.getNextMessage();
- nextCallback = nextCallback.getNextSendCallback();
+ } finally {
+ interceptorMessage.getDataBuffer().release();
+ }
+
+ while (nextCallback != null) {
+ SendCallback sendCallback = nextCallback;
+ MessageImpl<?> msg = nextMsg;
+ //Retain the buffer used by interceptors callback to get message. Buffer will release after complete interceptors.
+ try {
+ msg.getDataBuffer().retain();
+ if (e != null) {
+ stats.incrementSendFailed();
+ onSendAcknowledgement((Message<T>) msg, null, e);
+ sendCallback.getFuture().completeExceptionally(e);
+ } else {
+ onSendAcknowledgement((Message<T>) msg, msg.getMessageId(), null);
+ sendCallback.getFuture().complete(msg.getMessageId());
+ stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
+ }
+ nextMsg = nextCallback.getNextMessage();
+ nextCallback = nextCallback.getNextSendCallback();
+ } finally {
+ msg.getDataBuffer().release();
+ }
}
}
@@ -292,14 +315,18 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE)
? "Compressed"
: "";
- callback.sendComplete(new PulsarClientException.InvalidMessageException(
- format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize,
- PulsarDecoder.MaxMessageSize)));
+ PulsarClientException.InvalidMessageException invalidMessageException =
+ new PulsarClientException.InvalidMessageException(
+ format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize,
+ PulsarDecoder.MaxMessageSize));
+ callback.sendComplete(invalidMessageException);
return;
}
if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
- callback.sendComplete(new PulsarClientException.InvalidMessageException("Cannot re-use the same message"));
+ PulsarClientException.InvalidMessageException invalidMessageException =
+ new PulsarClientException.InvalidMessageException("Cannot re-use the same message");
+ callback.sendComplete(invalidMessageException);
compressedPayload.release();
return;
}
@@ -463,8 +490,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
semaphore.acquire();
} else {
if (!semaphore.tryAcquire()) {
- callback.sendComplete(
- new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
+ callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
return false;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
new file mode 100644
index 0000000..59d1ad3
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A container that holds the list{@link org.apache.pulsar.client.api.ProducerInterceptor}
+ * and wraps calls to the chain of custom interceptors.
+ */
+public class ProducerInterceptors<T> implements Closeable {
+
+ private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
+
+ private final List<ProducerInterceptor<T>> interceptors;
+
+ public ProducerInterceptors(List<ProducerInterceptor<T>> interceptors) {
+ this.interceptors = interceptors;
+ }
+
+ /**
+ * This is called when client sends message to pulsar broker, before key and value gets serialized.
+ * The method calls {@link ProducerInterceptor#beforeSend(Producer,Message)} method. Message returned from
+ * first interceptor's beforeSend() is passed to the second interceptor beforeSend(), and so on in the
+ * interceptor chain. The message returned from the last interceptor is returned from this method.
+ *
+ * This method does not throw exceptions. Exceptions thrown by any interceptor methods are caught and ignored.
+ * If a interceptor in the middle of the chain, that normally modifies the message, throws an exception,
+ * the next interceptor in the chain will be called with a message returned by the previous interceptor that did
+ * not throw an exception.
+ *
+ * @param producer the producer which contains the interceptor.
+ * @param message the message from client
+ * @return the message to send to topic/partition
+ */
+ public Message<T> beforeSend(Producer<T> producer, Message<T> message) {
+ Message<T> interceptorMessage = message;
+ for (int i = 0; i < interceptors.size(); i++) {
+ try {
+ interceptorMessage = interceptors.get(i).beforeSend(producer, interceptorMessage);
+ } catch (Exception e) {
+ if (message != null && producer != null) {
+ log.warn("Error executing interceptor beforeSend callback for messageId: {}, topicName:{} ", message.getMessageId(), producer.getTopic(), e);
+ } else {
+ log.warn("Error Error executing interceptor beforeSend callback ", e);
+ }
+ }
+ }
+ return interceptorMessage;
+ }
+
+ /**
+ * This method is called when the message send to the broker has been acknowledged, or when sending the record fails
+ * before it gets send to the broker.
+ * This method calls {@link ProducerInterceptor#onSendAcknowledgement(Producer, Message, MessageId, Throwable)} method for
+ * each interceptor.
+ *
+ * This method does not throw exceptions. Exceptions thrown by any of interceptor methods are caught and ignored.
+ *
+ * @param producer the producer which contains the interceptor.
+ * @param message The message returned from the last interceptor is returned from {@link ProducerInterceptor#beforeSend(Producer, Message)}
+ * @param msgId The message id that broker returned. Null if has error occurred.
+ * @param exception The exception thrown during processing of this message. Null if no error occurred.
+ */
+ public void onSendAcknowledgement(Producer<T> producer, Message<T> message, MessageId msgId, Throwable exception) {
+ for (int i = 0; i < interceptors.size(); i++) {
+ try {
+ interceptors.get(i).onSendAcknowledgement(producer, message, msgId, exception);
+ } catch (Exception e) {
+ log.warn("Error executing interceptor onSendAcknowledgement callback ", e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (int i = 0; i < interceptors.size(); i++) {
+ try {
+ interceptors.get(i).close();
+ } catch (Exception e) {
+ log.error("Fail to close producer interceptor ", e);
+ }
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 4013068..b87b9bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -238,10 +238,15 @@ public class PulsarClientImpl implements PulsarClient {
}
public CompletableFuture<Producer<byte[]>> createProducerAsync(ProducerConfigurationData conf) {
- return createProducerAsync(conf, Schema.BYTES);
+ return createProducerAsync(conf, Schema.BYTES, null);
}
- public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema) {
+ public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema) {
+ return createProducerAsync(conf, schema, null);
+ }
+
+ public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema,
+ ProducerInterceptors<T> interceptors) {
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
@@ -273,9 +278,9 @@ public class PulsarClientImpl implements PulsarClient {
ProducerBase<T> producer;
if (metadata.partitions > 1) {
producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions,
- producerCreatedFuture, schema);
+ producerCreatedFuture, schema, interceptors);
} else {
- producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema);
+ producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors);
}
synchronized (producers) {
@@ -336,10 +341,10 @@ public class PulsarClientImpl implements PulsarClient {
}
public CompletableFuture<Consumer<byte[]>> subscribeAsync(ConsumerConfigurationData<byte[]> conf) {
- return subscribeAsync(conf, Schema.BYTES);
+ return subscribeAsync(conf, Schema.BYTES, null);
}
- public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+ public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
@@ -377,15 +382,15 @@ public class PulsarClientImpl implements PulsarClient {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic names list must be null when use topicsPattern"));
}
- return patternTopicSubscribeAsync(conf, schema);
+ return patternTopicSubscribeAsync(conf, schema, interceptors);
} else if (conf.getTopicNames().size() == 1) {
- return singleTopicSubscribeAsync(conf, schema);
+ return singleTopicSubscribeAsync(conf, schema, interceptors);
} else {
- return multiTopicSubscribeAsync(conf, schema);
+ return multiTopicSubscribeAsync(conf, schema, interceptors);
}
}
- private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+ private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
if (schema instanceof AutoSchema) {
AutoSchema autoSchema = (AutoSchema) schema;
return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
@@ -395,20 +400,20 @@ public class PulsarClientImpl implements PulsarClient {
log.info("Auto detected schema for topic {} : {}",
conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
autoSchema.setSchema(genericSchema);
- return doSingleTopicSubscribeAsync(conf, schema);
+ return doSingleTopicSubscribeAsync(conf, schema, interceptors);
} else {
return FutureUtil.failedFuture(
new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
}
});
} else {
- return doSingleTopicSubscribeAsync(conf, schema);
+ return doSingleTopicSubscribeAsync(conf, schema, interceptors);
}
}
- private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+ private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
String topic = conf.getSingleTopic();
@@ -423,10 +428,10 @@ public class PulsarClientImpl implements PulsarClient {
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
if (metadata.partitions > 1) {
consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
- listenerThread, consumerSubscribedFuture, metadata.partitions, schema);
+ listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, conf, listenerThread, -1,
- consumerSubscribedFuture, schema);
+ consumerSubscribedFuture, schema, interceptors);
}
synchronized (consumers) {
@@ -441,11 +446,11 @@ public class PulsarClientImpl implements PulsarClient {
return consumerSubscribedFuture;
}
- private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+ private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
- externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema);
+ externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors);
synchronized (consumers) {
consumers.put(consumer, Boolean.TRUE);
@@ -455,10 +460,10 @@ public class PulsarClientImpl implements PulsarClient {
}
public CompletableFuture<Consumer<byte[]>> patternTopicSubscribeAsync(ConsumerConfigurationData<byte[]> conf) {
- return patternTopicSubscribeAsync(conf, Schema.BYTES);
+ return patternTopicSubscribeAsync(conf, Schema.BYTES, null);
}
- private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+ private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors interceptors) {
String regex = conf.getTopicsPattern().pattern();
TopicName destination = TopicName.get(regex);
NamespaceName namespaceName = destination.getNamespaceObject();
@@ -479,7 +484,7 @@ public class PulsarClientImpl implements PulsarClient {
conf,
externalExecutorProvider.getExecutor(),
consumerSubscribedFuture,
- schema);
+ schema, interceptors);
synchronized (consumers) {
consumers.put(consumer, Boolean.TRUE);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index aafd125..388e5c4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -85,7 +85,7 @@ public class ReaderImpl<T> implements Reader<T> {
final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
- partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema);
+ partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null);
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index eae02b0..230a022 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -127,4 +127,8 @@ public class TopicMessageImpl<T> implements Message<T> {
public Optional<EncryptionContext> getEncryptionCtx() {
return msg.getEncryptionCtx();
}
+
+ public Message<T> getMessage() {
+ return msg;
+ }
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 4380c63..f5108fc 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -64,7 +64,7 @@ public class ContextImplTest {
logger = mock(Logger.class);
client = mock(PulsarClientImpl.class);
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
- when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class)))
+ when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null)))
.thenReturn(CompletableFuture.completedFuture(producer));
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 89efcf5..399a4c7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -21,7 +21,8 @@ package org.apache.pulsar.functions.instance.producers;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
@@ -40,6 +41,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.ProducerInterceptor;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
@@ -190,6 +192,11 @@ public class MultiConsumersOneOutputTopicProducersTest {
public ProducerBuilder<byte[]> properties(Map<String, String> properties) {
return this;
}
+
+ @Override
+ public ProducerBuilder<byte[]> intercept(ProducerInterceptor<byte[]>... interceptors) {
+ return null;
+ }
}
@BeforeMethod
@@ -197,7 +204,7 @@ public class MultiConsumersOneOutputTopicProducersTest {
this.mockClient = mock(PulsarClient.class);
when(mockClient.newProducer(any(Schema.class)))
- .thenReturn(new MockProducerBuilder());
+ .thenReturn(new MockProducerBuilder());
producers = new MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES, "test");
producers.initialize();
@@ -206,12 +213,12 @@ public class MultiConsumersOneOutputTopicProducersTest {
private Producer<byte[]> createMockProducer(String topic) {
Producer<byte[]> producer = mock(Producer.class);
when(producer.closeAsync())
- .thenAnswer(invocationOnMock -> {
- synchronized (mockProducers) {
- mockProducers.remove(topic);
- }
- return FutureUtils.Void();
- });
+ .thenAnswer(invocationOnMock -> {
+ synchronized (mockProducers) {
+ mockProducers.remove(topic);
+ }
+ return FutureUtils.Void();
+ });
return producer;
}
@@ -224,13 +231,13 @@ public class MultiConsumersOneOutputTopicProducersTest {
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
- .newProducer(Schema.BYTES);
+ .newProducer(Schema.BYTES);
assertTrue(producers.getProducers().containsKey(producerName));
// second get will not create a new producer
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
- .newProducer(Schema.BYTES);
+ .newProducer(Schema.BYTES);
assertTrue(producers.getProducers().containsKey(producerName));
// close