You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/12 15:26:22 UTC

[2/2] git commit: CAMEL-6627: Splitter should close iterator when exception occurred (eg when done) to ensure not locking any files etc on windows etc.

CAMEL-6627: Splitter should close iterator when exception occurred (eg when done) to ensure not locking any files etc on windows etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/76d8bee1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/76d8bee1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/76d8bee1

Branch: refs/heads/camel-2.11.x
Commit: 76d8bee132f1dcbecdf2c3cdcb0479f0fa725d18
Parents: 001b95b
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Aug 12 14:47:18 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Aug 12 15:25:39 2013 +0200

----------------------------------------------------------------------
 .../camel/processor/MulticastProcessor.java     |  28 ++--
 .../org/apache/camel/processor/Splitter.java    | 130 +++++++++++--------
 2 files changed, 94 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/76d8bee1/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index ecc682f..bd93719 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -57,6 +58,7 @@ import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -198,7 +200,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
         final AtomicExchange result = new AtomicExchange();
-        final Iterable<ProcessorExchangePair> pairs;
+        Iterable<ProcessorExchangePair> pairs = null;
 
         try {
             boolean sync = true;
@@ -222,14 +224,14 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
             exchange.setException(e);
             // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted
             // and do the done work
-            doDone(exchange, null, callback, true, false);
+            doDone(exchange, null, pairs, callback, true, false);
             return true;
         }
 
         // multicasting was processed successfully
         // and do the done work
         Exchange subExchange = result.get() != null ? result.get() : null;
-        doDone(exchange, subExchange, callback, true, true);
+        doDone(exchange, subExchange, pairs, callback, true, true);
         return true;
     }
 
@@ -603,7 +605,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                             result.set(subExchange);
                         }
                         // and do the done work
-                        doDone(original, subExchange, callback, false, true);
+                        doDone(original, subExchange, pairs, callback, false, true);
                         return;
                     }
 
@@ -613,7 +615,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                         // wrap in exception to explain where it failed
                         subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
                         // and do the done work
-                        doDone(original, subExchange, callback, false, true);
+                        doDone(original, subExchange, pairs, callback, false, true);
                         return;
                     }
 
@@ -647,7 +649,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                                 result.set(subExchange);
                             }
                             // and do the done work
-                            doDone(original, subExchange, callback, false, true);
+                            doDone(original, subExchange, pairs, callback, false, true);
                             return;
                         }
 
@@ -658,7 +660,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                             // wrap in exception to explain where it failed
                             subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
                             // and do the done work
-                            doDone(original, subExchange, callback, false, true);
+                            doDone(original, subExchange, pairs, callback, false, true);
                             return;
                         }
 
@@ -667,7 +669,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
 
                     // do the done work
                     subExchange = result.get() != null ? result.get() : null;
