You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/01 18:24:17 UTC
[camel] 11/16: CAMEL-15562: minor cleanups for the Kafka strategy
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 616b50d4851e712ae563e337e7f12c155697b1c9
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Feb 28 13:51:57 2022 +0100
CAMEL-15562: minor cleanups for the Kafka strategy
---
.../resume/kafka/AbstractKafkaResumeStrategy.java | 37 ++++++++++++++--------
1 file changed, 24 insertions(+), 13 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
index a779c00..079d936 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
@@ -87,7 +87,13 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
init();
}
- private Properties createProducer(String bootstrapServers) {
+ /**
+ * Creates the configuration properties for a basic string-based producer
+ *
+ * @param bootstrapServers the Kafka host
+ * @return A set of default properties for producing string-based key/value records to Kafka
+ */
+ public static Properties createProducer(String bootstrapServers) {
Properties config = new Properties();
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
@@ -96,15 +102,16 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- // set up the producer to remove all batching on send, we want all sends
- // to be fully synchronous
- config.putIfAbsent(ProducerConfig.ACKS_CONFIG, "1");
- config.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
-
return config;
}
- private Properties createConsumer(String bootstrapServers) {
+ /**
+ * Creates the configuration properties for a basic string-based consumer
+ *
+ * @param bootstrapServers the Kafka host
+ * @return A set of default properties for consuming string-based key/value records from Kafka
+ */
+ public static Properties createConsumer(String bootstrapServers) {
Properties config = new Properties();
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -124,13 +131,12 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
/**
* Sends data to a topic
- *
- * @param topic the topic to send data to
+ *
* @param message the message to send
* @throws ExecutionException
* @throws InterruptedException
*/
- public void produce(String topic, K key, V message) throws ExecutionException, InterruptedException {
+ public void produce(K key, V message) throws ExecutionException, InterruptedException {
ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, message);
errorCount = 0;
@@ -151,7 +157,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
LOG.debug("Updating offset on Kafka with key {} to {}", key, offsetValue);
- produce(topic, key, offsetValue);
+ produce(key, offsetValue);
resumeCache.add(key, offsetValue);
}
@@ -231,9 +237,14 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
public abstract void subscribe() throws Exception;
- // TODO: bad method
public void unsubscribe() {
- consumer.unsubscribe();
+ try {
+ consumer.unsubscribe();
+ } catch (IllegalStateException e) {
+ LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}", topic);
+ } catch (Exception e) {
+ LOG.error("Error unsubscribing from the Kafka topic {}: {}", topic, e.getMessage(), e);
+ }
}
/**