You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zh...@apache.org on 2023/02/16 12:34:24 UTC

[camel] branch camel-3.18.x updated: CAMEL-19031: Check current saga status is RUNNING before add a step (#9344)

This is an automated email from the ASF dual-hosted git repository.

zhfeng pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.18.x by this push:
     new 752a81883ac CAMEL-19031: Check current saga status is RUNNING before add a step (#9344)
752a81883ac is described below

commit 752a81883acc27fcc0265b87dd650954df87d98d
Author: Zheng Feng <zh...@gmail.com>
AuthorDate: Thu Feb 16 20:30:37 2023 +0800

    CAMEL-19031: Check current saga status is RUNNING before add a step (#9344)
---
 .../apache/camel/processor/SagaTimeoutTest.java    | 50 ++++++++++++++++++++++
 .../apache/camel/saga/InMemorySagaCoordinator.java |  7 +++
 2 files changed, 57 insertions(+)

diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SagaTimeoutTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SagaTimeoutTest.java
index a61d1e3859d..03d0ba6936c 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/SagaTimeoutTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SagaTimeoutTest.java
@@ -16,15 +16,21 @@
  */
 package org.apache.camel.processor;
 
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.CamelExecutionException;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.model.SagaCompletionMode;
+import org.apache.camel.model.SagaPropagation;
 import org.apache.camel.saga.InMemorySagaService;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
 public class SagaTimeoutTest extends ContextTestSupport {
 
     @Test
@@ -64,6 +70,29 @@ public class SagaTimeoutTest extends ContextTestSupport {
         compensate.assertIsNotSatisfied();
     }
 
+    @Test
+    public void testTimeoutMultiParticipants() throws Exception {
+
+        MockEndpoint compensate = getMockEndpoint("mock:compensate");
+        compensate.expectedMessageCount(1);
+
+        MockEndpoint complete = getMockEndpoint("mock:complete");
+        complete.expectedMessageCount(0);
+
+        MockEndpoint end = getMockEndpoint("mock:end");
+        end.expectedMessageCount(1);
+
+        CamelExecutionException ex = assertThrows(CamelExecutionException.class,
+                () -> {
+                    template.sendBody("direct:saga-multi-participants", "Hello");
+            });
+
+        assertEquals("Cannot begin: status is COMPENSATED", ex.getCause().getMessage());
+        end.assertIsSatisfied();
+        complete.assertIsSatisfied();
+        compensate.assertIsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
 
@@ -81,6 +110,27 @@ public class SagaTimeoutTest extends ContextTestSupport {
                         .compensation("mock:compensate").completion("mock:complete")
                         .to("mock:end");
 
+                from("direct:saga-multi-participants")
+                        .process(exchange -> {
+                            exchange.getMessage().setHeader("id", UUID.randomUUID().toString());
+                        })
+                        .saga()
+                        .propagation(SagaPropagation.REQUIRES_NEW)
+                        .to("direct:service1")
+                        .to("direct:service2");
+
+                from("direct:service1")
+                        .saga().option("id", header("id"))
+                        .propagation(SagaPropagation.MANDATORY).timeout(100, TimeUnit.MILLISECONDS)
+                        .compensation("mock:compensate").completion("mock:complete")
+                        .delay(300L)
+                        .to("mock:end");
+
+                from("direct:service2")
+                        .saga().option("id", header("id"))
+                        .propagation(SagaPropagation.MANDATORY).timeout(500, TimeUnit.MILLISECONDS)
+                        .compensation("mock:compensate").completion("mock:complete")
+                        .to("mock:end");
             }
         };
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
index a4adf635a06..1bb6d36bb8b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
+++ b/core/camel-support/src/main/java/org/apache/camel/saga/InMemorySagaCoordinator.java
@@ -75,6 +75,13 @@ public class InMemorySagaCoordinator implements CamelSagaCoordinator {
 
     @Override
     public CompletableFuture<Void> beginStep(Exchange exchange, CamelSagaStep step) {
+        Status status = currentStatus.get();
+        if (status != Status.RUNNING) {
+            CompletableFuture<Void> res = new CompletableFuture<>();
+            res.completeExceptionally(new IllegalStateException("Cannot begin: status is " + status));
+            return res;
+        }
+
         this.steps.add(step);
 
         if (!step.getOptions().isEmpty()) {