You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/12/22 05:18:50 UTC

[camel] branch master updated: CAMEL-15578: Loop EIP - Add option to break out if shutting down Camel (#4811)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new f5d3a0d  CAMEL-15578: Loop EIP - Add option to break out if shutting down Camel (#4811)
f5d3a0d is described below

commit f5d3a0de679be01cd3120c1ee17cbf1d5284f9e4
Author: mschnitzler <sc...@gmail.com>
AuthorDate: Tue Dec 22 06:15:31 2020 +0100

    CAMEL-15578: Loop EIP - Add option to break out if shutting down Camel (#4811)
    
    * CAMEL-15578: add option "breakOnShutdown" to Loop EIP
    
    Setting the "breakOnShutdown" option on the Loop EIP
    allows the loop to break out early if the context
    is shutting down.
    
    * CAMEL-15578: add missing license header
    
    * CAMEL-15578: add option "breakOnShutdown" to Loop EIP
---
 .../org/apache/camel/catalog/models/loop.json      |  1 +
 .../apache/camel/catalog/schemas/camel-spring.xsd  |  8 +++
 .../resources/org/apache/camel/model/loop.json     |  1 +
 .../org/apache/camel/model/LoopDefinition.java     | 22 +++++++
 .../org/apache/camel/processor/LoopProcessor.java  | 34 ++++++++--
 .../java/org/apache/camel/reifier/LoopReifier.java |  3 +-
 .../camel/processor/LoopBreakOnShutdownTest.java   | 75 ++++++++++++++++++++++
 .../camel/processor/LoopNoBreakOnShutdownTest.java | 72 +++++++++++++++++++++
 .../java/org/apache/camel/xml/in/ModelParser.java  |  1 +
 9 files changed, 212 insertions(+), 5 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json
index 9f23152..82adb12 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json
@@ -14,6 +14,7 @@
     "expression": { "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "joor", "jsonpath", "language", "method", "mvel", "ognl", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "secret": false, "description": "Expression to [...]
     "copy": { "kind": "attribute", "displayName": "Copy", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the copy attribute is true, a copy of the input Exchange is used for each iteration. That means each iteration will start from a copy of the same message. By default loop will loop the same exchange all over, so each iteration may have different message content." },
     "doWhile": { "kind": "attribute", "displayName": "Do While", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables the while loop that loops until the predicate evaluates to false or null." },
+    "breakOnShutdown": { "kind": "attribute", "displayName": "Break On Shutdown", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the breakOnShutdown attribute is true, then the loop will not iterate until it reaches the end when Camel is shut down." },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 0c5d4e3..0530ef7 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -5586,6 +5586,14 @@ null. Default value: false
             ]]></xs:documentation>
           </xs:annotation>
         </xs:attribute>
+        <xs:attribute name="breakOnShutdown" type="xs:string">
+          <xs:annotation>
+            <xs:documentation xml:lang="en"><![CDATA[
+If the breakOnShutdown attribute is true, then the loop will not iterate until
+it reaches the end when Camel is shut down. Default value: false
+            ]]></xs:documentation>
+          </xs:annotation>
+        </xs:attribute>
       </xs:extension>
     </xs:complexContent>
   </xs:complexType>
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json
index 9f23152..82adb12 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json
@@ -14,6 +14,7 @@
     "expression": { "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "joor", "jsonpath", "language", "method", "mvel", "ognl", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "secret": false, "description": "Expression to [...]
     "copy": { "kind": "attribute", "displayName": "Copy", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the copy attribute is true, a copy of the input Exchange is used for each iteration. That means each iteration will start from a copy of the same message. By default loop will loop the same exchange all over, so each iteration may have different message content." },
     "doWhile": { "kind": "attribute", "displayName": "Do While", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables the while loop that loops until the predicate evaluates to false or null." },
+    "breakOnShutdown": { "kind": "attribute", "displayName": "Break On Shutdown", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the breakOnShutdown attribute is true, then the loop will not iterate until it reaches the end when Camel is shut down." },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java
index 7ec080d..02e62c5 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java
@@ -40,6 +40,9 @@ public class LoopDefinition extends OutputExpressionNode {
     @XmlAttribute
     @Metadata(javaType = "java.lang.Boolean")
     private String doWhile;
+    @XmlAttribute
+    @Metadata(javaType = "java.lang.Boolean")
+    private String breakOnShutdown;
 
     public LoopDefinition() {
     }
@@ -92,6 +95,25 @@ public class LoopDefinition extends OutputExpressionNode {
         this.copy = copy;
     }
 
+    public LoopDefinition breakOnShutdown() { // fluent shortcut: switches the option on and returns this node
+        setBreakOnShutdown(String.valueOf(true));
+        return this;
+    }
+
+    /**
+     * If the breakOnShutdown attribute is true, then the loop will not iterate until it reaches the end when Camel is
+     * shut down.
+     *
+     * @param breakOnShutdown a Boolean-parsable String ("true"/"false"); the reifier treats an unset value as false
+     */
+    public void setBreakOnShutdown(String breakOnShutdown) {
+        this.breakOnShutdown = breakOnShutdown;
+    }
+
+    public String getBreakOnShutdown() {
+        return breakOnShutdown; // raw configured string; converted to a boolean later by LoopReifier
+    }
+
     @Override
     public String toString() {
         return "Loop[" + getExpression() + " -> " + getOutputs() + "]";
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 834dc8a..1a5160d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -24,10 +24,12 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
 import org.slf4j.Logger;
@@ -38,32 +40,36 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing;
 /**
  * The processor which sends messages in a loop.
  */
-public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware {
+public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware, ShutdownAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class);
 
     private String id;
     private String routeId;
+    private volatile LoopState state; // assigned in process(), read by getPendingExchangesSize() (may be another thread)
+    private volatile boolean shutdownPending; // raised by prepareShutdown(), polled by LoopState while it iterates
     private final CamelContext camelContext;
     private final ReactiveExecutor reactiveExecutor;
     private final Expression expression;
     private final Predicate predicate;
     private final boolean copy;
+    private final boolean breakOnShutdown;
 
     public LoopProcessor(CamelContext camelContext, Processor processor, Expression expression, Predicate predicate,
-                         boolean copy) {
+                         boolean copy, boolean breakOnShutdown) {
         super(processor);
         this.camelContext = camelContext;
         this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.expression = expression;
         this.predicate = predicate;
         this.copy = copy;
+        this.breakOnShutdown = breakOnShutdown;
     }
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            LoopState state = new LoopState(exchange, callback);
+            state = new LoopState(exchange, callback);
 
             if (exchange.isTransacted()) {
                 reactiveExecutor.scheduleSync(state);
@@ -78,6 +84,21 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
         }
     }
 
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        return !breakOnShutdown; // defer only when the loop may not break early; the argument is not consulted
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        return state == null ? 0 : state.getPendingSize(); // state stays null until process() has run once
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        shutdownPending = true; // both arguments are ignored; LoopState checks this flag before iterating again
+    }
+
     /**
      * Class holding state for loop processing
      */
@@ -111,9 +132,10 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                 boolean cont = continueProcessing(current, "so breaking out of loop", LOG);
                 boolean doWhile = predicate == null || predicate.matches(current);
                 boolean doLoop = expression == null || index < count;
+                boolean isStopping = shutdownPending && breakOnShutdown;
 
                 // iterate
-                if (cont && doWhile && doLoop) {
+                if (cont && doWhile && doLoop && !isStopping) {
                     // and prepare for next iteration
                     current = prepareExchange(exchange, index);
 
@@ -143,6 +165,10 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
             }
         }
 
+        public int getPendingSize() {
+            return Math.max(count - index, 0); // iterations still to run; clamped so it never goes negative
+        }
+
         @Override
         public String toString() {
             return "LoopState";
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java
index 219d658..bcdc731 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java
@@ -35,6 +35,7 @@ public class LoopReifier extends ExpressionReifier<LoopDefinition> {
         Processor output = this.createChildProcessor(true);
         boolean isCopy = parseBoolean(definition.getCopy(), false);
         boolean isWhile = parseBoolean(definition.getDoWhile(), false);
+        boolean isBreakOnShutdown = parseBoolean(definition.getBreakOnShutdown(), false);
 
         Predicate predicate = null;
         Expression expression = null;
@@ -43,7 +44,7 @@ public class LoopReifier extends ExpressionReifier<LoopDefinition> {
         } else {
             expression = createExpression(definition.getExpression());
         }
-        return new LoopProcessor(camelContext, output, expression, predicate, isCopy);
+        return new LoopProcessor(camelContext, output, expression, predicate, isCopy, isBreakOnShutdown);
     }
 
 }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/LoopBreakOnShutdownTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/LoopBreakOnShutdownTest.java
new file mode 100644
index 0000000..5127a7d
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/LoopBreakOnShutdownTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+class LoopBreakOnShutdownTest extends ContextTestSupport {
+
+    private static final int LOOP_COUNT = 100;
+
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @Test
+    void testLoopBreakOnShutdown() {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+
+        // send asynchronously and wait (at most one second) until the send reports done
+        CompletableFuture<Object> sendDone = template.asyncSendBody("seda:foo", 0);
+        await().atMost(1, SECONDS).until(sendDone::isDone);
+
+        // stop the context, then check the loop did not deliver all LOOP_COUNT messages
+        context.stop();
+        assertThat(resultEndpoint.getReceivedCounter(), is(lessThan(LOOP_COUNT)));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // looping route: breakOnShutdown enabled, delay(50) inside the loop, forwards to seda:bar
+                from("seda:foo")
+                        .startupOrder(1)
+                        .loop(LOOP_COUNT).breakOnShutdown().delay(50)
+                        .to("seda:bar");
+
+                // receiving route, configured with ShutdownRoute.Defer, feeds the mock
+                from("seda:bar")
+                        .startupOrder(2)
+                        .shutdownRoute(ShutdownRoute.Defer)
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/LoopNoBreakOnShutdownTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/LoopNoBreakOnShutdownTest.java
new file mode 100644
index 0000000..90ed28b
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/LoopNoBreakOnShutdownTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+class LoopNoBreakOnShutdownTest extends ContextTestSupport {
+
+    private static final int LOOP_COUNT = 100;
+
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    @Test
+    void testLoopNoBreakOnShutdown() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMinimumMessageCount(LOOP_COUNT);
+        // send asynchronously and wait (at most one second) until the send reports done
+        CompletableFuture<Object> sendDone = template.asyncSendBody("seda:foo", "foo");
+        await().atMost(1, SECONDS).until(sendDone::isDone);
+
+        // stop the context, then verify at least LOOP_COUNT messages were still delivered
+        context.stop();
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // looping route without breakOnShutdown: delay(50) inside the loop, forwards to seda:bar
+                from("seda:foo")
+                        .startupOrder(1)
+                        .loop(LOOP_COUNT).delay(50)
+                        .to("seda:bar");
+
+                // receiving route, configured with ShutdownRoute.Defer, feeds the mock
+                from("seda:bar")
+                        .startupOrder(2)
+                        .shutdownRoute(ShutdownRoute.Defer)
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index fca47d5..7424b51 100644
--- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -573,6 +573,7 @@ public class ModelParser extends BaseParser {
     protected LoopDefinition doParseLoopDefinition() throws IOException, XmlPullParserException {
         return doParse(new LoopDefinition(), (def, key, val) -> {
             switch (key) {
+                case "breakOnShutdown": def.setBreakOnShutdown(val); break;
                 case "copy": def.setCopy(val); break;
                 case "doWhile": def.setDoWhile(val); break;
                 default: return processorDefinitionAttributeHandler().accept(def, key, val);