You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/02 10:25:32 UTC

svn commit: r959916 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/Splitter.java test/java/org/apache/camel/issues/FileSplitStreamingWithChoiceTest.java

Author: davsclaus
Date: Fri Jul  2 08:25:32 2010
New Revision: 959916

URL: http://svn.apache.org/viewvc?rev=959916&view=rev
Log:
CAMEL-2897: Fixed an issue with splitting files on Windows using a tokenizer (Scanner), which must be closed when it is no longer needed so that Windows does not keep a lock on the file.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/FileSplitStreamingWithChoiceTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=959916&r1=959915&r2=959916&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Fri Jul  2 08:25:32 2010
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.processor;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Scanner;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncCallback;
@@ -33,7 +35,10 @@ import org.apache.camel.Processor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
 import org.apache.camel.util.CollectionHelper;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import static org.apache.camel.util.ObjectHelper.notNull;
 
@@ -46,6 +51,8 @@ import static org.apache.camel.util.Obje
  * @version $Revision$
  */
 public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable {
+    private static final transient Log LOG = LogFactory.getLog(Splitter.class);
+
     private final Expression expression;
 
     public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
@@ -98,17 +105,33 @@ public class Splitter extends MulticastP
     }
 
     @SuppressWarnings("unchecked")
-    private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, Object value) {
+    private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, final Object value) {
         final Iterator iterator = ObjectHelper.createIterator(value);
         return new Iterable() {
 
             public Iterator iterator() {
                 return new Iterator() {
-
                     private int index;
+                    private boolean closed;
 
                     public boolean hasNext() {
-                        return iterator.hasNext();
+                        if (closed) {
+                            return false;
+                        }
+
+                        boolean answer = iterator.hasNext();
+                        if (!answer) {
+                            // we are now closed
+                            closed = true;
+                            // nothing more so we need to close the expression value in case it needs to be
+                            if (value instanceof Closeable) {
+                                IOHelper.close((Closeable) value, value.getClass().getName(), LOG);
+                            } else if (value instanceof Scanner) {
+                                // special for Scanner as it does not implement Closeable
+                                ((Scanner) value).close();
+                            }
+                        }
+                        return answer;
                     }
 
                     public Object next() {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/FileSplitStreamingWithChoiceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/FileSplitStreamingWithChoiceTest.java?rev=959916&r1=959915&r2=959916&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/FileSplitStreamingWithChoiceTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/FileSplitStreamingWithChoiceTest.java Fri Jul  2 08:25:32 2010
@@ -30,7 +30,6 @@ public class FileSplitStreamingWithChoic
     protected void setUp() throws Exception {
         deleteDirectory("target/filesplit");
         super.setUp();
-
     }
 
     public void testSplitStreamingWithChoice() throws Exception {
@@ -39,6 +38,9 @@ public class FileSplitStreamingWithChoic
         MockEndpoint mock = getMockEndpoint("mock:body");
         mock.expectedBodiesReceived("line1", "line2", "line3");
 
+        // should be moved to this directory after we are done
+        mock.expectedFileExists("target/filesplit/.camel/splitme.txt");
+
         String body = "line1\nline2\nline3";
         template.sendBodyAndHeader("file://target/filesplit", body, Exchange.FILE_NAME, "splitme.txt");