You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2019/11/26 10:42:59 UTC

[camel-k-runtime] 03/18: YAML: add support for Aggregate EIP

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 6361af15ab3ed0acfb4c92aa043daa57b079e872
Author: lburgazzoli <lb...@gmail.com>
AuthorDate: Sun Nov 24 11:20:04 2019 +0100

    YAML: add support for Aggregate EIP
---
 .../k/loader/yaml/parser/AggregateStepParser.java  | 64 ++++++++++++++++++
 ...efinitionTest.groovy => DefinitionsTest.groovy} |  2 +-
 .../yaml/{RouteTest.groovy => RoutesTest.groovy}   | 70 +++++++++-----------
 .../apache/camel/k/loader/yaml/TestSupport.groovy  | 38 ++++++++++-
 .../k/loader/yaml/parser/AggregateTest.groovy      | 75 ++++++++++++++++++++++
 .../resources/routes/RoutesTest_aggregator.yaml    | 25 ++++++++
 .../test/resources/routes/RoutesTest_filter.yaml   | 30 +++++++++
 .../test/resources/routes/RoutesTest_split.yaml    | 30 +++++++++
 8 files changed, 293 insertions(+), 41 deletions(-)

diff --git a/camel-k-loader-yaml/src/main/java/org/apache/camel/k/loader/yaml/parser/AggregateStepParser.java b/camel-k-loader-yaml/src/main/java/org/apache/camel/k/loader/yaml/parser/AggregateStepParser.java
new file mode 100644
index 0000000..52b192b
--- /dev/null
+++ b/camel-k-loader-yaml/src/main/java/org/apache/camel/k/loader/yaml/parser/AggregateStepParser.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.k.loader.yaml.parser;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.camel.Expression;
+import org.apache.camel.k.annotation.yaml.YAMLStepParser;
+import org.apache.camel.k.loader.yaml.model.Step;
+import org.apache.camel.model.AggregateDefinition;
+import org.apache.camel.model.ExpressionSubElementDefinition;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.language.ExpressionDefinition;
+import org.apache.camel.reifier.AggregateReifier;
+import org.apache.camel.reifier.ProcessorReifier;
+
+@YAMLStepParser("aggregate") // step parser declared under the name "aggregate"
+public class AggregateStepParser implements ProcessorStepParser { // produces the model node for an aggregate step
+    static { // runs once at class load
+        ProcessorReifier.registerReifier(Definition.class, AggregateReifier::new); // reify the Definition subclass below with the stock AggregateReifier
+    }
+
+    @Override
+    public ProcessorDefinition<?> toProcessor(Context context) { // returns the step as a Definition (an AggregateDefinition subclass)
+        return context.node(Definition.class); // NOTE(review): presumably binds the step's YAML node onto Definition - confirm in Context
+    }
+
+    public static final class Definition extends AggregateDefinition implements HasExpression, Step.Definition { // AggregateDefinition variant used for YAML binding
+        @JsonIgnore // excludes this Expression-typed overload from Jackson's property handling
+        public void setExpression(Expression expression) { // plain delegation to the superclass setter
+            super.setExpression(expression);
+        }
+
+        public void setCorrelationExpression(CorrelationExpression correlationExpression) { // accepts the nested CorrelationExpression type declared below
+            super.setCorrelationExpression(correlationExpression);
+        }
+    }
+
+    public static final class CorrelationExpression extends ExpressionSubElementDefinition implements HasExpression { // exposes expressionType through the HasExpression accessors
+        @Override
+        public void setExpression(ExpressionDefinition expressionDefinition) { // stores the value as the inherited expressionType
+            super.setExpressionType(expressionDefinition);
+        }
+
+        @Override
+        public ExpressionDefinition getExpression() { // reads back the inherited expressionType
+            return super.getExpressionType();
+        }
+    }
+}
+
diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteDefinitionTest.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/DefinitionsTest.groovy
similarity index 99%
rename from camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteDefinitionTest.groovy
rename to camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/DefinitionsTest.groovy
index b63d4ba..0fe243d 100644
--- a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteDefinitionTest.groovy
+++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/DefinitionsTest.groovy
@@ -27,7 +27,7 @@ import org.apache.commons.io.IOUtils
 
 import java.nio.charset.StandardCharsets
 
