You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/10/19 18:02:24 UTC
[camel] branch main updated: Allow accessing Exchange during saga in LRAClient (#11779)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1b36c9909ca Allow accessing Exchange during saga in LRAClient (#11779)
1b36c9909ca is described below
commit 1b36c9909ca9dc47651513a9a3aec591d215eba3
Author: Johannes Boßle <jo...@knowis.de>
AuthorDate: Thu Oct 19 20:02:18 2023 +0200
Allow accessing Exchange during saga in LRAClient (#11779)
* provide context information via exchange to saga artefacts
* provide context information via exchange to saga artefacts
* allow to access exchange in preparing the request
* use static methods from CompletableFuture
* add note about interface changes due to exchange access in LRAClient
---------
Co-authored-by: Johannes Boßle <jb...@knowis.de>
---
.../java/org/apache/camel/service/lra/LRAClient.java | 18 +++++++++---------
.../apache/camel/service/lra/LRASagaCoordinator.java | 10 +++++-----
.../org/apache/camel/service/lra/LRASagaService.java | 16 +++++++---------
.../org/apache/camel/component/saga/SagaProducer.java | 4 ++--
.../camel/processor/saga/RequiredSagaProcessor.java | 2 +-
.../camel/processor/saga/RequiresNewSagaProcessor.java | 2 +-
.../org/apache/camel/processor/saga/SagaProcessor.java | 4 ++--
.../org/apache/camel/saga/CamelSagaCoordinator.java | 4 ++--
.../java/org/apache/camel/saga/CamelSagaService.java | 3 ++-
.../org/apache/camel/saga/InMemorySagaCoordinator.java | 4 ++--
.../org/apache/camel/saga/InMemorySagaService.java | 3 ++-
.../modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc | 10 ++++++++++
12 files changed, 45 insertions(+), 35 deletions(-)
diff --git a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
index 77833c16264..01dbb2fc6e3 100644
--- a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
+++ b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRAClient.java
@@ -64,8 +64,8 @@ public class LRAClient implements Closeable {
.build();
}
- public CompletableFuture<URL> newLRA() {
- HttpRequest request = prepareRequest(URI.create(lraUrl + COORDINATOR_PATH_START))
+ public CompletableFuture<URL> newLRA(Exchange exchange) {
+ HttpRequest request = prepareRequest(URI.create(lraUrl + COORDINATOR_PATH_START), exchange)
.POST(HttpRequest.BodyPublishers.ofString(""))
.build();
@@ -94,7 +94,7 @@ public class LRAClient implements Closeable {
});
}
- public CompletableFuture<Void> join(final URL lra, LRASagaStep step) {
+ public CompletableFuture<Void> join(final URL lra, LRASagaStep step, Exchange exchange) {
return CompletableFuture.supplyAsync(() -> {
LRAUrlBuilder participantBaseUrl = new LRAUrlBuilder()
.host(sagaService.getLocalParticipantUrl())
@@ -115,7 +115,7 @@ public class LRAClient implements Closeable {
if (step.getTimeoutInMilliseconds().isPresent()) {
lraEndpoint = lraEndpoint + "?" + HEADER_TIME_LIMIT + "=" + step.getTimeoutInMilliseconds().get();
}
- HttpRequest request = prepareRequest(URI.create(lraEndpoint))
+ HttpRequest request = prepareRequest(URI.create(lraEndpoint), exchange)
.setHeader(HEADER_LINK, link.toString())
.setHeader(Exchange.SAGA_LONG_RUNNING_ACTION, lra.toString())
.setHeader("Content-Type", "text/plain")
@@ -134,8 +134,8 @@ public class LRAClient implements Closeable {
});
}
- public CompletableFuture<Void> complete(URL lra) {
- HttpRequest request = prepareRequest(URI.create(lra.toString() + COORDINATOR_PATH_CLOSE))
+ public CompletableFuture<Void> complete(URL lra, Exchange exchange) {
+ HttpRequest request = prepareRequest(URI.create(lra.toString() + COORDINATOR_PATH_CLOSE), exchange)
.setHeader("Content-Type", "text/plain")
.PUT(HttpRequest.BodyPublishers.ofString(""))
.build();
@@ -151,8 +151,8 @@ public class LRAClient implements Closeable {
});
}
- public CompletableFuture<Void> compensate(URL lra) {
- HttpRequest request = prepareRequest(URI.create(lra.toString() + COORDINATOR_PATH_CANCEL))
+ public CompletableFuture<Void> compensate(URL lra, Exchange exchange) {
+ HttpRequest request = prepareRequest(URI.create(lra.toString() + COORDINATOR_PATH_CANCEL), exchange)
.setHeader("Content-Type", "text/plain")
.PUT(HttpRequest.BodyPublishers.ofString(""))
.build();
@@ -168,7 +168,7 @@ public class LRAClient implements Closeable {
});
}
- protected HttpRequest.Builder prepareRequest(URI uri) {
+ protected HttpRequest.Builder prepareRequest(URI uri, Exchange exchange) {
return HttpRequest.newBuilder().uri(uri);
}
diff --git a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaCoordinator.java b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaCoordinator.java
index 58c4063370d..67de593cd28 100644
--- a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaCoordinator.java
+++ b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaCoordinator.java
@@ -45,17 +45,17 @@ public class LRASagaCoordinator implements CamelSagaCoordinator {
throw ex;
});
}
- return sagaService.getClient().join(this.lraURL, sagaStep);
+ return sagaService.getClient().join(this.lraURL, sagaStep, exchange);
}
@Override
- public CompletableFuture<Void> compensate() {
- return sagaService.getClient().compensate(this.lraURL);
+ public CompletableFuture<Void> compensate(Exchange exchange) {
+ return sagaService.getClient().compensate(this.lraURL, exchange);
}
@Override
- public CompletableFuture<Void> complete() {
- return sagaService.getClient().complete(this.lraURL);
+ public CompletableFuture<Void> complete(Exchange exchange) {
+ return sagaService.getClient().complete(this.lraURL, exchange);
}
@Override
diff --git a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaService.java b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaService.java
index 9e66c82a75a..67bfdf64ffd 100644
--- a/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaService.java
+++ b/components/camel-lra/src/main/java/org/apache/camel/service/lra/LRASagaService.java
@@ -22,10 +22,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.StaticService;
+import org.apache.camel.*;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.saga.CamelSagaCoordinator;
@@ -64,18 +61,19 @@ public class LRASagaService extends ServiceSupport implements StaticService, Cam
}
@Override
- public CompletableFuture<CamelSagaCoordinator> newSaga() {
- return client.newLRA()
+ public CompletableFuture<CamelSagaCoordinator> newSaga(Exchange exchange) {
+ return client.newLRA(exchange)
.thenApply(url -> new LRASagaCoordinator(LRASagaService.this, url));
}
+
@Override
public CompletableFuture<CamelSagaCoordinator> getSaga(String id) {
- CompletableFuture<CamelSagaCoordinator> coordinator = new CompletableFuture<>();
+ CompletableFuture<CamelSagaCoordinator> coordinator;
try {
- coordinator.complete(new LRASagaCoordinator(this, new URL(id)));
+ coordinator = CompletableFuture.completedFuture(new LRASagaCoordinator(this, new URL(id)));
} catch (Exception ex) {
- coordinator.completeExceptionally(ex);
+ coordinator = CompletableFuture.failedFuture(ex);
}
return coordinator;
}
diff --git a/components/camel-saga/src/main/java/org/apache/camel/component/saga/SagaProducer.java b/components/camel-saga/src/main/java/org/apache/camel/component/saga/SagaProducer.java
index 81e29360d50..d03ef49ee5d 100644
--- a/components/camel-saga/src/main/java/org/apache/camel/component/saga/SagaProducer.java
+++ b/components/camel-saga/src/main/java/org/apache/camel/component/saga/SagaProducer.java
@@ -62,9 +62,9 @@ public class SagaProducer extends DefaultAsyncProducer {
return coordinator;
}).thenCompose(coordinator -> {
if (success) {
- return coordinator.complete();
+ return coordinator.complete(exchange);
} else {
- return coordinator.compensate();
+ return coordinator.compensate(exchange);
}
}).whenComplete((res, ex) -> {
if (ex != null) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiredSagaProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiredSagaProcessor.java
index bb4efc07609..091cc690b81 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiredSagaProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiredSagaProcessor.java
@@ -46,7 +46,7 @@ public class RequiredSagaProcessor extends SagaProcessor {
coordinatorFuture = CompletableFuture.completedFuture(existingCoordinator);
inheritedCoordinator = true;
} else {
- coordinatorFuture = sagaService.newSaga();
+ coordinatorFuture = sagaService.newSaga(exchange);
inheritedCoordinator = false;
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiresNewSagaProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiresNewSagaProcessor.java
index 2ac9de348ca..54f268df7ad 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiresNewSagaProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/RequiresNewSagaProcessor.java
@@ -36,7 +36,7 @@ public class RequiresNewSagaProcessor extends SagaProcessor {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
getCurrentSagaCoordinator(exchange).whenComplete((existingCoordinator, ex) -> ifNotException(ex, exchange, callback,
- () -> sagaService.newSaga().whenComplete((newCoordinator, ex2) -> ifNotException(ex2, exchange, true,
+ () -> sagaService.newSaga(exchange).whenComplete((newCoordinator, ex2) -> ifNotException(ex2, exchange, true,
newCoordinator, existingCoordinator, callback, () -> {
setCurrentSagaCoordinator(exchange, newCoordinator);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/SagaProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/SagaProcessor.java
index d7108dd95cc..277299bcc72 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/SagaProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/saga/SagaProcessor.java
@@ -77,7 +77,7 @@ public abstract class SagaProcessor extends DelegateAsyncProcessor implements Tr
if (this.completionMode == SagaCompletionMode.AUTO) {
if (exchange.getException() != null) {
if (coordinator != null) {
- coordinator.compensate().whenComplete((done, ex) -> ifNotException(ex, exchange, callback, () -> {
+ coordinator.compensate(exchange).whenComplete((done, ex) -> ifNotException(ex, exchange, callback, () -> {
setCurrentSagaCoordinator(exchange, previousCoordinator);
callback.done(false);
}));
@@ -86,7 +86,7 @@ public abstract class SagaProcessor extends DelegateAsyncProcessor implements Tr
callback.done(false);
}
} else {
- coordinator.complete().whenComplete((done, ex) -> ifNotException(ex, exchange, callback, () -> {
+ coordinator.complete(exchange).whenComplete((done, ex) -> ifNotException(ex, exchange, callback, () -> {
setCurrentSagaCoordinator(exchange, previousCoordinator);
callback.done(false);
}));
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaCoordinator.java b/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaCoordinator.java
index a54131a225a..fe26f489f3a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaCoordinator.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaCoordinator.java
@@ -29,8 +29,8 @@ public interface CamelSagaCoordinator extends HasId {
CompletableFuture<Void> beginStep(Exchange exchange, CamelSagaStep step);
- CompletableFuture<Void> compensate();
+ CompletableFuture<Void> compensate(Exchange exchange);
- CompletableFuture<Void> complete();
+ CompletableFuture<Void> complete(Exchange exchange);
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaService.java b/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaService.java
index a7367a42600..e37201da7c5 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaService.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/CamelSagaService.java
@@ -19,6 +19,7 @@ package org.apache.camel.saga;
import java.util.concurrent.CompletableFuture;
import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
import org.apache.camel.Service;
/**
@@ -26,7 +27,7 @@ import org.apache.camel.Service;
*/
public interface CamelSagaService extends Service, CamelContextAware {
- CompletableFuture<CamelSagaCoordinator> newSaga();
+ CompletableFuture<CamelSagaCoordinator> newSaga(Exchange exchange);
CompletableFuture<CamelSagaCoordinator> getSaga(String id);
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
index b5f18e879be..472d21ad975 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
@@ -112,7 +112,7 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator {
}
@Override
- public CompletableFuture<Void> compensate() {
+ public CompletableFuture<Void> compensate(Exchange exchange) {
boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPENSATING);
if (doAction) {
@@ -130,7 +130,7 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator {
}
@Override
- public CompletableFuture<Void> complete() {
+ public CompletableFuture<Void> complete(Exchange exchange) {
boolean doAction = currentStatus.compareAndSet(Status.RUNNING, Status.COMPLETING);
if (doAction) {
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaService.java b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaService.java
index f1ae58a72c1..6b826df3716 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaService.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaService.java
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
@@ -45,7 +46,7 @@ public class InMemorySagaService extends ServiceSupport implements CamelSagaServ
private long retryDelayInMilliseconds = DEFAULT_RETRY_DELAY_IN_MILLISECONDS;
@Override
- public CompletableFuture<CamelSagaCoordinator> newSaga() {
+ public CompletableFuture<CamelSagaCoordinator> newSaga(Exchange exchange) {
ObjectHelper.notNull(camelContext, "camelContext");
String uuid = camelContext.getUuidGenerator().generateUuid();
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc
index 4e202e976fd..6338c0acf13 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_2.adoc
@@ -13,3 +13,13 @@ in case of mis-configuration or invalid hostname.
This can be disabled by setting `preValidateHostAndPort=false`, which will postpone validation
to consumer is started, and will instead re-connect endlessly (5 sec delay by default) until success.
+
+=== camel-saga, camel-lra
+
+The `org.apache.camel.service.lra.LRAClient` can now access the `Exchange` to retrieve further context information. To support passing the `Exchange` along, the following interface methods have changed:
+
+- `org.apache.camel.saga.CamelSagaCoordinator.compensate()` changed to `org.apache.camel.saga.CamelSagaCoordinator.compensate(Exchange exchange)`
+- `org.apache.camel.saga.CamelSagaCoordinator.complete()` changed to `org.apache.camel.saga.CamelSagaCoordinator.complete(Exchange exchange)`
+- `org.apache.camel.saga.CamelSagaService.newSaga()` changed to `org.apache.camel.saga.CamelSagaService.newSaga(Exchange exchange)`
+
+As a result of these interface changes, the known implementation classes and usages have been adapted as well.