You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/12/22 05:18:50 UTC
[camel] branch master updated: CAMEL-15578: Loop EIP - Add option
to break out if shutting down Camel (#4811)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new f5d3a0d CAMEL-15578: Loop EIP - Add option to break out if shutting down Camel (#4811)
f5d3a0d is described below
commit f5d3a0de679be01cd3120c1ee17cbf1d5284f9e4
Author: mschnitzler <sc...@gmail.com>
AuthorDate: Tue Dec 22 06:15:31 2020 +0100
CAMEL-15578: Loop EIP - Add option to break out if shutting down Camel (#4811)
* CAMEL-15578: add option "breakOnShutdown" to Loop EIP
Setting the "breakOnShutdown" option on the Loop EIP
allows to break out of the loop earlier if the context
is shutting down.
* CAMEL-15578: add missing license header
* CAMEL-15578: add option "breakOnShutdown" to Loop EIP
---
.../org/apache/camel/catalog/models/loop.json | 1 +
.../apache/camel/catalog/schemas/camel-spring.xsd | 8 +++
.../resources/org/apache/camel/model/loop.json | 1 +
.../org/apache/camel/model/LoopDefinition.java | 22 +++++++
.../org/apache/camel/processor/LoopProcessor.java | 34 ++++++++--
.../java/org/apache/camel/reifier/LoopReifier.java | 3 +-
.../camel/processor/LoopBreakOnShutdownTest.java | 75 ++++++++++++++++++++++
.../camel/processor/LoopNoBreakOnShutdownTest.java | 72 +++++++++++++++++++++
.../java/org/apache/camel/xml/in/ModelParser.java | 1 +
9 files changed, 212 insertions(+), 5 deletions(-)
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json
index 9f23152..82adb12 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/loop.json
@@ -14,6 +14,7 @@
"expression": { "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "joor", "jsonpath", "language", "method", "mvel", "ognl", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "secret": false, "description": "Expression to [...]
"copy": { "kind": "attribute", "displayName": "Copy", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the copy attribute is true, a copy of the input Exchange is used for each iteration. That means each iteration will start from a copy of the same message. By default loop will loop the same exchange all over, so each iteration may have different message content." },
"doWhile": { "kind": "attribute", "displayName": "Do While", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables the while loop that loops until the predicate evaluates to false or null." },
+ "breakOnShutdown": { "kind": "attribute", "displayName": "Break On Shutdown", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the breakOnShutdown attribute is true, then the loop will not iterate until it reaches the end when Camel is shut down." },
"id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
"description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
}
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index 0c5d4e3..0530ef7 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -5586,6 +5586,14 @@ null. Default value: false
]]></xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="breakOnShutdown" type="xs:string">
+ <xs:annotation>
+ <xs:documentation xml:lang="en"><![CDATA[
+If the breakOnShutdown attribute is true, then the loop will not iterate until
+it reaches the end when Camel is shut down. Default value: false
+ ]]></xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
</xs:extension>
</xs:complexContent>
</xs:complexType>
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json
index 9f23152..82adb12 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/loop.json
@@ -14,6 +14,7 @@
"expression": { "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "joor", "jsonpath", "language", "method", "mvel", "ognl", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "secret": false, "description": "Expression to [...]
"copy": { "kind": "attribute", "displayName": "Copy", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the copy attribute is true, a copy of the input Exchange is used for each iteration. That means each iteration will start from a copy of the same message. By default loop will loop the same exchange all over, so each iteration may have different message content." },
"doWhile": { "kind": "attribute", "displayName": "Do While", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables the while loop that loops until the predicate evaluates to false or null." },
+ "breakOnShutdown": { "kind": "attribute", "displayName": "Break On Shutdown", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the breakOnShutdown attribute is true, then the loop will not iterate until it reaches the end when Camel is shut down." },
"id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
"description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
}
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java
index 7ec080d..02e62c5 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/LoopDefinition.java
@@ -40,6 +40,9 @@ public class LoopDefinition extends OutputExpressionNode {
@XmlAttribute
@Metadata(javaType = "java.lang.Boolean")
private String doWhile;
+ @XmlAttribute
+ @Metadata(javaType = "java.lang.Boolean")
+ private String breakOnShutdown;
public LoopDefinition() {
}
@@ -92,6 +95,25 @@ public class LoopDefinition extends OutputExpressionNode {
this.copy = copy;
}
+ public LoopDefinition breakOnShutdown() {
+ setBreakOnShutdown(Boolean.toString(true));
+ return this;
+ }
+
+ /**
+ * If the breakOnShutdown attribute is true, then the loop will not iterate until it reaches the end when Camel is
+ * shut down.
+ *
+ * @param breakOnShutdown a Boolean-parsable String
+ */
+ public void setBreakOnShutdown(String breakOnShutdown) {
+ this.breakOnShutdown = breakOnShutdown;
+ }
+
+ public String getBreakOnShutdown() {
+ return breakOnShutdown;
+ }
+
@Override
public String toString() {
return "Loop[" + getExpression() + " -> " + getOutputs() + "]";
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 834dc8a..1a5160d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -24,10 +24,12 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
import org.slf4j.Logger;
@@ -38,32 +40,36 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing;
/**
* The processor which sends messages in a loop.
*/
-public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware {
+public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware, ShutdownAware {
private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class);
private String id;
private String routeId;
+ private LoopState state;
+ private boolean shutdownPending;
private final CamelContext camelContext;
private final ReactiveExecutor reactiveExecutor;
private final Expression expression;
private final Predicate predicate;
private final boolean copy;
+ private final boolean breakOnShutdown;
public LoopProcessor(CamelContext camelContext, Processor processor, Expression expression, Predicate predicate,
- boolean copy) {
+ boolean copy, boolean breakOnShutdown) {
super(processor);
this.camelContext = camelContext;
this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.expression = expression;
this.predicate = predicate;
this.copy = copy;
+ this.breakOnShutdown = breakOnShutdown;
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- LoopState state = new LoopState(exchange, callback);
+ state = new LoopState(exchange, callback);
if (exchange.isTransacted()) {
reactiveExecutor.scheduleSync(state);
@@ -78,6 +84,21 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
}
}
+ @Override
+ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+ return !breakOnShutdown;
+ }
+
+ @Override
+ public int getPendingExchangesSize() {
+ return state.getPendingSize();
+ }
+
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ shutdownPending = true;
+ }
+
/**
* Class holding state for loop processing
*/
@@ -111,9 +132,10 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
boolean cont = continueProcessing(current, "so breaking out of loop", LOG);
boolean doWhile = predicate == null || predicate.matches(current);
boolean doLoop = expression == null || index < count;
+ boolean isStopping = shutdownPending && breakOnShutdown;
// iterate
- if (cont && doWhile && doLoop) {
+ if (cont && doWhile && doLoop && !isStopping) {
// and prepare for next iteration
current = prepareExchange(exchange, index);
@@ -143,6 +165,10 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
}
}
+ public int getPendingSize() {
+ return Math.max(count - index, 0);
+ }
+
@Override
public String toString() {
return "LoopState";
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java
index 219d658..bcdc731 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/LoopReifier.java
@@ -35,6 +35,7 @@ public class LoopReifier extends ExpressionReifier<LoopDefinition> {
Processor output = this.createChildProcessor(true);
boolean isCopy = parseBoolean(definition.getCopy(), false);
boolean isWhile = parseBoolean(definition.getDoWhile(), false);
+ boolean isBreakOnShutdown = parseBoolean(definition.getBreakOnShutdown(), false);
Predicate predicate = null;
Expression expression = null;
@@ -43,7 +44,7 @@ public class LoopReifier extends ExpressionReifier<LoopDefinition> {
} else {
expression = createExpression(definition.getExpression());
}
- return new LoopProcessor(camelContext, output, expression, predicate, isCopy);
+ return new LoopProcessor(camelContext, output, expression, predicate, isCopy, isBreakOnShutdown);
}
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/LoopBreakOnShutdownTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/LoopBreakOnShutdownTest.java
new file mode 100644
index 0000000..5127a7d
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/LoopBreakOnShutdownTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+class LoopBreakOnShutdownTest extends ContextTestSupport {
+
+ private static final int LOOP_COUNT = 100;
+
+ @Test
+ void testLoopBreakOnShutdown() {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+
+ CompletableFuture<Object> future = template.asyncSendBody("seda:foo", 0);
+ await().atMost(1, SECONDS).until(future::isDone);
+
+ context.stop();
+
+ int received = mock.getReceivedCounter();
+ assertThat(received, is(lessThan(LOOP_COUNT)));
+ }
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+
+ return new RouteBuilder() {
+ public void configure() {
+
+ from("seda:foo")
+ .startupOrder(1)
+ .loop(LOOP_COUNT).breakOnShutdown().delay(50)
+ .to("seda:bar");
+
+ from("seda:bar")
+ .startupOrder(2)
+ .shutdownRoute(ShutdownRoute.Defer)
+ .to("mock:result");
+ }
+ };
+ }
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/LoopNoBreakOnShutdownTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/LoopNoBreakOnShutdownTest.java
new file mode 100644
index 0000000..90ed28b
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/LoopNoBreakOnShutdownTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ShutdownRoute;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+
+class LoopNoBreakOnShutdownTest extends ContextTestSupport {
+
+ private static final int LOOP_COUNT = 100;
+
+ @Test
+ void testLoopNoBreakOnShutdown() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMinimumMessageCount(LOOP_COUNT);
+
+ CompletableFuture<Object> future = template.asyncSendBody("seda:foo", "foo");
+ await().atMost(1, SECONDS).until(future::isDone);
+
+ context.stop();
+
+ mock.assertIsSatisfied();
+ }
+
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+
+ return new RouteBuilder() {
+ public void configure() {
+
+ from("seda:foo")
+ .startupOrder(1)
+ .loop(LOOP_COUNT).delay(50)
+ .to("seda:bar");
+
+ from("seda:bar")
+ .startupOrder(2)
+ .shutdownRoute(ShutdownRoute.Defer)
+ .to("mock:result");
+ }
+ };
+ }
+}
diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index fca47d5..7424b51 100644
--- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -573,6 +573,7 @@ public class ModelParser extends BaseParser {
protected LoopDefinition doParseLoopDefinition() throws IOException, XmlPullParserException {
return doParse(new LoopDefinition(), (def, key, val) -> {
switch (key) {
+ case "breakOnShutdown": def.setBreakOnShutdown(val); break;
case "copy": def.setCopy(val); break;
case "doWhile": def.setDoWhile(val); break;
default: return processorDefinitionAttributeHandler().accept(def, key, val);