You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/15 06:24:19 UTC
[camel] branch master updated: CAMEL-16208: camel-vertx-kafka - Optimize a little bit
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 89575e5 CAMEL-16208: camel-vertx-kafka - Optimize a little bit
89575e5 is described below
commit 89575e552d6f7a11786191f543a56aac557717cb
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Feb 15 07:23:27 2021 +0100
CAMEL-16208: camel-vertx-kafka - Optimize a little bit
---
.../component/vertx/kafka/VertxKafkaConsumer.java | 42 ++++++++++------------
1 file changed, 19 insertions(+), 23 deletions(-)
diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
index a857e44..ae99290 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfigurat
import org.apache.camel.component.vertx.kafka.operations.VertxKafkaConsumerOperations;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.SynchronizationAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +37,7 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
private static final Logger LOG = LoggerFactory.getLogger(VertxKafkaConsumer.class);
+ private Synchronization onCompletion;
private KafkaConsumer<Object, Object> kafkaConsumer;
public VertxKafkaConsumer(final VertxKafkaEndpoint endpoint, final Processor processor) {
@@ -43,6 +45,12 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
}
@Override
+ protected void doInit() throws Exception {
+ super.doInit();
+ this.onCompletion = new ConsumerOnCompletion();
+ }
+
+ @Override
protected void doStart() throws Exception {
super.doStart();
@@ -105,33 +113,13 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
propagatedHeaders.forEach((key, value) -> exchange.getIn().setHeader(key, value));
// add exchange callback
- exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
- @Override
- public void onComplete(Exchange exchange) {
- // at the moment we don't commit the offsets manually, we can add it in the future
- }
-
- @Override
- public void onFailure(Exchange exchange) {
- // we do nothing here
- processRollback(exchange);
- }
- });
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
// send message to next processor in the route
getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", exchange));
}
private void onErrorListener(final Throwable error) {
- final Exchange exchange = getEndpoint().createExchange();
-
- // set the thrown exception
- exchange.setException(error);
-
- // log exception if an exception occurred and was not handled
- if (exchange.getException() != null) {
- getExceptionHandler().handleException("Error processing exchange", exchange,
- exchange.getException());
- }
+ getExceptionHandler().handleException("Error from Kafka consumer.", error);
}
/**
@@ -139,10 +127,18 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
*
* @param exchange the exchange
*/
- private void processRollback(Exchange exchange) {
+ protected void processRollback(Exchange exchange) {
final Exception cause = exchange.getException();
if (cause != null) {
getExceptionHandler().handleException("Error during processing exchange.", exchange, cause);
}
}
+
+ private class ConsumerOnCompletion extends SynchronizationAdapter {
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ processRollback(exchange);
+ }
+ }
}