You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/10 15:01:18 UTC
[camel] 02/02: CAMEL-18362: wait for the initial cache loading
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit ae1b727400e707b2d7b310747ab062febf50619b
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 16:58:49 2022 +0200
CAMEL-18362: wait for the initial cache loading
---
.../kafka/KafkaResumeStrategyConfiguration.java | 23 ++++++++++++++
.../KafkaResumeStrategyConfigurationBuilder.java | 17 ++++++++++
.../kafka/SingleNodeKafkaResumeStrategy.java | 37 ++++++++++++++++------
3 files changed, 68 insertions(+), 9 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
index 2e314079223..94aede6fadd 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor.resume.kafka;
+import java.time.Duration;
import java.util.Properties;
import org.apache.camel.resume.ResumeStrategyConfiguration;
@@ -28,6 +29,8 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
private Properties producerProperties;
private Properties consumerProperties;
private String topic;
+ private Duration maxInitializationDuration;
+ private int maxInitializationRetries;
public Properties getProducerProperties() {
return producerProperties;
@@ -58,4 +61,24 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
this.topic = topic;
}
+
+ /**
+ * Gets the maximum duration configured for the initialization
+ *
+ * @return the maximum initialization duration, or null if none was set on this configuration
+ */
+ public Duration getMaxInitializationDuration() {
+ return maxInitializationDuration;
+ }
+
+ /**
+ * Sets the maximum duration for the initialization. The value is stored as given, without any validation.
+ *
+ * @param maxInitializationDuration the maximum initialization duration
+ */
+ public void setMaxInitializationDuration(Duration maxInitializationDuration) {
+ this.maxInitializationDuration = maxInitializationDuration;
+ }
+
+ /**
+ * Gets the maximum number of retries configured for the initialization
+ *
+ * @return the maximum number of initialization retries (0 if it was never set on this configuration)
+ */
+ public int getMaxInitializationRetries() {
+ return maxInitializationRetries;
+ }
+
+ /**
+ * Sets the maximum number of retries for the initialization
+ *
+ * @param maxInitializationRetries the maximum number of retries, which must be 1 or greater
+ * @throws IllegalArgumentException if the given value is lower than 1 (the stored value is left untouched)
+ */
+ public void setMaxInitializationRetries(int maxInitializationRetries) {
+ if (maxInitializationRetries < 1) {
+ throw new IllegalArgumentException("The maximum number of initialization retries must be equal or bigger than 1");
+ }
+
+ this.maxInitializationRetries = maxInitializationRetries;
+ }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
index 150936148d0..3c4e6a21ca5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor.resume.kafka;
+import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
@@ -43,6 +44,8 @@ public class KafkaResumeStrategyConfigurationBuilder
private Properties producerProperties;
private Properties consumerProperties;
private String topic;
+ private Duration maxInitializationDuration = Duration.ofSeconds(10);
+ private int maxInitializationRetries = 5;
private KafkaResumeStrategyConfigurationBuilder() {
}
@@ -103,6 +106,18 @@ public class KafkaResumeStrategyConfigurationBuilder
return this;
}
+ /**
+ * Sets the maximum duration for the initialization, replacing the builder default of 10 seconds. The value is
+ * not validated by this method.
+ *
+ * @param duration the maximum initialization duration
+ * @return this builder instance
+ */
+ public KafkaResumeStrategyConfigurationBuilder withMaxInitializationDuration(Duration duration) {
+ this.maxInitializationDuration = duration;
+
+ return this;
+ }
+
+ /**
+ * Sets the maximum number of retries for the initialization, replacing the builder default of 5. The value is
+ * not validated by this method: it is checked when it is handed over to
+ * KafkaResumeStrategyConfiguration#setMaxInitializationRetries, which rejects values lower than 1.
+ *
+ * @param retries the maximum number of initialization retries
+ * @return this builder instance
+ */
+ public KafkaResumeStrategyConfigurationBuilder withMaxInitializationRetries(int retries) {
+ this.maxInitializationRetries = retries;
+
+ return this;
+ }
+
/**
* Creates a basic consumer
*
@@ -138,6 +153,8 @@ public class KafkaResumeStrategyConfigurationBuilder
resumeStrategyConfiguration.setConsumerProperties(consumerProperties);
resumeStrategyConfiguration.setProducerProperties(producerProperties);
resumeStrategyConfiguration.setTopic(topic);
+ resumeStrategyConfiguration.setMaxInitializationDuration(maxInitializationDuration);
+ resumeStrategyConfiguration.setMaxInitializationRetries(maxInitializationRetries);
return resumeStrategyConfiguration;
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index a527b66c9d7..9a60cce760f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -20,11 +20,13 @@ package org.apache.camel.processor.resume.kafka;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
+import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -152,13 +154,24 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
}
- executorService.submit(this::refresh);
+ CountDownLatch latch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
+ executorService.submit(() -> refresh(latch));
+
+ try {
+ LOG.trace("Waiting for kafka resume strategy async initialization");
+ if (!latch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) {
+ LOG.debug("The initialization timed out");
+ }
+ LOG.trace("Kafka resume strategy initialization complete");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
/**
* Launch a thread to refresh the offsets periodically
*/
- private void refresh() {
+ private void refresh(CountDownLatch latch) {
LOG.trace("Creating a offset cache refresher");
try {
@@ -169,7 +182,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic());
consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
- poll(consumer);
+ poll(consumer, latch);
} catch (WakeupException e) {
LOG.info("Kafka consumer was interrupted during a blocking call");
} catch (Exception e) {
@@ -182,27 +195,33 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
}
}
- protected void poll(Consumer<byte[], byte[]> consumer) {
+ protected void poll(Consumer<byte[], byte[]> consumer, CountDownLatch latch) {
Deserializable deserializable = (Deserializable) adapter;
+ boolean initialized = false;
do {
ConsumerRecords<byte[], byte[]> records = consume(consumer);
- if (records.isEmpty()) {
- continue;
- }
-
for (ConsumerRecord<byte[], byte[]> record : records) {
byte[] value = record.value();
if (LOG.isTraceEnabled()) {
- LOG.trace("Read from Kafka: {}", value);
+ LOG.trace("Read from Kafka at {} ({}): {}", Instant.ofEpochMilli(record.timestamp()),
+ record.timestampType(), value);
}
if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
LOG.warn("Deserializer indicates that this is the last record to deserialize");
}
}
+
+ if (!initialized) {
+ if (latch.getCount() == 1) {
+ initialized = true;
+ }
+
+ latch.countDown();
+ }
} while (true);
}