You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/20 03:27:35 UTC

[pulsar] branch master updated: Add a wrapper around Kafka's ProducerInterceptor to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 (#3843)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e77a165  Add a wrapper around Kafka's ProducerInterceptor to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 (#3843)
e77a165 is described below

commit e77a165dd2c0f29268b079c5f9a213f3994ea3a8
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Tue Mar 19 20:27:30 2019 -0700

    Add a wrapper around Kafka's ProducerInterceptor to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 (#3843)
    
    **Motivation**
    Add a wrapper around Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor` to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090
    
    The wrapper delegates all calls to the underlying instance of Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor` that it holds.
    
    When `PulsarKafkaProducer` converts a Kafka `ProducerRecord` to a Pulsar `Message`, the schema (fixed to type `Schema<byte[]>`), key, value, eventTimestamp and partitionID are set.
    When delegating, the conversion chain is:
    Pulsar `Message` -> Kafka `ProducerRecord` -> invoke the underlying Kafka `org.apache.kafka.clients.producer.ProducerInterceptor#onSend` -> Pulsar `Message`
    The wrapper tries to preserve all of this information across the round trip; this is verified through a unit test.
    For `org.apache.pulsar.client.api.ProducerInterceptor#onSendAcknowledgement`, it calls `org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement`; only the partitionID, eventTimestamp, key byte length and value byte length are passed in.
---
 .../clients/producer/PulsarKafkaProducer.java      |  32 ++-
 .../compat/KafkaProducerInterceptorWrapper.java    | 254 +++++++++++++++++++++
 .../clients/producer/PulsarKafkaProducerTest.java  |  86 ++++++-
 .../KafkaProducerInterceptorWrapperTest.java       | 102 +++++++++
 site2/docs/adaptors-kafka.md                       |   2 +-
 5 files changed, 472 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 4906cf5..8ae3bf3 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
+import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
 import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
@@ -69,6 +70,8 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
     private final Partitioner partitioner;
     private volatile Cluster cluster = Cluster.empty();
 
+    private List<ProducerInterceptor<K, V>> interceptors;
+
     public PulsarKafkaProducer(Map<String, Object> configs) {
         this(configs, null, null);
     }
@@ -157,6 +160,9 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
         boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull;
         pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
+
+        interceptors = (List) producerConfig.getConfiguredInstances(
+                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
     }
 
     @Override
@@ -238,7 +244,13 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         try {
             // Add the partitions info for the new topic
             cluster = cluster.withPartitions(readPartitionsInfo(topic));
-            return pulsarProducerBuilder.clone().topic(topic).create();
+            List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
+                    .map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySerializer, valueSerializer, topic))
+                    .collect(Collectors.toList());
+            return pulsarProducerBuilder.clone()
+                    .topic(topic)
+                    .intercept(wrappedInterceptors.toArray(new org.apache.pulsar.client.api.ProducerInterceptor[wrappedInterceptors.size()]))
+                    .create();
         } catch (PulsarClientException e) {
             throw new RuntimeException(e);
         }
@@ -308,5 +320,23 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         return new RecordMetadata(tp, offset, 0, mb.getPublishTime(), 0, mb.hasKey() ? mb.getKey().length() : 0, size);
     }
 
