You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/22 17:53:36 UTC

[4/5] camel git commit: CAMEL-9745: Splitter - Should skip null messages if iterator returns null

CAMEL-9745: Splitter - Should skip null messages if iterator returns null


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1177cf29
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1177cf29
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1177cf29

Branch: refs/heads/camel-2.16.x
Commit: 1177cf29f00d34525a707bd46877b5b58ffb92bb
Parents: d86dd18
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Mar 22 17:51:58 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 22 17:53:14 2016 +0100

----------------------------------------------------------------------
 .../camel/processor/MulticastProcessor.java     |  9 ++
 .../org/apache/camel/processor/Splitter.java    | 48 ++++++-----
 .../camel/processor/SplitIteratorNullTest.java  | 91 ++++++++++++++++++++
 .../tarfile/TarSplitterRouteIssueTest.java      |  2 -
 4 files changed, 127 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1177cf29/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 65a4d51..c8470e3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -293,6 +293,11 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
 
             while (it.hasNext()) {
                 final ProcessorExchangePair pair = it.next();
+                // in case the iterator returns null then continue to next
+                if (pair == null) {
+                    continue;
+                }
+
                 final Exchange subExchange = pair.getExchange();
                 updateNewExchange(subExchange, total.intValue(), pairs, it);
 
@@ -590,6 +595,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
 
         while (it.hasNext()) {
             ProcessorExchangePair pair = it.next();
+            // in case the iterator returns null then continue to next
+            if (pair == null) {
+                continue;
+            }
             Exchange subExchange = pair.getExchange();
             updateNewExchange(subExchange, total.get(), pairs, it);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1177cf29/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 55a9bd9..11682ef 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -176,28 +176,32 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
 
                 public ProcessorExchangePair next() {
                     Object part = iterator.next();
-                    // create a correlated copy as the new exchange to be routed in the splitter from the copy
-                    // and do not share the unit of work
-                    Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
-                    // If the splitter has an aggregation strategy
-                    // then the StreamCache created by the child routes must not be 
-                    // closed by the unit of work of the child route, but by the unit of 
-                    // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
-                    // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
-                    if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
-                        newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
-                    }
-                    // if we share unit of work, we need to prepare the child exchange
-                    if (isShareUnitOfWork()) {
-                        prepareSharedUnitOfWork(newExchange, copy);
-                    }
-                    if (part instanceof Message) {
-                        newExchange.setIn((Message) part);
+                    if (part != null) {
+                        // create a correlated copy as the new exchange to be routed in the splitter from the copy
+                        // and do not share the unit of work
+                        Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
+                        // If the splitter has an aggregation strategy
+                        // then the StreamCache created by the child routes must not be
+                        // closed by the unit of work of the child route, but by the unit of
+                        // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
+                        // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
+                        if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+                            newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
+                        }
+                        // if we share unit of work, we need to prepare the child exchange
+                        if (isShareUnitOfWork()) {
+                            prepareSharedUnitOfWork(newExchange, copy);
+                        }
+                        if (part instanceof Message) {
+                            newExchange.setIn((Message) part);
+                        } else {
+                            Message in = newExchange.getIn();
+                            in.setBody(part);
+                        }
+                        return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
                     } else {
-                        Message in = newExchange.getIn();
-                        in.setBody(part);
+                        return null;
                     }
-                    return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
                 }
 
                 public void remove() {
@@ -231,7 +235,9 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
         Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value);
         try {
             for (ProcessorExchangePair pair : pairs) {
-                result.add(pair);
+                if (pair != null) {
+                    result.add(pair);
+                }
             }
         } finally {
             if (pairs instanceof Closeable) {

http://git-wip-us.apache.org/repos/asf/camel/blob/1177cf29/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
new file mode 100644
index 0000000..c44ec85
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Iterator;
+import java.util.function.Consumer;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class SplitIteratorNullTest extends ContextTestSupport { // checks that a null handed out by the split iterator is skipped
+
+    private MyIterator myIterator = new MyIterator(); // same instance is used by the route below and inspected by the test
+
+    public void testSplitIteratorNull() throws Exception {
+        assertFalse(myIterator.isNullReturned()); // nothing has been pulled from the iterator yet
+        getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C"); // only the three non-null parts are expected
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+        assertTrue(myIterator.isNullReturned()); // the iterator really did return null during the split
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split(constant(myIterator))
+                        .to("mock:line");
+            }
+        };
+    }
+
+    private class MyIterator implements Iterator<String> { // yields "A", "B", "C" and then one null
+
+        private int count = 4; // three values plus one extra next() call that returns null
+        private boolean nullReturned;
+
+        @Override
+        public boolean hasNext() {
+            // reports true one time more than there are values, so the last next() call returns null
+            return count > 0;
+        }
+
+        @Override
+        public String next() {
+            count--; // decremented before the checks: 3 -> "A", 2 -> "B", 1 -> "C", 0 -> null
+            if (count == 0) {
+                nullReturned = true;
+                return null;
+            } else if (count == 1) {
+                return "C";
+            } else if (count == 2) {
+                return "B";
+            } else {
+                return "A";
+            }
+        }
+
+        public boolean isNullReturned() {
+            return nullReturned;
+        }
+
+        @Override
+        public void remove() {
+            // noop
+        }
+
+        @Override
+        public void forEachRemaining(Consumer<? super String> action) {
+            // noop - NOTE(review): Consumer/forEachRemaining need Java 8; confirm the minimum JDK for this branch
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/1177cf29/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
index 0a67b66..a54bea0 100644
--- a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
@@ -20,7 +20,6 @@ import java.io.File;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TarSplitterRouteIssueTest extends CamelTestSupport {
@@ -32,7 +31,6 @@ public class TarSplitterRouteIssueTest extends CamelTestSupport {
     }
 
     @Test
-    @Ignore("CAMEL-9735: There are 3 files in the .tar file but the TarIterator has a bug causing +1 extra")
     public void testSplitter() throws Exception {
         getMockEndpoint("mock:entry").expectedMessageCount(3);