You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zh...@apache.org on 2023/02/16 12:32:54 UTC
[camel] branch camel-3.x updated: CAMEL-19031: Check current saga status is RUNNING before adding a step (#9344)
This is an automated email from the ASF dual-hosted git repository.
zhfeng pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push:
new f8a5e74ecd8 CAMEL-19031: Check current saga status is RUNNING before adding a step (#9344)
f8a5e74ecd8 is described below
commit f8a5e74ecd80921d6d2cd0bab5e06884111427b7
Author: Zheng Feng <zh...@gmail.com>
AuthorDate: Thu Feb 16 20:30:37 2023 +0800
CAMEL-19031: Check current saga status is RUNNING before adding a step (#9344)
---
.../apache/camel/processor/SagaTimeoutTest.java | 50 ++++++++++++++++++++++
.../apache/camel/saga/InMemorySagaCoordinator.java | 7 +++
2 files changed, 57 insertions(+)
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SagaTimeoutTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SagaTimeoutTest.java
index a61d1e3859d..03d0ba6936c 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/SagaTimeoutTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SagaTimeoutTest.java
@@ -16,15 +16,21 @@
*/
package org.apache.camel.processor;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.SagaCompletionMode;
+import org.apache.camel.model.SagaPropagation;
import org.apache.camel.saga.InMemorySagaService;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
public class SagaTimeoutTest extends ContextTestSupport {
@Test
@@ -64,6 +70,29 @@ public class SagaTimeoutTest extends ContextTestSupport {
compensate.assertIsNotSatisfied();
}
+ @Test
+ public void testTimeoutMultiParticipants() throws Exception {
+
+ MockEndpoint compensate = getMockEndpoint("mock:compensate");
+ compensate.expectedMessageCount(1);
+
+ MockEndpoint complete = getMockEndpoint("mock:complete");
+ complete.expectedMessageCount(0);
+
+ MockEndpoint end = getMockEndpoint("mock:end");
+ end.expectedMessageCount(1);
+
+ CamelExecutionException ex = assertThrows(CamelExecutionException.class,
+ () -> {
+ template.sendBody("direct:saga-multi-participants", "Hello");
+ });
+
+ assertEquals("Cannot begin: status is COMPENSATED", ex.getCause().getMessage());
+ end.assertIsSatisfied();
+ complete.assertIsSatisfied();
+ compensate.assertIsSatisfied();
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -81,6 +110,27 @@ public class SagaTimeoutTest extends ContextTestSupport {
.compensation("mock:compensate").completion("mock:complete")
.to("mock:end");
+ from("direct:saga-multi-participants")
+ .process(exchange -> {
+ exchange.getMessage().setHeader("id", UUID.randomUUID().toString());
+ })
+ .saga()
+ .propagation(SagaPropagation.REQUIRES_NEW)
+ .to("direct:service1")
+ .to("direct:service2");
+
+ from("direct:service1")
+ .saga().option("id", header("id"))
+ .propagation(SagaPropagation.MANDATORY).timeout(100, TimeUnit.MILLISECONDS)
+ .compensation("mock:compensate").completion("mock:complete")
+ .delay(300L)
+ .to("mock:end");
+
+ from("direct:service2")
+ .saga().option("id", header("id"))
+ .propagation(SagaPropagation.MANDATORY).timeout(500, TimeUnit.MILLISECONDS)
+ .compensation("mock:compensate").completion("mock:complete")
+ .to("mock:end");
}
};
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
index a4adf635a06..1bb6d36bb8b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
@@ -75,6 +75,13 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator {
@Override
public CompletableFuture<Void> beginStep(Exchange exchange, CamelSagaStep step) {
+ Status status = currentStatus.get();
+ if (status != Status.RUNNING) {
+ CompletableFuture<Void> res = new CompletableFuture<>();
+ res.completeExceptionally(new IllegalStateException("Cannot begin: status is " + status));
+ return res;
+ }
+
this.steps.add(step);
if (!step.getOptions().isEmpty()) {