+    private ProducerInterceptor createKafkaProducerInterceptor(String clazz) {
+        try {
+            return (ProducerInterceptor) Class.forName(clazz).newInstance();
+        } catch (ClassNotFoundException e) {
+            String errorMessage = "Can't find Interceptor class: " + e.getMessage();
+            logger.error(errorMessage);
+            throw new RuntimeException(errorMessage);
+        } catch (InstantiationException e) {
+            String errorMessage = "Can't initiate provided Interceptor class: " + e.getMessage();
+            logger.error(errorMessage);
+            throw new RuntimeException(errorMessage);
+        } catch (IllegalAccessException e) {
+            String errorMessage = "Can't access provided Interceptor class: " + e.getMessage();
+            logger.error(errorMessage);
+            throw new RuntimeException(errorMessage);
+        }
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
new file mode 100644
index 0000000..99195ce
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapper.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferSerializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Base64;
+
+/**
+ * A wrapper for Kafka's {@link org.apache.kafka.clients.producer.ProducerInterceptor} to make pulsar support
+ * Kafka ProducerInterceptor. It holds an instance of {@link org.apache.kafka.clients.producer.ProducerInterceptor}
+ * and it'll delegate all invocation to that instance.
+ * <p>
+ * Extend {@link ProducerInterceptor<byte[]>} as all Pulsar {@link Message} created by
+ * {@link org.apache.kafka.clients.producer.PulsarKafkaProducer} is of type byte[].
+ *
+ */
+public class KafkaProducerInterceptorWrapper<K, V> implements ProducerInterceptor<byte[]> {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaProducerInterceptorWrapper.class);
+
+    final private org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor;
+
+    // For serializer key/value, and to determine the deserializer for key/value.
+    private final Serializer<K> keySerializer;
+
+    private final Serializer<V> valueSerializer;
+
+    // Keep the topic, as each Pulsar producer will tie to a Kafka topic, and ProducerInterceptor will tie to a Pulsar
+    // producer, it's safe to set it as final.
+    private final String topic;
+
+    private Schema<byte[]> scheme;
+
+    private long eventTime;
+
+    private String partitionID;
+
+    /**
+     * Create a wrapper of type {@link ProducerInterceptor} that will delegate all work to underlying Kafka's interceptor.
+     *
+     * @param kafkaProducerInterceptor Underlying instance of {@link org.apache.kafka.clients.producer.ProducerInterceptor<K, V>}
+     *                                 that this wrapper will delegate work to.
+     * @param keySerializer            {@link Serializer} used to serialize Kafka {@link ProducerRecord#key}.
+     * @param valueSerializer          {@link Serializer} used to serialize Kafka {@link ProducerRecord#value}.
+     * @param topic                    Topic this {@link ProducerInterceptor} will be associated to.
+     */
+    public KafkaProducerInterceptorWrapper(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor,
+                                           Serializer<K> keySerializer,
+                                           Serializer<V> valueSerializer,
+                                           String topic) {
+        this.kafkaProducerInterceptor = kafkaProducerInterceptor;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.topic = topic;
+    }
+
+    /**
+     * Called when interceptor is closed.
+     * The wrapper itself doesn't own any resource, just call underlying {@link org.apache.kafka.clients.producer.ProducerInterceptor#close()}
+     */
+    @Override
+    public void close() {
+        kafkaProducerInterceptor.close();
+    }
+
+    /**
+     * It tries to convert a Pulsar {@link Message} to a Kafka{@link ProducerRecord}, pass it to underlying
+     * {@link org.apache.kafka.clients.producer.ProducerInterceptor#onSend(ProducerRecord)} then convert the output
+     * back to Pulsar {@link Message}.
+     * <p>
+     * When build a Pulsar {@link Message} at {@link org.apache.kafka.clients.producer.PulsarKafkaProducer#buildMessage}
+     * schema, eventtime, partitionID, key and value are set. All this information will be preserved during the conversion.
+     *
+     * @param producer the producer which contains the interceptor, will be ignored as Kafka
+     *                 {@link org.apache.kafka.clients.producer.ProducerInterceptor} doesn't use it.
+     * @param message message to send
+     * @return Processed message.
+     */
+    @Override
+    public Message<byte[]> beforeSend(Producer<byte[]> producer, Message<byte[]> message) {
+        return toPulsarMessage(kafkaProducerInterceptor.onSend(toKafkaRecord(message)));
+    }
+
+    /**
+     * Delegate work to {@link org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement}
+     * @param producer the producer which contains the interceptor.
+     * @param message the message that application sends
+     * @param msgId the message id that assigned by the broker; null if send failed.
+     * @param exception the exception on sending messages, null indicates send has succeed.
+     */
+    @Override
+    public void onSendAcknowledgement(Producer<byte[]> producer, Message<byte[]> message, MessageId msgId, Throwable exception) {
+        try {
+            PulsarApi.MessageMetadata.Builder messageMetadataBuilder = ((MessageImpl<byte[]>)message).getMessageBuilder();
+            partitionID = getPartitionID(messageMetadataBuilder);
+            TopicPartition topicPartition = new TopicPartition(topic, Integer.parseInt(partitionID));
+            kafkaProducerInterceptor.onAcknowledgement(new RecordMetadata(topicPartition,
+                                                                -1,
+                                                                -1,
+                                                                messageMetadataBuilder.getEventTime(),
+                                                                -1,
+                                                                message.getKeyBytes().length,
+                                                                message.getValue().length), new Exception(exception));
+        } catch (NumberFormatException e) {
+            String errorMessage = "Unable to convert partitionID to integer: " + e.getMessage();
+            log.error(errorMessage);
+            throw new RuntimeException(errorMessage);
+        }
+    }
+
+    /**
+     * Convert a Kafka {@link ProducerRecord} to a Pulsar {@link Message}.
+     *
+     * @param producerRecord Kafka record to be convert.
+     * @return Pulsar message.
+     */
+    private Message<byte[]> toPulsarMessage(ProducerRecord<K, V> producerRecord) {
+        TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, scheme);
+        typedMessageBuilder.key(serializeKey(topic, producerRecord.key()));
+        typedMessageBuilder.value(valueSerializer.serialize(topic, producerRecord.value()));
+        typedMessageBuilder.eventTime(eventTime);
+        typedMessageBuilder.property(KafkaMessageRouter.PARTITION_ID, partitionID);
+        return typedMessageBuilder.getMessage();
+    }
+
+    /**
+     * Convert a Pulsar {@link Message} to a Kafka {@link ProducerRecord}.
+     * First it'll store those field that Kafka record doesn't need such as schema.
+     * Then it try to deserialize the value as it's been serialized to byte[] when creating the message.
+     *
+     * @param message Pulsar message to be convert.
+     * @return Kafka record.
+     */
+    private ProducerRecord<K, V> toKafkaRecord(Message<byte[]> message) {
+        Deserializer valueDeserializer = getDeserializer(valueSerializer);
+        V value = (V) valueDeserializer.deserialize(topic, message.getValue());
+        try {
+            scheme = (Schema<byte[]>) FieldUtils.readField(message, "schema", true);
+            PulsarApi.MessageMetadata.Builder messageMetadataBuilder = ((MessageImpl<byte[]>)message).getMessageBuilder();
+            partitionID = getPartitionID(messageMetadataBuilder);
+            eventTime = message.getEventTime();
+            return new ProducerRecord<>(topic, Integer.parseInt(partitionID), eventTime, deserializeKey(topic, message.getKey()), value);
+        } catch (NumberFormatException e) {
+            // If not able to parse partitionID, ignore it.
+            return new ProducerRecord<>(topic, deserializeKey(topic, message.getKey()), value);
+        } catch (IllegalAccessException e) {
+            String errorMessage = "Unable to get the schema of message due to " + e.getMessage();
+            log.error(errorMessage);
+            throw new RuntimeException(errorMessage);
+        }
+    }
+
+    private String serializeKey(String topic, K key) {
+        // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
+        if (keySerializer instanceof StringSerializer) {
+            return (String) key;
+        } else {
+            byte[] keyBytes = keySerializer.serialize(topic, key);
+            return Base64.getEncoder().encodeToString(keyBytes);
+        }
+    }
+
+    private K deserializeKey(String topic, String key) {
+        // If key is a String, we can use it as it is, otherwise, decode it from base64 and deserialize the byte[]
+        if (keySerializer instanceof StringSerializer) {
+            return (K) key;
+        } else {
+            Deserializer keyDeserializer = getDeserializer(keySerializer);
+            return (K) keyDeserializer.deserialize(topic, Base64.getDecoder().decode(key));
+        }
+    }
+
+    /**
+     * Look up the value of the {@link KafkaMessageRouter#PARTITION_ID} property in messageMetadataBuilder.
+     * The property is expected to be set in {@link org.apache.kafka.clients.producer.PulsarKafkaProducer#buildMessage};
+     * if it is missing, the unchecked {@code get()} on the lookup result below throws NoSuchElementException.
+     *
+     * @param messageMetadataBuilder metadata builder of the message whose properties are searched.
+     * @return value of the first property whose key equals {@link KafkaMessageRouter#PARTITION_ID}.
+     */
+    private String getPartitionID(PulsarApi.MessageMetadata.Builder messageMetadataBuilder) {
+        return messageMetadataBuilder.getPropertiesList()
+                                    .stream()
+                                    .filter(keyValue -> keyValue.getKey().equals(KafkaMessageRouter.PARTITION_ID))
+                                    .findFirst()
+                                    .get()
+                                    .getValue();
+    }
+
+    /** Maps a supported Kafka {@link Serializer} to its matching {@link Deserializer}; throws IllegalArgumentException otherwise. */
+    private Deserializer getDeserializer(Serializer serializer) {
+        if (serializer instanceof StringSerializer) {
+            return new StringDeserializer();
+        } else if (serializer instanceof org.apache.kafka.common.serialization.LongSerializer) { // fully qualified: not imported
+            return new LongDeserializer();
+        } else if (serializer instanceof IntegerSerializer) {
+            return new IntegerDeserializer();
+        } else if (serializer instanceof DoubleSerializer) {
+            return new DoubleDeserializer();
+        } else if (serializer instanceof BytesSerializer) {
+            return new BytesDeserializer();
+        } else if (serializer instanceof ByteBufferSerializer) {
+            return new ByteBufferDeserializer();
+        } else if (serializer instanceof ByteArraySerializer) {
+            return new ByteArrayDeserializer();
+        }
+        throw new IllegalArgumentException(serializer.getClass().getName() + " is not a valid or supported subclass of org.apache.kafka.common.serialization.Serializer.");
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 37ff221..2c4af1e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -19,12 +19,18 @@
 package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
@@ -35,13 +41,18 @@ import org.testng.IObjectFactory;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.anyVararg;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -50,7 +61,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
-@PowerMockIgnore({"org.apache.logging.log4j.*"})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
 public class PulsarKafkaProducerTest {
 
     @ObjectFactory
@@ -92,12 +103,60 @@ public class PulsarKafkaProducerTest {
         properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
         properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
 
-        PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+        new PulsarKafkaProducer<>(properties, null, null);
 
         verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
         verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
     }
 
+    @Test
+    public void testPulsarKafkaInterceptor() throws PulsarClientException {
+        // Arrange
+        PulsarClient mockClient = mock(PulsarClient.class);
+        ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+        org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
+        ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+        CompletableFuture mockPartitionFuture = new CompletableFuture();
+        CompletableFuture mockSendAsyncFuture = new CompletableFuture();
+        TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
+
+        mockPartitionFuture.complete(new ArrayList<>());
+        mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
+        doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+        doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+        doReturn(mockClient).when(mockClientBuilder).build();
+        doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(anyVararg());
+        doReturn(mockProducer).when(mockProducerBuilder).create();
+        doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
+        doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
+        PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+        PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+        when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+
+        Properties properties = new Properties();
+        List interceptors =  new ArrayList();
+        interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
+
+        // Act
+        PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
+
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,"key", "value"));
+
+        // Verify
+        verify(mockProducerBuilder, times(1)).intercept(anyVararg());
+    }
+
     @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
     public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
         Properties properties = new Properties();
