You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/13 10:31:10 UTC

[camel] branch main updated: CAMEL-18608: camel-etcd3 - Synchronous commits in aggregation (#8537)

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 33be3357356 CAMEL-18608: camel-etcd3 - Synchronous commits in aggregation (#8537)
33be3357356 is described below

commit 33be3357356e9abf99b06ab413ee88e4ec1c24f4
Author: Nicolas Filotto <es...@users.noreply.github.com>
AuthorDate: Thu Oct 13 12:31:03 2022 +0200

    CAMEL-18608: camel-etcd3 - Synchronous commits in aggregation (#8537)
    
    ## Motivation
    
    The test `org.apache.camel.component.etcd3.AggregateEtcd3ManualTest.testABC` regularly fails on the Jenkins build so it needs to be fixed.
    
    ## Modifications:
    
    * Ensure that the commit of a transaction is made synchronously to prevent aggregation miss
---
 .../etcd3/processor/aggregate/Etcd3AggregationRepository.java     | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java
index 79e865cecc3..0e80b729862 100644
--- a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java
+++ b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java
@@ -180,7 +180,8 @@ public class Etcd3AggregationRepository extends ServiceSupport
                     .Then(Op.put(ByteSequence
                             .from(String.format("%s/%s", prefixName, key).getBytes()), convertToEtcd3Format(newHolder),
                             PutOption.DEFAULT))
-                    .commit();
+                    .commit()
+                    .get();
         } catch (InterruptedException | ExecutionException | IOException e) {
             LOG.error(e.getMessage(), e);
             throw new RuntimeCamelException(e.getMessage(), e);
@@ -409,7 +410,8 @@ public class Etcd3AggregationRepository extends ServiceSupport
                                     Op.put(ByteSequence
                                             .from(String.format("%s/%s", persistencePrefixName, key).getBytes()),
                                             convertToEtcd3Format(removedHolder), PutOption.DEFAULT))
-                            .commit();
+                            .commit()
+                            .get();
                     LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.",
                             exchange.getExchangeId(), key);
                     LOG.trace(
@@ -476,7 +478,7 @@ public class Etcd3AggregationRepository extends ServiceSupport
     }
 
     @Override
-    protected void doStart() throws Exception {
+    protected void doStart() {
         if (client == null) {
             client = Client.builder().endpoints(endpoint).build();
             shutdownClient = true;