You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/11/26 10:42:59 UTC
[camel-k-runtime] 03/18: YAML: add support for Aggregate EIP
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 6361af15ab3ed0acfb4c92aa043daa57b079e872
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Sun Nov 24 11:20:04 2019 +0100
YAML: add support for Aggregate EIP
---
.../k/loader/yaml/parser/AggregateStepParser.java | 64 ++++++++++++++++++
...efinitionTest.groovy => DefinitionsTest.groovy} | 2 +-
.../yaml/{RouteTest.groovy => RoutesTest.groovy} | 70 +++++++++-----------
.../apache/camel/k/loader/yaml/TestSupport.groovy | 38 ++++++++++-
.../k/loader/yaml/parser/AggregateTest.groovy | 75 ++++++++++++++++++++++
.../resources/routes/RoutesTest_aggregator.yaml | 25 ++++++++
.../test/resources/routes/RoutesTest_filter.yaml | 30 +++++++++
.../test/resources/routes/RoutesTest_split.yaml | 30 +++++++++
8 files changed, 293 insertions(+), 41 deletions(-)
diff --git a/camel-k-loader-yaml/src/main/java/org/apache/camel/k/loader/yaml/parser/AggregateStepParser.java b/camel-k-loader-yaml/src/main/java/org/apache/camel/k/loader/yaml/parser/AggregateStepParser.java
new file mode 100644
index 0000000..52b192b
--- /dev/null
+++ b/camel-k-loader-yaml/src/main/java/org/apache/camel/k/loader/yaml/parser/AggregateStepParser.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.k.loader.yaml.parser;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.camel.Expression;
+import org.apache.camel.k.annotation.yaml.YAMLStepParser;
+import org.apache.camel.k.loader.yaml.model.Step;
+import org.apache.camel.model.AggregateDefinition;
+import org.apache.camel.model.ExpressionSubElementDefinition;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.language.ExpressionDefinition;
+import org.apache.camel.reifier.AggregateReifier;
+import org.apache.camel.reifier.ProcessorReifier;
+
+@YAMLStepParser("aggregate")
+public class AggregateStepParser implements ProcessorStepParser { // parser for the YAML step annotated as "aggregate"
+ static { // class-load hook
+ ProcessorReifier.registerReifier(Definition.class, AggregateReifier::new); // lets the YAML-specific Definition subclass be reified by the stock AggregateReifier
+ }
+
+ @Override
+ public ProcessorDefinition<?> toProcessor(Context context) { // builds the processor definition for one aggregate step
+ return context.node(Definition.class); // presumably maps the step's YAML node onto Definition — confirm against Context.node
+ }
+
+ public static final class Definition extends AggregateDefinition implements HasExpression, Step.Definition { // AggregateDefinition variant used for YAML binding
+ @JsonIgnore
+ public void setExpression(Expression expression) { // excluded from Jackson via @JsonIgnore; NOTE(review): presumably to avoid a clash with HasExpression's setter — confirm
+ super.setExpression(expression);
+ }
+
+ public void setCorrelationExpression(CorrelationExpression correlationExpression) { // accepts the local CorrelationExpression type and hands it to the superclass setter
+ super.setCorrelationExpression(correlationExpression);
+ }
+ }
+
+ public static final class CorrelationExpression extends ExpressionSubElementDefinition implements HasExpression { // exposes expressionType through the HasExpression accessors
+ @Override
+ public void setExpression(ExpressionDefinition expressionDefinition) {
+ super.setExpressionType(expressionDefinition); // stored as the sub-element's expressionType
+ }
+
+ @Override
+ public ExpressionDefinition getExpression() {
+ return super.getExpressionType(); // read back from the same expressionType property
+ }
+ }
+}
+
diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteDefinitionTest.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/DefinitionsTest.groovy
similarity index 99%
rename from camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteDefinitionTest.groovy
rename to camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/DefinitionsTest.groovy
index b63d4ba..0fe243d 100644
--- a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteDefinitionTest.groovy
+++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/DefinitionsTest.groovy
@@ -27,7 +27,7 @@ import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
-class RouteDefinitionTest extends TestSupport {
+class DefinitionsTest extends TestSupport {
def "route with id"() {
given:
diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteTest.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RoutesTest.groovy
similarity index 63%
rename from camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteTest.groovy
rename to camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RoutesTest.groovy
index 72dd75a..23fd49b 100644
--- a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteTest.groovy
+++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RoutesTest.groovy
@@ -18,27 +18,13 @@ package org.apache.camel.k.loader.yaml
import org.apache.camel.component.mock.MockEndpoint
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy
-class RouteTest extends TestSupport {
+class RoutesTest extends TestSupport {
- def 'test split'() {
+ def 'split'() {
setup:
- def context = startContext('''
- - from:
- uri: "direct:route"
- steps:
- - split:
- tokenize: ","
- steps:
- - to: "mock:split"
- - to: "mock:route"
- - from:
- uri: "direct:flow"
- steps:
- - split:
- tokenize: ","
- - to: "mock:flow"
- ''')
+ def context = startContext()
mockEndpoint(context,'mock:split') {
expectedMessageCount = 3
@@ -63,35 +49,20 @@ class RouteTest extends TestSupport {
context?.stop()
}
- def 'test filter'() {
+ def 'filter'() {
setup:
- def context = startContext('''
- - from:
- uri: "direct:route"
- steps:
- - filter:
- simple: "${body.startsWith(\\"a\\")}"
- steps:
- - to: "mock:filter"
- - to: "mock:route"
- - from:
- uri: "direct:flow"
- steps:
- - filter:
- simple: "${body.startsWith(\\"a\\")}"
- - to: "mock:flow"
- ''')
+ def context = startContext()
mockEndpoint(context, 'mock:route') {
- expectedMessageCount = 2
+ expectedMessageCount 2
expectedBodiesReceived 'a', 'b'
}
mockEndpoint(context, 'mock:filter') {
- expectedMessageCount = 1
+ expectedMessageCount 1
expectedBodiesReceived 'a'
}
mockEndpoint(context,'mock:flow') {
- expectedMessageCount = 1
+ expectedMessageCount 1
expectedBodiesReceived 'a'
}
when:
@@ -106,4 +77,27 @@ class RouteTest extends TestSupport {
cleanup:
context?.stop()
}
+
+ def 'aggregator'() {
+ setup:
+ def context = startContext([
+ 'aggregatorStrategy': new UseLatestAggregationStrategy()
+ ])
+
+ mockEndpoint(context, 'mock:route') {
+ expectedMessageCount 2
+ expectedBodiesReceived '2', '4'
+ }
+ when:
+ context.createProducerTemplate().with {
+ sendBodyAndHeader('direct:route', '1', 'StockSymbol', 1)
+ sendBodyAndHeader('direct:route', '2', 'StockSymbol', 1)
+ sendBodyAndHeader('direct:route', '3', 'StockSymbol', 2)
+ sendBodyAndHeader('direct:route', '4', 'StockSymbol', 2)
+ }
+ then:
+ MockEndpoint.assertIsSatisfied(context)
+ cleanup:
+ context?.stop()
+ }
}
diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy
index 400c59f..fea7a41 100644
--- a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy
+++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy
@@ -43,9 +43,29 @@ class TestSupport extends Specification {
}
static CamelContext startContext(String content) {
+ return startContext(content, [:])
+ }
+
+ static CamelContext startContext(String content, Map<String, Object> beans) {
+ return startContext(
+ IOUtils.toInputStream(content.stripMargin(), StandardCharsets.UTF_8),
+ beans
+ )
+ }
+
+ static CamelContext startContext(InputStream content) {
+ return startContext(content, [:])
+ }
+
+ static CamelContext startContext(InputStream content, Map<String, Object> beans) {
def context = new DefaultCamelContext()
- def istream = IOUtils.toInputStream(content.stripMargin(), StandardCharsets.UTF_8)
- def builder = new YamlSourceLoader().builder(istream)
+ def builder = new YamlSourceLoader().builder(content)
+
+ if (beans) {
+ beans.each {
+ k, v -> context.registry.bind(k, v)
+ }
+ }
context.disableJMX()
context.setStreamCaching(true)
@@ -55,6 +75,20 @@ class TestSupport extends Specification {
return context
}
+ CamelContext startContext() {
+ return startContext([:])
+ }
+
+ CamelContext startContext(Map<String, Object> beans) {
+ def name = specificationContext.currentIteration.name.replace(' ', '_')
+ def path = "/routes/${specificationContext.currentSpec.name}_${name}.yaml"
+
+ return startContext(
+ TestSupport.class.getResourceAsStream(path) as InputStream,
+ beans
+ )
+ }
+
static MockEndpoint mockEndpoint(
CamelContext context,
String uri,
diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/parser/AggregateTest.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/parser/AggregateTest.groovy
new file mode 100644
index 0000000..8335dcf
--- /dev/null
+++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/parser/AggregateTest.groovy
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.k.loader.yaml.parser
+
+import org.apache.camel.k.loader.yaml.TestSupport
+import org.apache.camel.model.AggregateDefinition
+import org.apache.camel.model.language.SimpleExpression
+
+class AggregateTest extends TestSupport { // checks how AggregateStepParser maps YAML onto AggregateDefinition
+
+ def "definition"() { // explicit form: the aggregate expression is nested under an "expression" key
+ given:
+ def stepContext = stepContext('''
+ expression:
+ simple: "${header.ID}"
+ correlation-expression:
+ simple: "${header.Count}"
+ strategy-ref: "myAppender"
+ completion-size: 10
+ ''')
+ when:
+ def processor = new AggregateStepParser().toProcessor(stepContext)
+ then:
+ with(processor, AggregateDefinition) { // scalar options and both expressions must be bound
+ strategyRef == 'myAppender'
+ completionSize == 10
+
+ with(expression, SimpleExpression) {
+ expression == '${header.ID}'
+ }
+ with(correlationExpression?.expressionType, SimpleExpression) { // correlation expression is held in expressionType
+ expression == '${header.Count}'
+ }
+ }
+ }
+
+ def "compact definition"() { // compact form: the language key ("simple") sits directly on the step
+ given:
+ def stepContext = stepContext('''
+ simple: "${header.ID}"
+ correlation-expression:
+ simple: "${header.Count}"
+ strategy-ref: "myAppender"
+ completion-size: 10
+ ''')
+ when:
+ def processor = new AggregateStepParser().toProcessor(stepContext)
+ then:
+ with(processor, AggregateDefinition) { // same expectations as the explicit form
+ strategyRef == 'myAppender'
+ completionSize == 10
+
+ with(expression, SimpleExpression) {
+ expression == '${header.ID}'
+ }
+ with(correlationExpression?.expressionType, SimpleExpression) { // correlation expression is held in expressionType
+ expression == '${header.Count}'
+ }
+ }
+ }
+}
diff --git a/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_aggregator.yaml b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_aggregator.yaml
new file mode 100644
index 0000000..1634826
--- /dev/null
+++ b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_aggregator.yaml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+- from:
+ uri: "direct:route" # endpoint the 'aggregator' test in RoutesTest sends to
+ steps:
+ - aggregate:
+ strategy-ref: "aggregatorStrategy" # bean name; the test binds a UseLatestAggregationStrategy under it
+ completion-size: 2 # the test sends two messages per StockSymbol and expects one output per pair
+ correlation-expression:
+ simple: "${header.StockSymbol}" # correlation key is the StockSymbol header
+ - to: "mock:route" # the test expects bodies '2' and '4' here
\ No newline at end of file
diff --git a/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_filter.yaml b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_filter.yaml
new file mode 100644
index 0000000..e34b1ff
--- /dev/null
+++ b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_filter.yaml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+- from:
+ uri: "direct:route" # nested form: the filter declares its own steps
+ steps:
+ - filter:
+ simple: "${body.startsWith(\"a\")}"
+ steps:
+ - to: "mock:filter" # the 'filter' test expects only 'a' here
+ - to: "mock:route" # the test expects both 'a' and 'b' here, i.e. this step is not filtered
+- from:
+ uri: "direct:flow" # flow form: the filter declares no nested steps
+ steps:
+ - filter:
+ simple: "${body.startsWith(\"a\")}"
+ - to: "mock:flow" # the test expects only 'a' here, so this step is subject to the filter
\ No newline at end of file
diff --git a/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_split.yaml b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_split.yaml
new file mode 100644
index 0000000..5011482
--- /dev/null
+++ b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_split.yaml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+- from:
+ uri: "direct:route" # nested form: the split declares its own steps
+ steps:
+ - split:
+ tokenize: ","
+ steps:
+ - to: "mock:split" # the 'split' test expects 3 messages here
+ - to: "mock:route"
+- from:
+ uri: "direct:flow" # flow form: the split declares no nested steps
+ steps:
+ - split:
+ tokenize: ","
+ - to: "mock:flow" # presumably receives the split parts, as in the filter flow route — confirm against the 'split' test
\ No newline at end of file