@@ -110,4 +169,27 @@ public class PulsarKafkaProducerTest {
         new PulsarKafkaProducer<>(properties, null, null);
     }
 
+    public static class PulsarKafkaProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
+
+        @Override
+        public ProducerRecord onSend(ProducerRecord record) {
+            return null;
+        }
+
+        @Override
+        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+
+        }
+    }
+
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
new file mode 100644
index 0000000..8da6339
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/pulsar/client/kafka/compat/KafkaProducerInterceptorWrapperTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.apache.pulsar.client.impl.ProducerInterceptors;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class KafkaProducerInterceptorWrapperTest {
+
+    /**
+     * This test case is to make sure information is not lost during process of convert Pulsar message to Kafka record
+     * and back to Pulsar message.
+     */
+    @Test
+    public void testProducerInterceptorConvertRecordCorrectly() {
+
+        String topic = "topic name";
+        int partitionID = 666;
+        long timeStamp = Math.abs(new Random().nextLong());
+
+        org.apache.kafka.clients.producer.ProducerInterceptor<String, byte[]> mockInterceptor1 =
+                (org.apache.kafka.clients.producer.ProducerInterceptor<String, byte[]>) mock(org.apache.kafka.clients.producer.ProducerInterceptor.class);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
+                Assert.assertEquals(record.key(), "original key");
+                Assert.assertEquals(record.value(), "original value".getBytes());
+                Assert.assertEquals(record.timestamp().longValue(), timeStamp);
+                Assert.assertEquals(record.partition().intValue(), partitionID);
+                return new ProducerRecord<String, byte[]>(topic, "processed key", "processed value".getBytes());
+            }
+        }).when(mockInterceptor1).onSend(any(ProducerRecord.class));
+
+        org.apache.kafka.clients.producer.ProducerInterceptor<String, String> mockInterceptor2 =
+                (org.apache.kafka.clients.producer.ProducerInterceptor<String, String>) mock(org.apache.kafka.clients.producer.ProducerInterceptor.class);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                ProducerRecord<String, byte[]> record = (ProducerRecord<String, byte[]>) invocation.getArguments()[0];
+                Assert.assertEquals(record.key(), "processed key");
+                Assert.assertEquals(record.value(), "processed value".getBytes());
+                Assert.assertEquals(record.timestamp().longValue(), timeStamp);
+                Assert.assertEquals(record.partition().intValue(), partitionID);
+                return record;
+            }
+        }).when(mockInterceptor2).onSend(any(ProducerRecord.class));
+
+        ProducerInterceptors producerInterceptors = new ProducerInterceptors(Arrays.asList(new ProducerInterceptor[]{
+                new KafkaProducerInterceptorWrapper(mockInterceptor1, new StringSerializer(), new ByteArraySerializer(), topic),
+                new KafkaProducerInterceptorWrapper(mockInterceptor2, new StringSerializer(), new ByteArraySerializer(), topic)}));
+
+        TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, new BytesSchema());
+        typedMessageBuilder.key("original key");
+        typedMessageBuilder.value("original value".getBytes());
+        typedMessageBuilder.eventTime(timeStamp);
+        typedMessageBuilder.property(KafkaMessageRouter.PARTITION_ID, String.valueOf(partitionID));
+        typedMessageBuilder.getMessage();
+
+        producerInterceptors.beforeSend(null, typedMessageBuilder.getMessage());
+
+        verify(mockInterceptor1, times(1)).onSend(any(ProducerRecord.class));
+        verify(mockInterceptor2, times(1)).onSend(any(ProducerRecord.class));
+    }
+
+}
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 924192d..cb97fe2 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -137,7 +137,7 @@ Properties:
 | `client.id`                             | Ignored   |                                                                               |
 | `compression.type`                      | Yes       | Allows `gzip` and `lz4`. No `snappy`.                                         |
 | `connections.max.idle.ms`               | Yes       | Only support up to 2,147,483,647,000(Integer.MAX_VALUE * 1000) ms of idle time|
-| `interceptor.classes`                   | Ignored   |                                                                               |
+| `interceptor.classes`                   | Yes       |                                                                               |
 | `key.serializer`                        | Yes       |                                                                               |
 | `linger.ms`                             | Yes       | Controls the group commit time when batching messages                         |
 | `max.block.ms`                          | Ignored   |                                                                               |