-                    doDone(original, subExchange, callback, false, true);
+                    doDone(original, subExchange, pairs, callback, false, true);
                 }
             });
         } finally {
@@ -733,11 +735,19 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
      *
      * @param original    the original exchange
      * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part
+     * @param pairs       the pairs with the exchanges to process
      * @param callback    the callback
      * @param doneSync    the <tt>doneSync</tt> parameter to call on callback
      * @param exhaust     whether or not error handling is exhausted
      */
-    protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync, boolean exhaust) {
+    protected void doDone(Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs,
+                          AsyncCallback callback, boolean doneSync, boolean exhaust) {
+
+        // we are done so close the pairs iterator
+        if (pairs != null && pairs instanceof Closeable) {
+            IOHelper.close((Closeable) pairs, "pairs", LOG);
+        }
+
         // cleanup any per exchange aggregation strategy
         removeAggregationStrategyFromExchange(original);
         if (original.getException() != null || subExchange != null && subExchange.getException() != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/76d8bee1/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 6afd235..8c9e69f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -122,69 +122,89 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
     }
 
     private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, final Object value) {
-        final Iterator<?> iterator = ObjectHelper.createIterator(value);
-        return new Iterable<ProcessorExchangePair>() {
-            // create a copy which we use as master to copy during splitting
-            // this avoids any side effect reflected upon the incoming exchange
-            private final Exchange copy = copyExchangeNoAttachments(exchange, true);
-            private final RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
-
-            public Iterator<ProcessorExchangePair> iterator() {
-                return new Iterator<ProcessorExchangePair>() {
-                    private int index;
-                    private boolean closed;
-
-                    public boolean hasNext() {
-                        if (closed) {
-                            return false;
-                        }
+        return new SplitterIterable(exchange, value);
+    }
 
-                        boolean answer = iterator.hasNext();
-                        if (!answer) {
-                            // we are now closed
-                            closed = true;
-                            // nothing more so we need to close the expression value in case it needs to be
-                            if (value instanceof Closeable) {
-                                IOHelper.close((Closeable) value, value.getClass().getName(), LOG);
-                            } else if (value instanceof Scanner) {
-                                // special for Scanner as it does not implement Closeable
-                                Scanner scanner = (Scanner) value;
-                                scanner.close();
-                                
-                                IOException ioException = scanner.ioException();
-                                if (ioException != null) {
-                                    throw new RuntimeCamelException("Scanner aborted because of an IOException!", ioException);
-                                }
-                            }
-                        }
-                        return answer;
+    private final class SplitterIterable implements Iterable<ProcessorExchangePair>, Closeable {
+
+        // create a copy which we use as master to copy during splitting
+        // this avoids any side effect reflected upon the incoming exchange
+        final Object value;
+        final Iterator<?> iterator;
+        private final Exchange copy;
+        private final RouteContext routeContext;
+
+        private SplitterIterable(Exchange exchange, Object value) {
+            this.value = value;
+            this.iterator = ObjectHelper.createIterator(value);
+            this.copy = copyExchangeNoAttachments(exchange, true);
+            this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
+        }
+
+        @Override
+        public Iterator<ProcessorExchangePair> iterator() {
+            return new Iterator<ProcessorExchangePair>() {
+                private int index;
+                private boolean closed;
+
+                public boolean hasNext() {
+                    if (closed) {
+                        return false;
                     }
 
-                    public ProcessorExchangePair next() {
-                        Object part = iterator.next();
-                        // create a correlated copy as the new exchange to be routed in the splitter from the copy
-                        // and do not share the unit of work
-                        Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
-                        // if we share unit of work, we need to prepare the child exchange
-                        if (isShareUnitOfWork()) {
-                            prepareSharedUnitOfWork(newExchange, copy);
+                    boolean answer = iterator.hasNext();
+                    if (!answer) {
+                        // we are now closed
+                        closed = true;
+                        // nothing more so we need to close the expression value in case it needs to be
+                        try {
+                            close();
+                        } catch (IOException e) {
+                            throw new RuntimeCamelException("Scanner aborted because of an IOException!", e);
                         }
-                        if (part instanceof Message) {
-                            newExchange.setIn((Message) part);
-                        } else {
-                            Message in = newExchange.getIn();
-                            in.setBody(part);
-                        }
-                        return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
                     }
+                    return answer;
+                }
 
-                    public void remove() {
-                        throw new UnsupportedOperationException("Remove is not supported by this iterator");
+                public ProcessorExchangePair next() {
+                    Object part = iterator.next();
+                    // create a correlated copy as the new exchange to be routed in the splitter from the copy
+                    // and do not share the unit of work
+                    Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
+                    // if we share unit of work, we need to prepare the child exchange
+                    if (isShareUnitOfWork()) {
+                        prepareSharedUnitOfWork(newExchange, copy);
                     }
-                };
-            }
+                    if (part instanceof Message) {
+                        newExchange.setIn((Message) part);
+                    } else {
+                        Message in = newExchange.getIn();
+                        in.setBody(part);
+                    }
+                    return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
+                }
+
+                public void remove() {
+                    throw new UnsupportedOperationException("Remove is not supported by this iterator");
+                }
+            };
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (value instanceof Closeable) {
+                IOHelper.close((Closeable) value, value.getClass().getName(), LOG);
+            } else if (value instanceof Scanner) {
+                // special for Scanner as it does not implement Closeable
+                Scanner scanner = (Scanner) value;
+                scanner.close();
 
-        };
+                IOException ioException = scanner.ioException();
+                if (ioException != null) {
+                    throw ioException;
+                }
+            }
+        }
     }
 
     private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) {