You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2022/03/23 08:00:37 UTC

[GitHub] [camel-quarkus] alexkazan87 edited a comment on issue #3642: Camel can not support kafka transactions.

alexkazan87 edited a comment on issue #3642:
URL: https://github.com/apache/camel-quarkus/issues/3642#issuecomment-1074110332


   Yes, you are right, `_Kafka transaction does not support JTA_`. 
   
   The main problem is that I enabled the kafka transactions with transaction.id property for exactly once and the camel code before it sent any event, It did not initialize it.  This is not related to transacted(). if the camel team can solve this then we can move to other trouble to me.  
   
   `_I assume that you want to do Kafka transaction within transacted() in camel?_` 
   Yes, exactly!
   
    My main goal, in the end, is to synchronise the JTA(Narayana) with the Kafka transaction manager having only one compact transaction lifecycle through transacted(). This is something that I can manage later by injecting an alternative TransactionManager by using the KafkaTransactionManager. This is a unique feature not only in camel but also in quarkus  - kafka generally.
   
   Code:
   
        @ApplicationScoped
        public class MyRoute extends RouteBuilder {
        private static final Queue<String> messageQueue = new LinkedBlockingQueue();
   
       @Inject
       RequiresNewJtaTransactionPolicy requiresNewJtaTransactionPolicy;
   
       @ConsumeEvent(Constants.MONEY_TRANSFER_EVENT)
       public void pushMoneyEvent(String order) {
           messageQueue .add(order);
       }
   
       @Override
       public void configure() {
   
           onException(CamelException.class)
                   .backOffMultiplier(2)
                   .maximumRedeliveries(5)
                   .retryAttemptedLogLevel(LoggingLevel.INFO)
                   .handled(true)
                   .log("After 5 redeliveries send elk ${body}")
                   .process(processor -> {
                               //elk....
                           }
                   )
                   .end();
   
           from("timer:foo?period={{timer.period}}&delay={{timer.delay}}")
                   .routeId("FromTimer2Kafka")
                   .process(exchange -> {
                       if (!messageQueue .isEmpty()) {
                           String pp = messageQueue .poll();
                           exchange.getIn().setHeader("NEW_EVENT", "CREATED");
                           exchange.getIn().setBody(pp);
                       }
                   })
                   .transacted()
                   .policy(requiresNewJtaTransactionPolicy)
                   .filter(header("NEW_EVENT").isEqualTo("CREATED"))
                   .log("Before process! Event : \"${body}\" with headers  \"${headers}\" ")
                   .doTry()
                       .to("kafka:{{kafka.topic.name}}")
                       .log("Message sent to the topic! : \"${body}\" ")
                   .doCatch(Exception.class)
                       .log("Exception : ${exception.message}")
                       .process(exchange -> {
                           Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
                           log.error("Exception:{}", cause.getMessage());
                           throw new CamelException(cause.getMessage());
                       })
                   .end();
       }
   }
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org