You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/28 20:34:04 UTC

[pulsar] branch master updated: Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer. (#3911)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0487804  Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer. (#3911)
0487804 is described below

commit 0487804c6ba39f227c1f2e7d293700dcc3dcf646
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Thu Mar 28 13:33:58 2019 -0700

    Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer. (#3911)
    
    * Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer.
    Apply onConsume in poll() before returning the ConsumerRecords,
    apply onCommit in doCommitOffsets() before committing all offsets.
    
    Also apply doc to reflect support for ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.
    
    * Update error message for applying onConsume and onCommit for interceptors to include the specific interceptor name.
---
 .../clients/consumer/PulsarKafkaConsumer.java      | 46 +++++++++++++++++++++-
 site2/docs/adaptors-kafka.md                       |  1 +
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 0592a4b..0c0059f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -86,6 +86,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
     private final SubscriptionInitialPosition strategy;
 
+    private List<ConsumerInterceptor<K, V>> interceptors;
+
     private volatile boolean closed = false;
 
     private final int maxRecordsInSinglePoll;
@@ -162,6 +164,9 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
             maxRecordsInSinglePoll = 1000;
         }
 
+        interceptors = (List) config.getConfiguredInstances(
+                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
+
         this.properties = new Properties();
         config.originals().forEach((k, v) -> properties.put(k, v));
         ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
@@ -374,7 +379,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
                 commitAsync();
             }
 
-            return new ConsumerRecords<>(records);
+            // If no interceptor is provided, interceptors list will an empty list, original ConsumerRecords will be return.
+            return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
@@ -438,6 +444,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
     private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
 
+        applyConsumerInterceptorsOnCommit(interceptors, offsets);
         offsets.forEach((topicPartition, offsetAndMetadata) -> {
             org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
             lastCommittedOffset.put(topicPartition, offsetAndMetadata);
@@ -457,6 +464,43 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
         return offsets;
     }
 
+    /**
+     * Apply all onConsume methods in a list of ConsumerInterceptors.
+     * Catch any exception during the process.
+     *
+     * @param interceptors     Interceptors provided.
+     * @param consumerRecords  ConsumerRecords returned by calling {@link this#poll(long)}.
+     * @return                 ConsumerRecords after applying all ConsumerInterceptor in interceptors list.
+     */
+    private ConsumerRecords applyConsumerInterceptorsOnConsume(List<ConsumerInterceptor<K, V>> interceptors, ConsumerRecords consumerRecords) {
+        ConsumerRecords processedConsumerRecords = consumerRecords;
+        for (ConsumerInterceptor interceptor : interceptors) {
+            try {
+                processedConsumerRecords = interceptor.onConsume(processedConsumerRecords);
+            } catch (Exception e) {
+                log.warn("Error executing onConsume for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
+            }
+        }
+        return processedConsumerRecords;
+    }
+
+    /**
+     * Apply all onCommit methods in a list of ConsumerInterceptors.
+     * Catch any exception during the process.
+     *
+     * @param interceptors   Interceptors provided.
+     * @param offsets        Offsets need to be commit.
+     */
+    private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K, V>> interceptors, Map<TopicPartition, OffsetAndMetadata> offsets) {
+        for (ConsumerInterceptor interceptor : interceptors) {
+            try {
+                interceptor.onCommit(offsets);
+            } catch (Exception e) {
+                log.warn("Error executing onCommit for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
+            }
+        }
+    }
+
     @Override
     public void seek(TopicPartition partition, long offset) {
         MessageId msgId = MessageIdUtils.getMessageId(offset);
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 785cdcc..ca4f877 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -209,6 +209,7 @@ Properties:
 | `fetch.min.bytes`               | Ignored   |                                                       |
 | `fetch.max.bytes`               | Ignored   |                                                       |
 | `fetch.max.wait.ms`             | Ignored   |                                                       |
+| `interceptor.classes`           | Yes       |                                                       |
 | `metadata.max.age.ms`           | Ignored   |                                                       |
 | `max.partition.fetch.bytes`     | Ignored   |                                                       |
 | `send.buffer.bytes`             | Ignored   |                                                       |