You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/10 16:26:39 UTC
[camel] 01/06: CAMEL-18356: consolidated the Kafka resume strategies
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9e832b0428cf7f0101e9be2cb6b9c19f4958b5fd
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Aug 10 16:50:25 2022 +0200
CAMEL-18356: consolidated the Kafka resume strategies
---
.../resume/kafka/MultiNodeKafkaResumeStrategy.java | 64 +---------------------
.../kafka/SingleNodeKafkaResumeStrategy.java | 60 ++++++++++++++++----
2 files changed, 50 insertions(+), 74 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
index 4cfe256fb3d..82580200f03 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -40,9 +40,9 @@ import org.slf4j.LoggerFactory;
*
* @param <K> the type of key
*/
+@Deprecated
public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNodeKafkaResumeStrategy<K> {
private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
- private final ExecutorService executorService;
/**
* Create a new instance of this class
@@ -64,68 +64,8 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod
public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
ExecutorService executorService) {
- super(resumeStrategyConfiguration);
-
- this.executorService = executorService;
- }
-
- protected void poll() {
- poll(getConsumer());
- }
-
- protected void poll(Consumer<byte[], byte[]> consumer) {
- Deserializable deserializable = (Deserializable) getAdapter();
-
- ConsumerRecords<byte[], byte[]> records;
- do {
- records = consume(10, consumer);
-
- if (records.isEmpty()) {
- break;
- }
-
- for (ConsumerRecord<byte[], byte[]> record : records) {
- byte[] value = record.value();
-
- LOG.trace("Read from Kafka: {}", value);
-
- deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()));
- }
- } while (true);
+ super(resumeStrategyConfiguration, executorService);
}
- @Override
- public void loadCache() throws Exception {
- super.loadCache();
- executorService.submit(() -> refresh());
- }
-
- /**
- * Launch a thread to refresh the offsets periodically
- */
- private void refresh() {
- LOG.trace("Creating a offset cache refresher");
- try {
- Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
- prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-
- try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
- consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
-
- poll(consumer);
- }
- } catch (Exception e) {
- LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
- }
- }
-
- @Override
- public void stop() {
- try {
- executorService.shutdown();
- } finally {
- super.stop();
- }
- }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 870185127de..db87f65074a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -23,9 +23,13 @@ import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
+import java.util.Properties;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.resume.Cacheable;
@@ -37,6 +41,7 @@ import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.util.IOHelper;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -50,9 +55,7 @@ import org.slf4j.LoggerFactory;
/**
* A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
- * integrations. For multi-node integrations (i.e: using clusters with the master component check
- * {@link MultiNodeKafkaResumeStrategy}.
- *
+ * integrations.
*/
public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> {
private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
@@ -66,15 +69,29 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
private boolean subscribed;
private ResumeAdapter adapter;
private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
+ private final ExecutorService executorService;
/**
* Builds an instance of this class
- *
+ *
* @param resumeStrategyConfiguration the configuration to use for this strategy instance
*
*/
public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+ executorService = Executors.newSingleThreadExecutor();
+ }
+
+ /**
+ * Builds an instance of this class
+ *
+ * @param resumeStrategyConfiguration the configuration to use for this strategy instance
+ *
+ */
+ public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
+ ExecutorService executorService) {
+ this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+ this.executorService = executorService;
}
/**
@@ -125,7 +142,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Loads the existing data into the cache
- *
+ *
* @throws Exception
*/
public void loadCache() throws Exception {
@@ -138,9 +155,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
if (!(adapter instanceof Deserializable)) {
throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
}
- poll();
- unsubscribe();
+ executorService.submit(() -> refresh());
}
protected void poll() {
@@ -160,7 +176,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
LOG.trace("Read from Kafka: {}", value);
if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
- break;
+ LOG.warn("Deserializer indicates that this is the last record to deserialize");
}
}
} while (true);
@@ -168,7 +184,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Subscribe to the topic if not subscribed yet
- *
+ *
* @param topic the topic to consume the messages from
*/
protected void checkAndSubscribe(String topic) {
@@ -181,7 +197,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Subscribe to the topic if not subscribed yet
- *
+ *
* @param topic the topic to consume the messages from
* @param remaining the number of messages to rewind from the last offset position (used to fill the cache)
*/
@@ -196,7 +212,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary
* to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs.
- *
+ *
* @param remaining the number of remaining messages on the topic to try to collect
* @return
*/
@@ -252,7 +268,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
/**
* Consumes message from the topic previously setup
- *
+ *
* @param retries how many times to retry consuming data from the topic
* @return An instance of the consumer records
*/
@@ -349,6 +365,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
LOG.info("Closing the Kafka consumer");
IOHelper.close(producer, "Kafka consumer", LOG);
+
+ executorService.shutdown();
}
@Override
@@ -388,4 +406,22 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
return resumeStrategyConfiguration;
}
+ /**
+ * Launch a thread to refresh the offsets periodically
+ */
+ private void refresh() {
+ LOG.trace("Creating an offset cache refresher");
+ try {
+ Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
+ prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+ try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
+ consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
+
+ poll();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
+ }
+ }
}