You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/10/19 18:02:24 UTC

[camel] branch main updated: Allow accessing Exchange during saga in LRAClient (#11779)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b36c9909ca Allow accessing Exchange during saga in LRAClient (#11779)
1b36c9909ca is described below

commit 1b36c9909ca9dc47651513a9a3aec591d215eba3
Author: Johannes Boßle <jo...@knowis.de>
AuthorDate: Thu Oct 19 20:02:18 2023 +0200

    Allow accessing Exchange during saga in LRAClient (#11779)
    
    * provide context information via exchange to saga artefacts
    
    * provide context information via exchange to saga artefacts
    
    * allow to access exchange in preparing the request
    
    * use static methods from CompleteableFuture
    
    * add note about interface changes due to exchange access in LRAClient
    
    ---------
    
    Co-authored-by: Johannes Boßle <jb...@knowis.de>
---
 .../java/org/apache/camel/service/lra/LRAClient.java   | 18 +++++++++---------
 .../apache/camel/service/lra/LRASagaCoordinator.java   | 10 +++++-----
 .../org/apache/camel/service/lra/LRASagaService.java   | 16 +++++++---------
 .../org/apache/camel/component/saga/SagaProducer.java  |  4 ++--
 .../camel/processor/saga/RequiredSagaProcessor.java    |  2 +-
 .../camel/processor/saga/RequiresNewSagaProcessor.java |  2 +-
 .../org/apache/camel/processor/saga/SagaProcessor.java |  4 ++--
 .../org/apache/camel/saga/CamelSagaCoordinator.java    |  4 ++--
 .../java/org/apache/camel/saga/CamelSagaService.java   |  3 ++-
 .../org/apache/camel/saga/InMemorySagaCoordinator.java |  4 ++--
 .../org/apache/camel/saga/InMemorySagaService.java     |  3 ++-
 .../modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc | 10 ++++++++++
 12 files changed, 45 insertions(+), 35 deletions(-)

diff --git a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
index 77833c16264..01dbb2fc6e3 100644
--- a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
+++ b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
@@ -64,8 +64,8 @@ public class LRAClient implements Closeable {
                 .build();
     }
 
-    public CompletableFuture<URL> newLRA() {
-        HttpRequest request = prepareRequest(URI.create(lraUrl + COORDINATOR_PATH_START))
+    public CompletableFuture<URL> newLRA(Exchange exchange) {
+        HttpRequest request = prepareRequest(URI.create(lraUrl + COORDINATOR_PATH_START), exchange)
                 .POST(HttpRequest.BodyPublishers.ofString(""))
                 .build();
 
@@ -94,7 +94,7 @@ public class LRAClient implements Closeable {
         });
     }
 
-    public CompletableFuture<Void> join(final URL lra, LRASagaStep step) {
+    public CompletableFuture<Void> join(final URL lra, LRASagaStep step, Exchange exchange) {
         return CompletableFuture.supplyAsync(() -> {
             LRAUrlBuilder participantBaseUrl = new LRAUrlBuilder()
                     .host(sagaService.getLocalParticipantUrl())
@@ -115,7 +115,7 @@ public class LRAClient implements Closeable {
             if (step.getTimeoutInMilliseconds().isPresent()) {
                 lraEndpoint = lraEndpoint + "?" + HEADER_TIME_LIMIT + "=" + step.getTimeoutInMilliseconds().get();
             }
-            HttpRequest request = prepareRequest(URI.create(lraEndpoint))
+            HttpRequest request = prepareRequest(URI.create(lraEndpoint), exchange)
                     .setHeader(HEADER_LINK, link.toString())
                     .setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, lra.toString())
                     .setHeader("Content-Type", "text/plain")
@@ -134,8 +134,8 @@ public class LRAClient implements Closeable {
                 });
     }
 
-    public CompletableFuture<Void> complete(URL lra) {
-        HttpRequest request = prepareRequest(URI.create(lra.toString() + COORDINATOR_PATH_CLOSE))
+    public CompletableFuture<Void> complete(URL lra, Exchange exchange) {
+        HttpRequest request = prepareRequest(URI.create(lra.toString() + COORDINATOR_PATH_CLOSE), exchange)
                 .setHeader("Content-Type", "text/plain")
                 .PUT(HttpRequest.BodyPublishers.ofString(""))
                 .build();
@@ -151,8 +151,8 @@ public class LRAClient implements Closeable {
         });
     }
 
-    public CompletableFuture<Void> compensate(URL lra) {
-        HttpRequest request = prepareRequest(URI.create(lra.toString() + COORDINATOR_PATH_CANCEL))
+    public CompletableFuture<Void> compensate(URL lra, Exchange exchange) {
+        HttpRequest request = prepareRequest(URI.create(lra.toString() + COORDINATOR_PATH_CANCEL), exchange)
                 .setHeader("Content-Type", "text/plain")
                 .PUT(HttpRequest.BodyPublishers.ofString(""))
                 .build();
@@ -168,7 +168,7 @@ public class LRAClient implements Closeable {
         });
     }
 
-    protected HttpRequest.Builder prepareRequest(URI uri) {
+    protected HttpRequest.Builder prepareRequest(URI uri, Exchange exchange) {
         return HttpRequest.newBuilder().uri(uri);
     }
 
diff --git a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaCoordinator.java b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaCoordinator.java
index 58c4063370d..67de593cd28 100644
--- a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaCoordinator.java
+++ b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaCoordinator.java
@@ -45,17 +45,17 @@ public class LRASagaCoordinator implements CamelSagaCoordinator {
                 throw ex;
             });
         }
-        return sagaService.getClient().join(this.lraURL, sagaStep);
+        return sagaService.getClient().join(this.lraURL, sagaStep, exchange);
     }
 
     @Override
-    public CompletableFuture<Void> compensate() {
-        return sagaService.getClient().compensate(this.lraURL);
+    public CompletableFuture<Void> compensate(Exchange exchange) {
+        return sagaService.getClient().compensate(this.lraURL, exchange);
     }
 
     @Override
-    public CompletableFuture<Void> complete() {
-        return sagaService.getClient().complete(this.lraURL);
+    public CompletableFuture<Void> complete(Exchange exchange) {
+        return sagaService.getClient().complete(this.lraURL, exchange);
     }
 
     @Override
diff --git a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaService.java b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaService.java
index 9e66c82a75a..67bfdf64ffd 100644
--- a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaService.java
+++ b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaService.java
@@ -22,10 +22,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.StaticService;
+import org.apache.camel.*;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.saga.CamelSagaCoordinator;
@@ -64,18 +61,19 @@ public class LRASagaService extends ServiceSupport implements StaticService, Cam
     }
 
     @Override
-    public CompletableFuture<CamelSagaCoordinator> newSaga() {
-        return client.newLRA()
+    public CompletableFuture<CamelSagaCoordinator> newSaga(Exchange exchange) {
+        return client.newLRA(exchange)
                 .thenApply(url -> new LRASagaCoordinator(LRASagaService.this, url));
     }
 
+
     @Override
     public CompletableFuture<CamelSagaCoordinator> getSaga(String id) {
-        CompletableFuture<CamelSagaCoordinator> coordinator = new CompletableFuture<>();
+        CompletableFuture<CamelSagaCoordinator> coordinator;
         try {
-            coordinator.complete(new LRASagaCoordinator(this, new URL(id)));
+            coordinator = CompletableFuture.completedFuture(new LRASagaCoordinator(this, new URL(id)));
         } catch (Exception ex) {
-            coordinator.completeExceptionally(ex);
+            coordinator = CompletableFuture.failedFuture(ex);
         }
         return coordinator;
     }
diff --git a/components/camel-saga/src/main/java/org/apache/camel/component/saga/SagaProducer.java b/components/camel-saga/src/main/java/org/apache/camel/component/saga/SagaProducer.java
index 81e29360d50..d03ef49ee5d 100644
--- a/components/camel-saga/src/main/java/org/apache/camel/component/saga/SagaProducer.java
+++ b/components/camel-saga/src/main/java/org/apache/camel/component/saga/SagaProducer.java
@@ -62,9 +62,9 @@ public class SagaProducer extends DefaultAsyncProducer {
             return coordinator;
         }).thenCompose(coordinator -> {
             if (success) {
-                return coordinator.complete();
+                return coordinator.complete(exchange);
             } else {
-                return coordinator.compensate();
+                return coordinator.compensate(exchange);
             }
         }).whenComplete((res, ex) -> {
             if (ex != null) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiredSagaProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiredSagaProcessor.java
index bb4efc07609..091cc690b81 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiredSagaProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiredSagaProcessor.java
@@ -46,7 +46,7 @@ public class RequiredSagaProcessor extends SagaProcessor {
                         coordinatorFuture = CompletableFuture.completedFuture(existingCoordinator);
                         inheritedCoordinator = true;
                     } else {
-                        coordinatorFuture = sagaService.newSaga();
+                        coordinatorFuture = sagaService.newSaga(exchange);
                         inheritedCoordinator = false;
                     }
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiresNewSagaProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiresNewSagaProcessor.java
index 2ac9de348ca..54f268df7ad 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiresNewSagaProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiresNewSagaProcessor.java
@@ -36,7 +36,7 @@ public class RequiresNewSagaProcessor extends SagaProcessor {
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         getCurrentSagaCoordinator(exchange).whenComplete((existingCoordinator, ex) -> ifNotException(ex, exchange, callback,
-                () -> sagaService.newSaga().whenComplete((newCoordinator, ex2) -> ifNotException(ex2, exchange, true,
+                () -> sagaService.newSaga(exchange).whenComplete((newCoordinator, ex2) -> ifNotException(ex2, exchange, true,
                         newCoordinator, existingCoordinator, callback, () -> {
                             setCurrentSagaCoordinator(exchange, newCoordinator);
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/SagaProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/SagaProcessor.java
index d7108dd95cc..277299bcc72 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/SagaProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/SagaProcessor.java
@@ -77,7 +77,7 @@ public abstract class SagaProcessor extends DelegateAsyncProcessor implements Tr
         if (this.completionMode == SagaCompletionMode.AUTO) {
             if (exchange.getException() != null) {
                 if (coordinator != null) {
-                    coordinator.compensate().whenComplete((done, ex) -> ifNotException(ex, exchange, callback, () -> {
+                    coordinator.compensate(exchange).whenComplete((done, ex) -> ifNotException(ex, exchange, callback, () -> {
                         setCurrentSagaCoordinator(exchange, previousCoordinator);
                         callback.done(false);
                     }));
@@ -86,7 +86,7 @@ public abstract class SagaProcessor extends DelegateAsyncProcessor implements Tr
                     callback.done(false);
                 }
             } else {
-                coordinator.complete().whenComplete((done, ex) -> ifNotException(ex, exchange, callback, () -> {
+                coordinator.complete(exchange).whenComplete((done, ex) -> ifNotException(ex, exchange, callback, () -> {
                     setCurrentSagaCoordinator(exchange, previousCoordinator);
                     callback.done(false);
                 }));
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaCoordinator.java b/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaCoordinator.java
index a54131a225a..fe26f489f3a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaCoordinator.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaCoordinator.java
@@ -29,8 +29,8 @@ public interface CamelSagaCoordinator extends HasId {
 
     CompletableFuture<Void> beginStep(Exchange exchange, CamelSagaStep step);
 
-    CompletableFuture<Void> compensate();
+    CompletableFuture<Void> compensate(Exchange exchange);
 
-    CompletableFuture<Void> complete();
+    CompletableFuture<Void> complete(Exchange exchange);
 
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaService.java b/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaService.java
index a7367a42600..e37201da7c5 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaService.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaService.java
@@ -19,6 +19,7 @@ package org.apache.camel.saga;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
 import org.apache.camel.Service;
 
 /**
@@ -26,7 +27,7 @@ import org.apache.camel.Service;
  */
 public interface CamelSagaService extends Service, CamelContextAware {
 
-    CompletableFuture<CamelSagaCoordinator> newSaga();
+    CompletableFuture<CamelSagaCoordinator> newSaga(Exchange exchange);
 
     CompletableFuture<CamelSagaCoordinator> getSaga(String id);
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
index b5f18e879be..472d21ad975 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
@@ -112,7 +112,7 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator {
     }
 
     @Override
-    public CompletableFuture<Void> compensate() {
+    public CompletableFuture<Void> compensate(Exchange exchange) {
         boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPENSATING);
 
         if (doAction) {
@@ -130,7 +130,7 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator {
     }
 
     @Override
-    public CompletableFuture<Void> complete() {
+    public CompletableFuture<Void> complete(Exchange exchange) {
         boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPLETING);
 
         if (doAction) {
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaService.java b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaService.java
index f1ae58a72c1..6b826df3716 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaService.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaService.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 
@@ -45,7 +46,7 @@ public class InMemorySagaService extends ServiceSupport implements CamelSagaServ
     private long retryDelayInMilliseconds = DEFAULT_RETRY_DELAY_IN_MILLISECONDS;
 
     @Override
-    public CompletableFuture<CamelSagaCoordinator> newSaga() {
+    public CompletableFuture<CamelSagaCoordinator> newSaga(Exchange exchange) {
         ObjectHelper.notNull(camelContext, "camelContext");
 
         String uuid = camelContext.getUuidGenerator().generateUuid();
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc
index 4e202e976fd..6338c0acf13 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc
@@ -13,3 +13,13 @@ in case of mis-configuration or invalid hostname.
 
 This can be disabled by setting `preValidateHostAndPort=false`, which will postpone validation
 to consumer is started, and will instead re-connect endlessly (5 sec delay by default) until success.
+
+=== camel-saga, camel-lra
+
+The `org.apache.camel.service.lra.LRAClient` can now access `Exchange` to retrieve further context information. Therefore, there are following changes in interface methods
+- `org.apache.camel.saga.CamelSagaService.compensate()` changed to `org.apache.camel.saga.CamelSagaService.compensate(Exchange exchange)`
+- `org.apache.camel.saga.CamelSagaService.complete()` changed to `org.apache.camel.saga.CamelSagaService.complete(Exchange exchange)`
+- `org.apache.camel.saga.CamelSagaCoordinator.newSaga` is now `org.apache.camel.saga.CamelSagaCoordinator.newSaga(Exchange exchange)`
+to support the transport of `Exchange`.
+
+As result of interface changes also the known implementation classes and usages have been adopted.