You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2022/10/13 09:21:32 UTC

[camel] 01/01: CAMEL-18608: camel-etcd3 - Synchronous commits in aggregation

This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch CAMEL-18608/keep-commit-synchronous
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c58609442d82042d87343d6149b5abfef488e52d
Author: Nicolas Filotto <nf...@talend.com>
AuthorDate: Thu Oct 13 11:21:05 2022 +0200

    CAMEL-18608: camel-etcd3 - Synchronous commits in aggregation
---
 .../etcd3/processor/aggregate/Etcd3AggregationRepository.java     | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java
index 79e865cecc3..0e80b729862 100644
--- a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java
+++ b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java
@@ -180,7 +180,8 @@ public class Etcd3AggregationRepository extends ServiceSupport
                     .Then(Op.put(ByteSequence
                             .from(String.format("%s/%s", prefixName, key).getBytes()), convertToEtcd3Format(newHolder),
                             PutOption.DEFAULT))
-                    .commit();
+                    .commit()
+                    .get();
         } catch (InterruptedException | ExecutionException | IOException e) {
             LOG.error(e.getMessage(), e);
             throw new RuntimeCamelException(e.getMessage(), e);
@@ -409,7 +410,8 @@ public class Etcd3AggregationRepository extends ServiceSupport
                                     Op.put(ByteSequence
                                             .from(String.format("%s/%s", persistencePrefixName, key).getBytes()),
                                             convertToEtcd3Format(removedHolder), PutOption.DEFAULT))
-                            .commit();
+                            .commit()
+                            .get();
                     LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.",
                             exchange.getExchangeId(), key);
                     LOG.trace(
@@ -476,7 +478,7 @@ public class Etcd3AggregationRepository extends ServiceSupport
     }
 
     @Override
-    protected void doStart() throws Exception {
+    protected void doStart() {
         if (client == null) {
             client = Client.builder().endpoints(endpoint).build();
             shutdownClient = true;