You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/20 03:27:35 UTC
[pulsar] branch master updated: Add a wrapper around Kafka's
ProducerInterceptor to support Kafka's
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 (#3843)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e77a165 Add a wrapper around Kafka's ProducerInterceptor to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 (#3843)
e77a165 is described below
commit e77a165dd2c0f29268b079c5f9a213f3994ea3a8
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Tue Mar 19 20:27:30 2019 -0700
Add a wrapper around Kafka's ProducerInterceptor to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 (#3843)
**Motivation**
Add a wrapper around Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor` to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090
The wrapper will try to delegate all calls to the underlying instance of Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor` it holds.
When `PulsarKafkaProducer` converts a Kafka `ProducerRecord` to a Pulsar `Message`, the schema (fixed to type Schema<byte[]>), key, value, eventTimestamp and partitionID are set.
When doing the delegation, we'll do
Pulsar `Message` -> Kafka's `ProducerRecord` -> invoke underlying Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor#onSend` -> Pulsar `Message`
It'll try to preserve all the information. Verified through unit test.
For `org.apache.kafka.clients.producer.ProducerInterceptor#onSendAcknowledgement` it'll call `org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement`; only partitionID, eventTimestamp, key byte length and value byte length will be passed in.
---
.../clients/producer/PulsarKafkaProducer.java | 32 ++-
.../compat/KafkaProducerInterceptorWrapper.java | 254 +++++++++++++++++++++
.../clients/producer/PulsarKafkaProducerTest.java | 86 ++++++-
.../KafkaProducerInterceptorWrapperTest.java | 102 +++++++++
site2/docs/adaptors-kafka.md | 2 +-
5 files changed, 472 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 4906cf5..8ae3bf3 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
+import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
@@ -69,6 +70,8 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
private final Partitioner partitioner;
private volatile Cluster cluster = Cluster.empty();
+ private List<ProducerInterceptor<K, V>> interceptors;
+
public PulsarKafkaProducer(Map<String, Object> configs) {
this(configs, null, null);
}
@@ -157,6 +160,9 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
// Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull;
pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
+
+ interceptors = (List) producerConfig.getConfiguredInstances(
+ ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
}
@Override
@@ -238,7 +244,13 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
try {
// Add the partitions info for the new topic
cluster = cluster.withPartitions(readPartitionsInfo(topic));
- return pulsarProducerBuilder.clone().topic(topic).create();
+ List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
+ .map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySerializer, valueSerializer, topic))
+ .collect(Collectors.toList());
+ return pulsarProducerBuilder.clone()
+ .topic(topic)
+ .intercept(wrappedInterceptors.toArray(new org.apache.pulsar.client.api.ProducerInterceptor[wrappedInterceptors.size()]))
+ .create();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
@@ -308,5 +320,23 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
return new RecordMetadata(tp, offset, 0, mb.getPublishTime(), 0, mb.hasKey() ? mb.getKey().length() : 0, size);
}
+ private ProducerInterceptor createKafkaProducerInterceptor(String clazz) {
+ try {
+ return (ProducerInterceptor) Class.forName(clazz).newInstance();
+ } catch (ClassNotFoundException e) {
+ String errorMessage = "Can't find Interceptor class: " + e.getMessage();
+ logger.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ } catch (InstantiationException e) {
+ String errorMessage = "Can't initiate provided Interceptor class: " + e.getMessage();
+ logger.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ } catch (IllegalAccessException e) {
+ String errorMessage = "Can't access provided Interceptor class: " + e.getMessage();
+ logger.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+ }
+
private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
new file mode 100644
index 0000000..99195ce
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferSerializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Base64;
+
+/**
+ * A wrapper for Kafka's {@link org.apache.kafka.clients.producer.ProducerInterceptor} to make pulsar support
+ * Kafka ProducerInterceptor. It holds an instance of {@link org.apache.kafka.clients.producer.ProducerInterceptor}
+ * and it'll delegate all invocation to that instance.
+ * <p>
+ * Extend {@link ProducerInterceptor<byte[]>} as all Pulsar {@link Message} created by
+ * {@link org.apache.kafka.clients.producer.PulsarKafkaProducer} is of type byte[].
+ *
+ */
+public class KafkaProducerInterceptorWrapper<K, V> implements ProducerInterceptor<byte[]> {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaProducerInterceptorWrapper.class);
+
+ final private org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor;
+
+ // For serializer key/value, and to determine the deserializer for key/value.
+ private final Serializer<K> keySerializer;
+
+ private final Serializer<V> valueSerializer;
+
+ // Keep the topic, as each Pulsar producer will tie to a Kafka topic, and ProducerInterceptor will tie to a Pulsar
+ // producer, it's safe to set it as final.
+ private final String topic;
+
+ private Schema<byte[]> scheme;
+
+ private long eventTime;
+
+ private String partitionID;
+
+ /**
+ * Create a wrapper of type {@link ProducerInterceptor} that will delegate all work to underlying Kafka's interceptor.
+ *
+ * @param kafkaProducerInterceptor Underlying instance of {@link org.apache.kafka.clients.producer.ProducerInterceptor<K, V>}
+ * that this wrapper will delegate work to.
+ * @param keySerializer {@link Serializer} used to serialize Kafka {@link ProducerRecord#key}.
+ * @param valueSerializer {@link Serializer} used to serialize Kafka {@link ProducerRecord#value}.
+ * @param topic Topic this {@link ProducerInterceptor} will be associated to.
+ */
+ public KafkaProducerInterceptorWrapper(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
+ String topic) {
+ this.kafkaProducerInterceptor = kafkaProducerInterceptor;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ this.topic = topic;
+ }
+
+ /**
+ * Called when interceptor is closed.
+ * The wrapper itself doesn't own any resource, just call underlying {@link org.apache.kafka.clients.producer.ProducerInterceptor#close()}
+ */
+ @Override
+ public void close() {
+ kafkaProducerInterceptor.close();
+ }
+
+ /**
+ * It tries to convert a Pulsar {@link Message} to a Kafka{@link ProducerRecord}, pass it to underlying
+ * {@link org.apache.kafka.clients.producer.ProducerInterceptor#onSend(ProducerRecord)} then convert the output
+ * back to Pulsar {@link Message}.
+ * <p>
+ * When build a Pulsar {@link Message} at {@link org.apache.kafka.clients.producer.PulsarKafkaProducer#buildMessage}
+ * schema, eventtime, partitionID, key and value are set. All this information will be preserved during the conversion.
+ *
+ * @param producer the producer which contains the interceptor, will be ignored as Kafka
+ * {@link org.apache.kafka.clients.producer.ProducerInterceptor} doesn't use it.
+ * @param message message to send
+ * @return Processed message.
+ */
+ @Override
+ public Message<byte[]> beforeSend(Producer<byte[]> producer, Message<byte[]> message) {
+ return toPulsarMessage(kafkaProducerInterceptor.onSend(toKafkaRecord(message)));
+ }
+
+ /**
+ * Delegate work to {@link org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement}
+ * @param producer the producer which contains the interceptor.
+ * @param message the message that application sends
+ * @param msgId the message id that assigned by the broker; null if send failed.
+ * @param exception the exception on sending messages, null indicates send has succeed.
+ */
+ @Override
+ public void onSendAcknowledgement(Producer<byte[]> producer, Message<byte[]> message, MessageId msgId, Throwable exception) {
+ try {
+ PulsarApi.MessageMetadata.Builder messageMetadataBuilder = ((MessageImpl<byte[]>)message).getMessageBuilder();
+ partitionID = getPartitionID(messageMetadataBuilder);
+ TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partitionID));
+ kafkaProducerInterceptor.onAcknowledgement(new RecordMetadata(topicPartition,
+ -1,
+ -1,
+ messageMetadataBuilder.getEventTime(),
+ -1,
+ message.getKeyBytes().length,
+ message.getValue().length), new Exception(exception));
+ } catch (NumberFormatException e) {
+ String errorMessage = "Unable to convert partitionID to integer: " + e.getMessage();
+ log.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+ }
+
+ /**
+ * Convert a Kafka {@link ProducerRecord} to a Pulsar {@link Message}.
+ *
+ * @param producerRecord Kafka record to be convert.
+ * @return Pulsar message.
+ */
+ private Message<byte[]> toPulsarMessage(ProducerRecord<K, V> producerRecord) {
+ TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, scheme);
+ typedMessageBuilder.key(serializeKey(topic, producerRecord.key()));
+ typedMessageBuilder.value(valueSerializer.serialize(topic, producerRecord.value()));
+ typedMessageBuilder.eventTime(eventTime);
+ typedMessageBuilder.property(KafkaMessageRouter.PARTITION_ID, partitionID);
+ return typedMessageBuilder.getMessage();
+ }
+
+ /**
+ * Convert a Pulsar {@link Message} to a Kafka {@link ProducerRecord}.
+ * First it'll store those field that Kafka record doesn't need such as schema.
+ * Then it try to deserialize the value as it's been serialized to byte[] when creating the message.
+ *
+ * @param message Pulsar message to be convert.
+ * @return Kafka record.
+ */
+ private ProducerRecord<K, V> toKafkaRecord(Message<byte[]> message) {
+ Deserializer valueDeserializer = getDeserializer(valueSerializer);
+ V value = (V) valueDeserializer.deserialize(topic, message.getValue());
+ try {
+ scheme = (Schema<byte[]>) FieldUtils.readField(message, "schema", true);
+ PulsarApi.MessageMetadata.Builder messageMetadataBuilder = ((MessageImpl<byte[]>)message).getMessageBuilder();
+ partitionID = getPartitionID(messageMetadataBuilder);
+ eventTime = message.getEventTime();
+ return new ProducerRecord<>(topic, Integer.parseInt(partitionID), eventTime, deserializeKey(topic, message.getKey()), value);
+ } catch (NumberFormatException e) {
+ // If not able to parse partitionID, ignore it.
+ return new ProducerRecord<>(topic, deserializeKey(topic, message.getKey()), value);
+ } catch (IllegalAccessException e) {
+ String errorMessage = "Unable to get the schema of message due to " + e.getMessage();
+ log.error(errorMessage);
+ throw new RuntimeException(errorMessage);
+ }
+ }
+
+ private String serializeKey(String topic, K key) {
+ // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
+ if (keySerializer instanceof StringSerializer) {
+ return (String) key;
+ } else {
+ byte[] keyBytes = keySerializer.serialize(topic, key);
+ return Base64.getEncoder().encodeToString(keyBytes);
+ }
+ }
+
+ private K deserializeKey(String topic, String key) {
+ // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
+ if (keySerializer instanceof StringSerializer) {
+ return (K) key;
+ } else {
+ Deserializer keyDeserializer = getDeserializer(keySerializer);
+ return (K) keyDeserializer.deserialize(topic, Base64.getDecoder().decode(key));
+ }
+ }
+
+ /**
+ * Try to get the partitionID from messageMetadataBuilder.
+ * As it is set in {@link org.apache.kafka.clients.producer.PulsarKafkaProducer#buildMessage}, can guarantee
+ * a partitionID will be return.
+ *
+ * @param messageMetadataBuilder
+ * @return PartitionID
+ */
+ private String getPartitionID(PulsarApi.MessageMetadata.Builder messageMetadataBuilder) {
+ return messageMetadataBuilder.getPropertiesList()
+ .stream()
+ .filter(keyValue -> keyValue.getKey().equals(KafkaMessageRouter.PARTITION_ID))
+ .findFirst()
+ .get()
+ .getValue();
+ }
+
+ private Deserializer getDeserializer(Serializer serializer) {
+ if (serializer instanceof StringSerializer) {
+ return new StringDeserializer();
+ } else if (serializer instanceof LongDeserializer) {
+ return new LongDeserializer();
+ } else if (serializer instanceof IntegerSerializer) {
+ return new IntegerDeserializer();
+ } else if (serializer instanceof DoubleSerializer) {
+ return new DoubleDeserializer();
+ } else if (serializer instanceof BytesSerializer) {
+ return new BytesDeserializer();
+ } else if (serializer instanceof ByteBufferSerializer) {
+ return new ByteBufferDeserializer();
+ } else if (serializer instanceof ByteArraySerializer) {
+ return new ByteArrayDeserializer();
+ } else {
+ throw new IllegalArgumentException(serializer.getClass().getName() + " is not a valid or supported subclass of org.apache.kafka.common.serialization.Serializer.");
+ }
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 37ff221..2c4af1e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -19,12 +19,18 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
@@ -35,13 +41,18 @@ import org.testng.IObjectFactory;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyVararg;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -50,7 +61,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
-@PowerMockIgnore({"org.apache.logging.log4j.*"})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
public class PulsarKafkaProducerTest {
@ObjectFactory
@@ -92,12 +103,60 @@ public class PulsarKafkaProducerTest {
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
- PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+ new PulsarKafkaProducer<>(properties, null, null);
verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
}
+ @Test
+ public void testPulsarKafkaInterceptor() throws PulsarClientException {
+ // Arrange
+ PulsarClient mockClient = mock(PulsarClient.class);
+ ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+ org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
+ ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+ CompletableFuture mockPartitionFuture = new CompletableFuture();
+ CompletableFuture mockSendAsyncFuture = new CompletableFuture();
+ TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
+
+ mockPartitionFuture.complete(new ArrayList<>());
+ mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
+ doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+ doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+ doReturn(mockClient).when(mockClientBuilder).build();
+ doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(anyVararg());
+ doReturn(mockProducer).when(mockProducerBuilder).create();
+ doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
+ doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
+ PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+ PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+ when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+ when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+
+ Properties properties = new Properties();
+ List interceptors = new ArrayList();
+ interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+ properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+ properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+ properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
+
+ // Act
+ PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+
+ pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,"key", "value"));
+
+ // Verify
+ verify(mockProducerBuilder, times(1)).intercept(anyVararg());
+ }
+
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
Properties properties = new Properties();
@@ -110,4 +169,27 @@ public class PulsarKafkaProducerTest {
new PulsarKafkaProducer<>(properties, null, null);
}
+ public static class PulsarKafkaProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
+
+ @Override
+ public ProducerRecord onSend(ProducerRecord record) {
+ return null;
+ }
+
+ @Override
+ public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+
+ }
+ }
+
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
new file mode 100644
index 0000000..8da6339
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.apache.pulsar.client.impl.ProducerInterceptors;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class KafkaProducerInterceptorWrapperTest {
+
+ /**
+ * This test case is to make sure information is not lost during process of convert Pulsar message to Kafka record
+ * and back to Pulsar message.
+ */
+ @Test
+ public void testProducerInterceptorConvertRecordCorrectly() {
+
+ String topic = "topic name";
+ int partitionID = 666;
+ long timeStamp = Math.abs(new Random().nextLong());
+
+ org.apache.kafka.clients.producer.ProducerInterceptor<String, byte[]> mockInterceptor1 =
+ (org.apache.kafka.clients.producer.ProducerInterceptor<String, byte[]>) mock(org.apache.kafka.clients.producer.ProducerInterceptor.class);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
+ Assert.assertEquals(record.key(), "original key");
+ Assert.assertEquals(record.value(), "original value".getBytes());
+ Assert.assertEquals(record.timestamp().longValue(), timeStamp);
+ Assert.assertEquals(record.partition().intValue(), partitionID);
+ return new ProducerRecord<String, byte[]>(topic, "processed key", "processed value".getBytes());
+ }
+ }).when(mockInterceptor1).onSend(any(ProducerRecord.class));
+
+ org.apache.kafka.clients.producer.ProducerInterceptor<String, String> mockInterceptor2 =
+ (org.apache.kafka.clients.producer.ProducerInterceptor<String, String>) mock(org.apache.kafka.clients.producer.ProducerInterceptor.class);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
+ Assert.assertEquals(record.key(), "processed key");
+ Assert.assertEquals(record.value(), "processed value".getBytes());
+ Assert.assertEquals(record.timestamp().longValue(), timeStamp);
+ Assert.assertEquals(record.partition().intValue(), partitionID);
+ return record;
+ }
+ }).when(mockInterceptor2).onSend(any(ProducerRecord.class));
+
+ ProducerInterceptors producerInterceptors = new ProducerInterceptors(Arrays.asList(new ProducerInterceptor[]{
+ new KafkaProducerInterceptorWrapper(mockInterceptor1, new StringSerializer(), new ByteArraySerializer(), topic),
+ new KafkaProducerInterceptorWrapper(mockInterceptor2, new StringSerializer(), new ByteArraySerializer(), topic)}));
+
+ TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, new BytesSchema());
+ typedMessageBuilder.key("original key");
+ typedMessageBuilder.value("original value".getBytes());
+ typedMessageBuilder.eventTime(timeStamp);
+ typedMessageBuilder.property(KafkaMessageRouter.PARTITION_ID, String.valueOf(partitionID));
+ typedMessageBuilder.getMessage();
+
+ producerInterceptors.beforeSend(null, typedMessageBuilder.getMessage());
+
+ verify(mockInterceptor1, times(1)).onSend(any(ProducerRecord.class));
+ verify(mockInterceptor2, times(1)).onSend(any(ProducerRecord.class));
+ }
+
+}
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 924192d..cb97fe2 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -137,7 +137,7 @@ Properties:
| `client.id` | Ignored | |
| `compression.type` | Yes | Allows `gzip` and `lz4`. No `snappy`. |
| `connections.max.idle.ms` | Yes | Only support up to 2,147,483,647,000(Integer.MAX_VALUE * 1000) ms of idle time|
-| `interceptor.classes` | Ignored | |
+| `interceptor.classes` | Yes | |
| `key.serializer` | Yes | |
| `linger.ms` | Yes | Controls the group commit time when batching messages |
| `max.block.ms` | Ignored | |