You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/28 20:34:04 UTC
[pulsar] branch master updated: Support Kafka's
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer. (#3911)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0487804 Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer. (#3911)
0487804 is described below
commit 0487804c6ba39f227c1f2e7d293700dcc3dcf646
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Thu Mar 28 13:33:58 2019 -0700
Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer. (#3911)
* Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer.
Apply onConsume in poll() before returning the ConsumerRecords,
apply onCommit in doCommitOffsets() before committing all offsets.
Also apply doc to reflect support for ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.
* Update error message for applying onConsume and onCommit for interceptors to include the specific interceptor name.
---
.../clients/consumer/PulsarKafkaConsumer.java | 46 +++++++++++++++++++++-
site2/docs/adaptors-kafka.md | 1 +
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 0592a4b..0c0059f 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -86,6 +86,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
private final SubscriptionInitialPosition strategy;
+ private List<ConsumerInterceptor<K, V>> interceptors;
+
private volatile boolean closed = false;
private final int maxRecordsInSinglePoll;
@@ -162,6 +164,9 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
maxRecordsInSinglePoll = 1000;
}
+ interceptors = (List) config.getConfiguredInstances(
+ ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
+
this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
@@ -374,7 +379,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
commitAsync();
}
- return new ConsumerRecords<>(records);
+ // If no interceptor is provided, interceptors list will an empty list, original ConsumerRecords will be return.
+ return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
@@ -438,6 +444,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
+ applyConsumerInterceptorsOnCommit(interceptors, offsets);
offsets.forEach((topicPartition, offsetAndMetadata) -> {
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
lastCommittedOffset.put(topicPartition, offsetAndMetadata);
@@ -457,6 +464,43 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
return offsets;
}
+ /**
+ * Apply all onConsume methods in a list of ConsumerInterceptors.
+ * Catch any exception during the process.
+ *
+ * @param interceptors Interceptors provided.
+ * @param consumerRecords ConsumerRecords returned by calling {@link this#poll(long)}.
+ * @return ConsumerRecords after applying all ConsumerInterceptor in interceptors list.
+ */
+ private ConsumerRecords applyConsumerInterceptorsOnConsume(List<ConsumerInterceptor<K, V>> interceptors, ConsumerRecords consumerRecords) {
+ ConsumerRecords processedConsumerRecords = consumerRecords;
+ for (ConsumerInterceptor interceptor : interceptors) {
+ try {
+ processedConsumerRecords = interceptor.onConsume(processedConsumerRecords);
+ } catch (Exception e) {
+ log.warn("Error executing onConsume for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
+ }
+ }
+ return processedConsumerRecords;
+ }
+
+ /**
+ * Apply all onCommit methods in a list of ConsumerInterceptors.
+ * Catch any exception during the process.
+ *
+ * @param interceptors Interceptors provided.
+ * @param offsets Offsets need to be commit.
+ */
+ private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K, V>> interceptors, Map<TopicPartition, OffsetAndMetadata> offsets) {
+ for (ConsumerInterceptor interceptor : interceptors) {
+ try {
+ interceptor.onCommit(offsets);
+ } catch (Exception e) {
+ log.warn("Error executing onCommit for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
+ }
+ }
+ }
+
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 785cdcc..ca4f877 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -209,6 +209,7 @@ Properties:
| `fetch.min.bytes` | Ignored | |
| `fetch.max.bytes` | Ignored | |
| `fetch.max.wait.ms` | Ignored | |
+| `interceptor.classes` | Yes | |
| `metadata.max.age.ms` | Ignored | |
| `max.partition.fetch.bytes` | Ignored | |
| `send.buffer.bytes` | Ignored | |