You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/05 17:39:20 UTC
[camel] branch main updated: CAMEL-18356: consolidated the Kafka resume strategies
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 0e13aa0d354 CAMEL-18356: consolidated the Kafka resume strategies
0e13aa0d354 is described below
commit 0e13aa0d3542b548b4cc6a5a4407e61f69350cbb
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Aug 5 15:39:17 2022 +0200
CAMEL-18356: consolidated the Kafka resume strategies
---
.../resume/kafka/MultiNodeKafkaResumeStrategy.java | 131 ---------------------
.../kafka/SingleNodeKafkaResumeStrategy.java | 60 ++++++++--
2 files changed, 48 insertions(+), 143 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
deleted file mode 100644
index 4cfe256fb3d..00000000000
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.processor.resume.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.camel.resume.Deserializable;
-import org.apache.camel.resume.Resumable;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for multi node
- * integrations. This is suitable, for instance, when using clusters with the master component.
- *
- * @param <K> the type of key
- */
-public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNodeKafkaResumeStrategy<K> {
- private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
- private final ExecutorService executorService;
-
- /**
- * Create a new instance of this class
- *
- * @param resumeStrategyConfiguration the configuration to use for this strategy instance
- */
- public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
- // just in case users don't want to provide their own worker thread pool
- this(resumeStrategyConfiguration, Executors.newSingleThreadExecutor());
- }
-
- /**
- * Builds an instance of this class
- *
- * @param resumeStrategyConfiguration the configuration to use for this strategy instance
- * @param executorService an executor service that will run a separate thread for periodically
- * refreshing the offsets
- */
-
- public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
- ExecutorService executorService) {
- super(resumeStrategyConfiguration);
-
- this.executorService = executorService;
- }
-
- protected void poll() {
- poll(getConsumer());
- }
-
- protected void poll(Consumer<byte[], byte[]> consumer) {
- Deserializable deserializable = (Deserializable) getAdapter();
-
- ConsumerRecords<byte[], byte[]> records;
- do {
- records = consume(10, consumer);
-
- if (records.isEmpty()) {
- break;
- }
-
- for (ConsumerRecord<byte[], byte[]> record : records) {
- byte[] value = record.value();
-
- LOG.trace("Read from Kafka: {}", value);
-
- deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()));
- }
- } while (true);
- }
-
- @Override
- public void loadCache() throws Exception {
- super.loadCache();
-
- executorService.submit(() -> refresh());
- }
-
- /**
- * Launch a thread to refresh the offsets periodically
- */
- private void refresh() {
- LOG.trace("Creating a offset cache refresher");
- try {
- Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
- prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-
- try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
- consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
-
- poll(consumer);
- }
- } catch (Exception e) {
- LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
- }
- }
-
- @Override
- public void stop() {
- try {
- executorService.shutdown();
- } finally {
- super.stop();
- }
- }
-}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 870185127de..db87f65074a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -23,9 +23,13 @@ import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
+import java.util.Properties;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.resume.Cacheable;
@@ -37,6 +41,7 @@ import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.util.IOHelper;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -50,9 +55,7 @@ import org.slf4j.LoggerFactory;
/**
* A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
- * integrations. For multi-node integrations (i.e: using clusters with the master component check
- * {@link MultiNodeKafkaResumeStrategy}.
- *
+ * integrations.
*/
public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> {
private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
@@ -66,15 +69,29 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
private boolean subscribed;
private ResumeAdapter adapter;
private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
+ private final ExecutorService executorService;
/**
* Builds an instance of this class
- *
+ *
* @param resumeStrategyConfiguration the configuration to use for this strategy instance
*
*/
public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+ executorService = Executors.newSingleThreadExecutor();
+ }
+
+ /**
+ * Builds an instance of this class
+ *
+ * @param resumeStrategyConfiguration the configuration to use for this strategy instance
+ *
+ */
+ public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
+ ExecutorService executorService) {
+ this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+ this.executorService = executorService;
}
/**
@@ -125,7 +142,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Loads the existing data into the cache
- *
+ *
* @throws Exception
*/
public void loadCache() throws Exception {
@@ -138,9 +155,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
if (!(adapter instanceof Deserializable)) {
throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
}
- poll();
- unsubscribe();
+ executorService.submit(() -> refresh());
}
protected void poll() {
@@ -160,7 +176,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
LOG.trace("Read from Kafka: {}", value);
if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
- break;
+ LOG.warn("Deserializar indicates that this is the last record to deserialize");
}
}
} while (true);
@@ -168,7 +184,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Subscribe to the topic if not subscribed yet
- *
+ *
* @param topic the topic to consume the messages from
*/
protected void checkAndSubscribe(String topic) {
@@ -181,7 +197,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Subscribe to the topic if not subscribed yet
- *
+ *
* @param topic the topic to consume the messages from
* @param remaining the number of messages to rewind from the last offset position (used to fill the cache)
*/
@@ -196,7 +212,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary
* to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs.
- *
+ *
* @param remaining the number of remaining messages on the topic to try to collect
* @return
*/
@@ -252,7 +268,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Consumes message from the topic previously setup
- *
+ *
* @param retries how many times to retry consuming data from the topic
* @return An instance of the consumer records
*/
@@ -349,6 +365,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
LOG.info("Closing the Kafka consumer");
IOHelper.close(producer, "Kafka consumer", LOG);
+
+ executorService.shutdown();
}
@Override
@@ -388,4 +406,22 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
return resumeStrategyConfiguration;
}
+ /**
+ * Launch a thread to refresh the offsets periodically
+ */
+ private void refresh() {
+ LOG.trace("Creating a offset cache refresher");
+ try {
+ Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
+ prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+ try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
+ consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
+
+ poll();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
+ }
+ }
}