You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/03/22 17:53:33 UTC
[1/5] camel git commit: CAMEL-9745: Splitter - Should skip null
messages if iterator returns null
Repository: camel
Updated Branches:
refs/heads/camel-2.16.x d86dd18f8 -> 77fd40bf6
refs/heads/camel-2.17.x f78935eee -> 2363157a8
refs/heads/master 07b489872 -> aa21cf13c
CAMEL-9745: Splitter - Should skip null messages if iterator returns null
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/aa21cf13
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/aa21cf13
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/aa21cf13
Branch: refs/heads/master
Commit: aa21cf13ce213aacf9477e416afb39808f7b3481
Parents: 07b4898
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Mar 22 17:51:58 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 22 17:51:58 2016 +0100
----------------------------------------------------------------------
.../camel/processor/MulticastProcessor.java | 9 ++
.../org/apache/camel/processor/Splitter.java | 48 ++++++-----
.../camel/processor/SplitIteratorNullTest.java | 91 ++++++++++++++++++++
.../tarfile/TarSplitterRouteIssueTest.java | 2 -
4 files changed, 127 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/aa21cf13/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 5f96ba2..fc7da80 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -293,6 +293,11 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
while (it.hasNext()) {
final ProcessorExchangePair pair = it.next();
+ // in case the iterator returns null then continue to next
+ if (pair == null) {
+ continue;
+ }
+
final Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, total.intValue(), pairs, it);
@@ -590,6 +595,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
while (it.hasNext()) {
ProcessorExchangePair pair = it.next();
+ // in case the iterator returns null then continue to next
+ if (pair == null) {
+ continue;
+ }
Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, total.get(), pairs, it);
http://git-wip-us.apache.org/repos/asf/camel/blob/aa21cf13/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 40ca426..fba3f71 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -180,28 +180,32 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
public ProcessorExchangePair next() {
Object part = iterator.next();
- // create a correlated copy as the new exchange to be routed in the splitter from the copy
- // and do not share the unit of work
- Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
- // If the splitter has an aggregation strategy
- // then the StreamCache created by the child routes must not be
- // closed by the unit of work of the child route, but by the unit of
- // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
- // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
- if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
- newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
- }
- // if we share unit of work, we need to prepare the child exchange
- if (isShareUnitOfWork()) {
- prepareSharedUnitOfWork(newExchange, copy);
- }
- if (part instanceof Message) {
- newExchange.setIn((Message) part);
+ if (part != null) {
+ // create a correlated copy as the new exchange to be routed in the splitter from the copy
+ // and do not share the unit of work
+ Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
+ // If the splitter has an aggregation strategy
+ // then the StreamCache created by the child routes must not be
+ // closed by the unit of work of the child route, but by the unit of
+ // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
+ // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
+ if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+ newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
+ }
+ // if we share unit of work, we need to prepare the child exchange
+ if (isShareUnitOfWork()) {
+ prepareSharedUnitOfWork(newExchange, copy);
+ }
+ if (part instanceof Message) {
+ newExchange.setIn((Message) part);
+ } else {
+ Message in = newExchange.getIn();
+ in.setBody(part);
+ }
+ return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
} else {
- Message in = newExchange.getIn();
- in.setBody(part);
+ return null;
}
- return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
}
public void remove() {
@@ -235,7 +239,9 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value);
try {
for (ProcessorExchangePair pair : pairs) {
- result.add(pair);
+ if (pair != null) {
+ result.add(pair);
+ }
}
} finally {
if (pairs instanceof Closeable) {
http://git-wip-us.apache.org/repos/asf/camel/blob/aa21cf13/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
new file mode 100644
index 0000000..c44ec85
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Iterator;
+import java.util.function.Consumer;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class SplitIteratorNullTest extends ContextTestSupport {
+
+ private MyIterator myIterator = new MyIterator();
+
+ public void testSplitIteratorNull() throws Exception {
+ assertFalse(myIterator.isNullReturned());
+ getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ assertTrue(myIterator.isNullReturned());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .split(constant(myIterator))
+ .to("mock:line");
+ }
+ };
+ }
+
+ private class MyIterator implements Iterator<String> {
+
+ private int count = 4;
+ private boolean nullReturned;
+
+ @Override
+ public boolean hasNext() {
+ // we return true one extra time, and cause next to return null
+ return count > 0;
+ }
+
+ @Override
+ public String next() {
+ count--;
+ if (count == 0) {
+ nullReturned = true;
+ return null;
+ } else if (count == 1) {
+ return "C";
+ } else if (count == 2) {
+ return "B";
+ } else {
+ return "A";
+ }
+ }
+
+ public boolean isNullReturned() {
+ return nullReturned;
+ }
+
+ @Override
+ public void remove() {
+ // noop
+ }
+
+ @Override
+ public void forEachRemaining(Consumer<? super String> action) {
+ // noop
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/aa21cf13/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
index d5702eb..1e38a2f 100644
--- a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
@@ -20,7 +20,6 @@ import java.io.File;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Ignore;
import org.junit.Test;
public class TarSplitterRouteIssueTest extends CamelTestSupport {
@@ -32,7 +31,6 @@ public class TarSplitterRouteIssueTest extends CamelTestSupport {
}
@Test
- @Ignore("CAMEL-9735: There are 3 files in the .tar file but the TarIterator has a bug causing +1 extra")
public void testSplitter() throws Exception {
getMockEndpoint("mock:entry").expectedMessageCount(3);
[4/5] camel git commit: CAMEL-9745: Splitter - Should skip null
messages if iterator returns null
Posted by da...@apache.org.
CAMEL-9745: Splitter - Should skip null messages if iterator returns null
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1177cf29
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1177cf29
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1177cf29
Branch: refs/heads/camel-2.16.x
Commit: 1177cf29f00d34525a707bd46877b5b58ffb92bb
Parents: d86dd18
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Mar 22 17:51:58 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 22 17:53:14 2016 +0100
----------------------------------------------------------------------
.../camel/processor/MulticastProcessor.java | 9 ++
.../org/apache/camel/processor/Splitter.java | 48 ++++++-----
.../camel/processor/SplitIteratorNullTest.java | 91 ++++++++++++++++++++
.../tarfile/TarSplitterRouteIssueTest.java | 2 -
4 files changed, 127 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1177cf29/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 65a4d51..c8470e3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -293,6 +293,11 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
while (it.hasNext()) {
final ProcessorExchangePair pair = it.next();
+ // in case the iterator returns null then continue to next
+ if (pair == null) {
+ continue;
+ }
+
final Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, total.intValue(), pairs, it);
@@ -590,6 +595,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
while (it.hasNext()) {
ProcessorExchangePair pair = it.next();
+ // in case the iterator returns null then continue to next
+ if (pair == null) {
+ continue;
+ }
Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, total.get(), pairs, it);
http://git-wip-us.apache.org/repos/asf/camel/blob/1177cf29/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 55a9bd9..11682ef 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -176,28 +176,32 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
public ProcessorExchangePair next() {
Object part = iterator.next();
- // create a correlated copy as the new exchange to be routed in the splitter from the copy
- // and do not share the unit of work
- Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
- // If the splitter has an aggregation strategy
- // then the StreamCache created by the child routes must not be
- // closed by the unit of work of the child route, but by the unit of
- // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
- // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
- if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
- newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
- }
- // if we share unit of work, we need to prepare the child exchange
- if (isShareUnitOfWork()) {
- prepareSharedUnitOfWork(newExchange, copy);
- }
- if (part instanceof Message) {
- newExchange.setIn((Message) part);
+ if (part != null) {
+ // create a correlated copy as the new exchange to be routed in the splitter from the copy
+ // and do not share the unit of work
+ Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
+ // If the splitter has an aggregation strategy
+ // then the StreamCache created by the child routes must not be
+ // closed by the unit of work of the child route, but by the unit of
+ // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
+ // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
+ if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+ newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
+ }
+ // if we share unit of work, we need to prepare the child exchange
+ if (isShareUnitOfWork()) {
+ prepareSharedUnitOfWork(newExchange, copy);
+ }
+ if (part instanceof Message) {
+ newExchange.setIn((Message) part);
+ } else {
+ Message in = newExchange.getIn();
+ in.setBody(part);
+ }
+ return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
} else {
- Message in = newExchange.getIn();
- in.setBody(part);
+ return null;
}
- return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
}
public void remove() {
@@ -231,7 +235,9 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value);
try {
for (ProcessorExchangePair pair : pairs) {
- result.add(pair);
+ if (pair != null) {
+ result.add(pair);
+ }
}
} finally {
if (pairs instanceof Closeable) {
http://git-wip-us.apache.org/repos/asf/camel/blob/1177cf29/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
new file mode 100644
index 0000000..c44ec85
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Iterator;
+import java.util.function.Consumer;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class SplitIteratorNullTest extends ContextTestSupport {
+
+ private MyIterator myIterator = new MyIterator();
+
+ public void testSplitIteratorNull() throws Exception {
+ assertFalse(myIterator.isNullReturned());
+ getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ assertTrue(myIterator.isNullReturned());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .split(constant(myIterator))
+ .to("mock:line");
+ }
+ };
+ }
+
+ private class MyIterator implements Iterator<String> {
+
+ private int count = 4;
+ private boolean nullReturned;
+
+ @Override
+ public boolean hasNext() {
+ // we return true one extra time, and cause next to return null
+ return count > 0;
+ }
+
+ @Override
+ public String next() {
+ count--;
+ if (count == 0) {
+ nullReturned = true;
+ return null;
+ } else if (count == 1) {
+ return "C";
+ } else if (count == 2) {
+ return "B";
+ } else {
+ return "A";
+ }
+ }
+
+ public boolean isNullReturned() {
+ return nullReturned;
+ }
+
+ @Override
+ public void remove() {
+ // noop
+ }
+
+ @Override
+ public void forEachRemaining(Consumer<? super String> action) {
+ // noop
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1177cf29/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
index 0a67b66..a54bea0 100644
--- a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
@@ -20,7 +20,6 @@ import java.io.File;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Ignore;
import org.junit.Test;
public class TarSplitterRouteIssueTest extends CamelTestSupport {
@@ -32,7 +31,6 @@ public class TarSplitterRouteIssueTest extends CamelTestSupport {
}
@Test
- @Ignore("CAMEL-9735: There are 3 files in the .tar file but the TarIterator has a bug causing +1 extra")
public void testSplitter() throws Exception {
getMockEndpoint("mock:entry").expectedMessageCount(3);
[5/5] camel git commit: CAMEL-9745: Splitter - Should skip null
messages if iterator returns null
Posted by da...@apache.org.
CAMEL-9745: Splitter - Should skip null messages if iterator returns null
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/77fd40bf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/77fd40bf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/77fd40bf
Branch: refs/heads/camel-2.16.x
Commit: 77fd40bf6c0827970c5781b829de50e8e101f369
Parents: 1177cf2
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Mar 22 17:52:56 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 22 17:53:21 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/processor/SplitIteratorNullTest.java | 5 -----
1 file changed, 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/77fd40bf/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
index c44ec85..f612c27 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
@@ -17,7 +17,6 @@
package org.apache.camel.processor;
import java.util.Iterator;
-import java.util.function.Consumer;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
@@ -83,9 +82,5 @@ public class SplitIteratorNullTest extends ContextTestSupport {
// noop
}
- @Override
- public void forEachRemaining(Consumer<? super String> action) {
- // noop
- }
}
}
[2/5] camel git commit: CAMEL-9745: Splitter - Should skip null
messages if iterator returns null
Posted by da...@apache.org.
CAMEL-9745: Splitter - Should skip null messages if iterator returns null
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/586609f1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/586609f1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/586609f1
Branch: refs/heads/camel-2.17.x
Commit: 586609f1619ad8266541042d2537d477fee3dee8
Parents: f78935e
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Mar 22 17:51:58 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 22 17:52:26 2016 +0100
----------------------------------------------------------------------
.../camel/processor/MulticastProcessor.java | 9 ++
.../org/apache/camel/processor/Splitter.java | 48 ++++++-----
.../camel/processor/SplitIteratorNullTest.java | 91 ++++++++++++++++++++
.../tarfile/TarSplitterRouteIssueTest.java | 2 -
4 files changed, 127 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/586609f1/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 5f96ba2..fc7da80 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -293,6 +293,11 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
while (it.hasNext()) {
final ProcessorExchangePair pair = it.next();
+ // in case the iterator returns null then continue to next
+ if (pair == null) {
+ continue;
+ }
+
final Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, total.intValue(), pairs, it);
@@ -590,6 +595,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
while (it.hasNext()) {
ProcessorExchangePair pair = it.next();
+ // in case the iterator returns null then continue to next
+ if (pair == null) {
+ continue;
+ }
Exchange subExchange = pair.getExchange();
updateNewExchange(subExchange, total.get(), pairs, it);
http://git-wip-us.apache.org/repos/asf/camel/blob/586609f1/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 40ca426..fba3f71 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -180,28 +180,32 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
public ProcessorExchangePair next() {
Object part = iterator.next();
- // create a correlated copy as the new exchange to be routed in the splitter from the copy
- // and do not share the unit of work
- Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
- // If the splitter has an aggregation strategy
- // then the StreamCache created by the child routes must not be
- // closed by the unit of work of the child route, but by the unit of
- // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
- // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
- if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
- newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
- }
- // if we share unit of work, we need to prepare the child exchange
- if (isShareUnitOfWork()) {
- prepareSharedUnitOfWork(newExchange, copy);
- }
- if (part instanceof Message) {
- newExchange.setIn((Message) part);
+ if (part != null) {
+ // create a correlated copy as the new exchange to be routed in the splitter from the copy
+ // and do not share the unit of work
+ Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
+ // If the splitter has an aggregation strategy
+ // then the StreamCache created by the child routes must not be
+ // closed by the unit of work of the child route, but by the unit of
+ // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
+ // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
+ if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
+ newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
+ }
+ // if we share unit of work, we need to prepare the child exchange
+ if (isShareUnitOfWork()) {
+ prepareSharedUnitOfWork(newExchange, copy);
+ }
+ if (part instanceof Message) {
+ newExchange.setIn((Message) part);
+ } else {
+ Message in = newExchange.getIn();
+ in.setBody(part);
+ }
+ return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
} else {
- Message in = newExchange.getIn();
- in.setBody(part);
+ return null;
}
- return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
}
public void remove() {
@@ -235,7 +239,9 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value);
try {
for (ProcessorExchangePair pair : pairs) {
- result.add(pair);
+ if (pair != null) {
+ result.add(pair);
+ }
}
} finally {
if (pairs instanceof Closeable) {
http://git-wip-us.apache.org/repos/asf/camel/blob/586609f1/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
new file mode 100644
index 0000000..c44ec85
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Iterator;
+import java.util.function.Consumer;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class SplitIteratorNullTest extends ContextTestSupport {
+
+ private MyIterator myIterator = new MyIterator();
+
+ public void testSplitIteratorNull() throws Exception {
+ assertFalse(myIterator.isNullReturned());
+ getMockEndpoint("mock:line").expectedBodiesReceived("A", "B", "C");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ assertTrue(myIterator.isNullReturned());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .split(constant(myIterator))
+ .to("mock:line");
+ }
+ };
+ }
+
+ private class MyIterator implements Iterator<String> {
+
+ private int count = 4;
+ private boolean nullReturned;
+
+ @Override
+ public boolean hasNext() {
+ // we return true one extra time, and cause next to return null
+ return count > 0;
+ }
+
+ @Override
+ public String next() {
+ count--;
+ if (count == 0) {
+ nullReturned = true;
+ return null;
+ } else if (count == 1) {
+ return "C";
+ } else if (count == 2) {
+ return "B";
+ } else {
+ return "A";
+ }
+ }
+
+ public boolean isNullReturned() {
+ return nullReturned;
+ }
+
+ @Override
+ public void remove() {
+ // noop
+ }
+
+ @Override
+ public void forEachRemaining(Consumer<? super String> action) {
+ // noop
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/586609f1/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
index d5702eb..1e38a2f 100644
--- a/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
+++ b/components/camel-tarfile/src/test/java/org/apache/camel/dataformat/tarfile/TarSplitterRouteIssueTest.java
@@ -20,7 +20,6 @@ import java.io.File;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Ignore;
import org.junit.Test;
public class TarSplitterRouteIssueTest extends CamelTestSupport {
@@ -32,7 +31,6 @@ public class TarSplitterRouteIssueTest extends CamelTestSupport {
}
@Test
- @Ignore("CAMEL-9735: There are 3 files in the .tar file but the TarIterator has a bug causing +1 extra")
public void testSplitter() throws Exception {
getMockEndpoint("mock:entry").expectedMessageCount(3);
[3/5] camel git commit: CAMEL-9745: Splitter - Should skip null
messages if iterator returns null
Posted by da...@apache.org.
CAMEL-9745: Splitter - Should skip null messages if iterator returns null
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2363157a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2363157a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2363157a
Branch: refs/heads/camel-2.17.x
Commit: 2363157a81051d92397ffcccfd2ca08fb5b8c93b
Parents: 586609f
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Mar 22 17:52:56 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 22 17:52:56 2016 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/processor/SplitIteratorNullTest.java | 5 -----
1 file changed, 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2363157a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
index c44ec85..f612c27 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitIteratorNullTest.java
@@ -17,7 +17,6 @@
package org.apache.camel.processor;
import java.util.Iterator;
-import java.util.function.Consumer;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
@@ -83,9 +82,5 @@ public class SplitIteratorNullTest extends ContextTestSupport {
// noop
}
- @Override
- public void forEachRemaining(Consumer<? super String> action) {
- // noop
- }
}
}