-class RouteDefinitionTest extends TestSupport {
+class DefinitionsTest extends TestSupport {
 
     def "route with id"() {
         given:
diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteTest.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RoutesTest.groovy
similarity index 63%
rename from camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteTest.groovy
rename to camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RoutesTest.groovy
index 72dd75a..23fd49b 100644
--- a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RouteTest.groovy
+++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/RoutesTest.groovy
@@ -18,27 +18,13 @@ package org.apache.camel.k.loader.yaml
 
 
 import org.apache.camel.component.mock.MockEndpoint
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy
 
-class RouteTest extends TestSupport {
+class RoutesTest extends TestSupport {
 
-    def 'test split'() {
+    def 'split'() {
         setup:
-            def context = startContext('''
-                - from:
-                    uri: "direct:route"
-                    steps:
-                      - split:
-                          tokenize: ","
-                          steps:
-                            - to: "mock:split"
-                      - to: "mock:route"
-                - from:
-                    uri: "direct:flow"
-                    steps:
-                      - split:
-                          tokenize: ","
-                      - to: "mock:flow"
-            ''')
+            def context = startContext()
 
             mockEndpoint(context,'mock:split') {
                 expectedMessageCount = 3
@@ -63,35 +49,20 @@ class RouteTest extends TestSupport {
             context?.stop()
     }
 
-    def 'test filter'() {
+    def 'filter'() {
         setup:
-            def context = startContext('''
-                - from:
-                    uri: "direct:route"
-                    steps:
-                      - filter:
-                          simple: "${body.startsWith(\\"a\\")}"
-                          steps:
-                            - to: "mock:filter"
-                      - to: "mock:route"
-                - from:
-                    uri: "direct:flow"
-                    steps:
-                      - filter:
-                          simple: "${body.startsWith(\\"a\\")}"
-                      - to: "mock:flow"
-            ''')
+            def context = startContext()
 
             mockEndpoint(context, 'mock:route') {
-                expectedMessageCount = 2
+                expectedMessageCount 2
                 expectedBodiesReceived 'a', 'b'
             }
             mockEndpoint(context, 'mock:filter') {
-                expectedMessageCount = 1
+                expectedMessageCount 1
                 expectedBodiesReceived 'a'
             }
             mockEndpoint(context,'mock:flow') {
-                expectedMessageCount = 1
+                expectedMessageCount 1
                 expectedBodiesReceived 'a'
             }
         when:
@@ -106,4 +77,27 @@ class RouteTest extends TestSupport {
         cleanup:
             context?.stop()
     }
+
+    def 'aggregator'() {
+        setup:
+            def context = startContext([
+                'aggregatorStrategy': new UseLatestAggregationStrategy()
+            ])
+
+            mockEndpoint(context, 'mock:route') {
+                expectedMessageCount 2
+                expectedBodiesReceived '2', '4'
+            }
+        when:
+            context.createProducerTemplate().with {
+                sendBodyAndHeader('direct:route', '1', 'StockSymbol', 1)
+                sendBodyAndHeader('direct:route', '2', 'StockSymbol', 1)
+                sendBodyAndHeader('direct:route', '3', 'StockSymbol', 2)
+                sendBodyAndHeader('direct:route', '4', 'StockSymbol', 2)
+            }
+        then:
+            MockEndpoint.assertIsSatisfied(context)
+        cleanup:
+            context?.stop()
+    }
 }
diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy
index 400c59f..fea7a41 100644
--- a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy
+++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/TestSupport.groovy
@@ -43,9 +43,29 @@ class TestSupport extends Specification {
     }
 
     static CamelContext startContext(String content) {
+        return startContext(content, [:])
+    }
+
+    static CamelContext startContext(String content, Map<String, Object> beans) {
+        return startContext(
+                IOUtils.toInputStream(content.stripMargin(), StandardCharsets.UTF_8),
+                beans
+        )
+    }
+
+    static CamelContext startContext(InputStream content) {
+        return startContext(content, [:])
+    }
+
+    static CamelContext startContext(InputStream content, Map<String, Object> beans) {
         def context = new DefaultCamelContext()
-        def istream = IOUtils.toInputStream(content.stripMargin(), StandardCharsets.UTF_8)
-        def builder = new YamlSourceLoader().builder(istream)
+        def builder = new YamlSourceLoader().builder(content)
+
+        if (beans) {
+            beans.each {
+                k, v -> context.registry.bind(k, v)
+            }
+        }
 
         context.disableJMX()
         context.setStreamCaching(true)
@@ -55,6 +75,20 @@ class TestSupport extends Specification {
         return context
     }
 
+    CamelContext startContext() {
+        return startContext([:])
+    }
+
+    CamelContext startContext(Map<String, Object> beans) {
+        def name = specificationContext.currentIteration.name.replace(' ', '_')
+        def path = "/routes/${specificationContext.currentSpec.name}_${name}.yaml"
+
+        return startContext(
+                TestSupport.class.getResourceAsStream(path) as InputStream,
+                beans
+        )
+    }
+
     static MockEndpoint mockEndpoint(
             CamelContext context,
             String uri,
diff --git a/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/parser/AggregateTest.groovy b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/parser/AggregateTest.groovy
new file mode 100644
index 0000000..8335dcf
--- /dev/null
+++ b/camel-k-loader-yaml/src/test/groovy/org/apache/camel/k/loader/yaml/parser/AggregateTest.groovy
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.k.loader.yaml.parser
+
+import org.apache.camel.k.loader.yaml.TestSupport
+import org.apache.camel.model.AggregateDefinition
+import org.apache.camel.model.language.SimpleExpression
+
+class AggregateTest extends TestSupport { // checks that AggregateStepParser turns a YAML step into an AggregateDefinition
+
+    def "definition"() { // long form: the expression is nested under an explicit 'expression' key
+        given:
+            def stepContext = stepContext('''
+                 expression:
+                     simple: "${header.ID}"
+                 correlation-expression:
+                     simple: "${header.Count}"
+                 strategy-ref: "myAppender"
+                 completion-size: 10
+            ''')
+        when:
+            def processor = new AggregateStepParser().toProcessor(stepContext)
+        then:
+            with(processor, AggregateDefinition) {
+                strategyRef == 'myAppender'
+                completionSize == 10
+
+                with(expression, SimpleExpression) { // single-quoted, so '${...}' is compared literally
+                    expression == '${header.ID}'
+                }
+                with(correlationExpression?.expressionType, SimpleExpression) {
+                    expression == '${header.Count}'
+                }
+            }
+    }
+
+    def "compact definition"() { // compact form: the 'simple' key sits directly on the step, same expected result
+        given:
+            def stepContext = stepContext('''
+                 simple: "${header.ID}"
+                 correlation-expression:
+                     simple: "${header.Count}"
+                 strategy-ref: "myAppender"
+                 completion-size: 10
+            ''')
+        when:
+            def processor = new AggregateStepParser().toProcessor(stepContext)
+        then:
+            with(processor, AggregateDefinition) {
+                strategyRef == 'myAppender'
+                completionSize == 10
+
+                with(expression, SimpleExpression) { // single-quoted, so '${...}' is compared literally
+                    expression == '${header.ID}'
+                }
+                with(correlationExpression?.expressionType, SimpleExpression) {
+                    expression == '${header.Count}'
+                }
+            }
+    }
+}
diff --git a/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_aggregator.yaml b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_aggregator.yaml
new file mode 100644
index 0000000..1634826
--- /dev/null
+++ b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_aggregator.yaml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+- from:
+    uri: "direct:route"
+    steps:
+      - aggregate:
+          strategy-ref: "aggregatorStrategy"
+          completion-size: 2
+          correlation-expression:
+            simple: "${header.StockSymbol}"
+      - to: "mock:route"
\ No newline at end of file
diff --git a/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_filter.yaml b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_filter.yaml
new file mode 100644
index 0000000..e34b1ff
--- /dev/null
+++ b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_filter.yaml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+- from:
+    uri: "direct:route"
+    steps:
+      - filter:
+          simple: "${body.startsWith(\"a\")}"
+          steps:
+            - to: "mock:filter"
+      - to: "mock:route"
+- from:
+    uri: "direct:flow"
+    steps:
+      - filter:
+          simple: "${body.startsWith(\"a\")}"
+      - to: "mock:flow"
\ No newline at end of file
diff --git a/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_split.yaml b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_split.yaml
new file mode 100644
index 0000000..5011482
--- /dev/null
+++ b/camel-k-loader-yaml/src/test/resources/routes/RoutesTest_split.yaml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+- from:
+    uri: "direct:route"
+    steps:
+      - split:
+          tokenize: ","
+          steps:
+            - to: "mock:split"
+      - to: "mock:route"
+- from:
+    uri: "direct:flow"
+    steps:
+      - split:
+          tokenize: ","
+      - to: "mock:flow"
\ No newline at end of file