You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/12/05 13:37:18 UTC

svn commit: r1210432 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/support/ test/java/org/apache/camel/component/file/ test/java/org/apache/camel/processor/aggregator/

Author: davsclaus
Date: Mon Dec  5 12:37:17 2011
New Revision: 1210432

URL: http://svn.apache.org/viewvc?rev=1210432&view=rev
Log:
CAMEL-4742: TokenizePair, when used as a predicate, must close the input stream. Fixed some tests on slower boxes / Windows.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java?rev=1210432&r1=1210431&r2=1210432&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java Mon Dec  5 12:37:17 2011
@@ -24,6 +24,7 @@ import java.util.Scanner;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Predicate;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 
@@ -51,15 +52,42 @@ public class TokenPairExpressionIterator
     }
 
     @Override
+    public boolean matches(Exchange exchange) {
+        // as a predicate we must close the stream, as we do not return an iterator that can be used
+        // afterwards to iterate the input stream
+        Object value = doEvaluate(exchange, true);
+        return ObjectHelper.evaluateValuePredicate(value);
+    }
+
+    @Override
     public Object evaluate(Exchange exchange) {
+        // as we return an iterator to access the input stream, we should not close it
+        return doEvaluate(exchange, false);
+    }
+
+    /**
+     * Strategy to evaluate the exchange
+     *
+     * @param exchange   the exchange
+     * @param closeStream whether to close the stream before returning from this method.
+     * @return the evaluated value
+     */
+    protected Object doEvaluate(Exchange exchange, boolean closeStream) {
+        InputStream in = null;
         try {
-            InputStream in = exchange.getIn().getMandatoryBody(InputStream.class);
+            in = exchange.getIn().getMandatoryBody(InputStream.class);
             // we may read from a file, and want to support custom charset defined on the exchange
             String charset = IOHelper.getCharsetName(exchange);
             return createIterator(in, charset);
         } catch (InvalidPayloadException e) {
             exchange.setException(e);
+            // must close input stream
+            IOHelper.close(in);
             return null;
+        } finally {
+            if (closeStream) {
+                IOHelper.close(in);
+            }
         }
     }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java?rev=1210432&r1=1210431&r2=1210432&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java Mon Dec  5 12:37:17 2011
@@ -48,7 +48,7 @@ public class FileConsumerSuspendAndResum
 
         assertMockEndpointsSatisfied();
 
-        Thread.sleep(100);
+        Thread.sleep(250);
 
         // the route is suspended by the policy so we should only receive one
         String[] files = new File("target/suspended/").getAbsoluteFile().list();

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java?rev=1210432&r1=1210431&r2=1210432&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java Mon Dec  5 12:37:17 2011
@@ -44,6 +44,8 @@ public class NewFileConsumerTest extends
         assertMockEndpointsSatisfied();
         oneExchangeDone.matchesMockWaitTime();
 
+        Thread.sleep(250);
+
         assertTrue("Should have invoked postPollCheck", myFile.isPost());
     }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1210432&r1=1210431&r2=1210432&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Mon Dec  5 12:37:17 2011
@@ -280,52 +280,6 @@ public class AggregateProcessorTest exte
         ap.stop();
     }
     
-    public void testAggregateInitialCompletionInterval() throws Exception {
-        // camel context must be started
-        context.start();
-
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("A+B", "C+D");
-        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval");
-
-        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
-        Expression corr = header("id");
-        AggregationStrategy as = new BodyInAggregatingStrategy();
-
-        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
-        ap.setCompletionInterval(2000);
-        ap.start();
-
-        Exchange e1 = new DefaultExchange(context);
-        e1.getIn().setBody("A");
-        e1.getIn().setHeader("id", 123);
-
-        Exchange e2 = new DefaultExchange(context);
-        e2.getIn().setBody("B");
-        e2.getIn().setHeader("id", 123);
-
-        Exchange e3 = new DefaultExchange(context);
-        e3.getIn().setBody("C");
-        e3.getIn().setHeader("id", 123);
-
-        Exchange e4 = new DefaultExchange(context);
-        e4.getIn().setBody("D");
-        e4.getIn().setHeader("id", 123);
-
-        ap.process(e1);
-        
-        Thread.sleep(1500L);
-        ap.process(e2);
-        
-        Thread.sleep(500L);
-        ap.process(e3);
-        ap.process(e4);
-
-        assertMockEndpointsSatisfied();
-
-        ap.stop();
-    }
-
     public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("A+C+END");