You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/15 06:24:19 UTC

[camel] branch master updated: CAMEL-16208: camel-vertx-kafka - Optimize a little bit

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 89575e5  CAMEL-16208: camel-vertx-kafka - Optimize a little bit
89575e5 is described below

commit 89575e552d6f7a11786191f543a56aac557717cb
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Feb 15 07:23:27 2021 +0100

    CAMEL-16208: camel-vertx-kafka - Optimize a little bit
---
 .../component/vertx/kafka/VertxKafkaConsumer.java  | 42 ++++++++++------------
 1 file changed, 19 insertions(+), 23 deletions(-)

diff --git a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
index a857e44..ae99290 100644
--- a/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
+++ b/components/camel-vertx-kafka/camel-vertx-kafka-component/src/main/java/org/apache/camel/component/vertx/kafka/VertxKafkaConsumer.java
@@ -29,6 +29,7 @@ import org.apache.camel.component.vertx.kafka.configuration.VertxKafkaConfigurat
 import org.apache.camel.component.vertx.kafka.operations.VertxKafkaConsumerOperations;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +37,7 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
 
     private static final Logger LOG = LoggerFactory.getLogger(VertxKafkaConsumer.class);
 
+    private Synchronization onCompletion;
     private KafkaConsumer<Object, Object> kafkaConsumer;
 
     public VertxKafkaConsumer(final VertxKafkaEndpoint endpoint, final Processor processor) {
@@ -43,6 +45,12 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
     }
 
     @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+        this.onCompletion = new ConsumerOnCompletion();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         super.doStart();
 
@@ -105,33 +113,13 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
         propagatedHeaders.forEach((key, value) -> exchange.getIn().setHeader(key, value));
 
         // add exchange callback
-        exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
-            @Override
-            public void onComplete(Exchange exchange) {
-                // at the moment we don't commit the offsets manually, we can add it in the future
-            }
-
-            @Override
-            public void onFailure(Exchange exchange) {
-                // we do nothing here
-                processRollback(exchange);
-            }
-        });
+        exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
         // send message to next processor in the route
         getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange [{}] done.", exchange));
     }
 
     private void onErrorListener(final Throwable error) {
-        final Exchange exchange = getEndpoint().createExchange();
-
-        // set the thrown exception
-        exchange.setException(error);
-
-        // log exception if an exception occurred and was not handled
-        if (exchange.getException() != null) {
-            getExceptionHandler().handleException("Error processing exchange", exchange,
-                    exchange.getException());
-        }
+        getExceptionHandler().handleException("Error from Kafka consumer.", error);
     }
 
     /**
@@ -139,10 +127,18 @@ public class VertxKafkaConsumer extends DefaultConsumer implements Suspendable {
      *
      * @param exchange the exchange
      */
-    private void processRollback(Exchange exchange) {
+    protected void processRollback(Exchange exchange) {
         final Exception cause = exchange.getException();
         if (cause != null) {
             getExceptionHandler().handleException("Error during processing exchange.", exchange, cause);
         }
     }
+
+    private class ConsumerOnCompletion extends SynchronizationAdapter {
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            processRollback(exchange);
+        }
+    }
 }