You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/12/05 13:37:18 UTC
svn commit: r1210432 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/support/
test/java/org/apache/camel/component/file/
test/java/org/apache/camel/processor/aggregator/
Author: davsclaus
Date: Mon Dec 5 12:37:17 2011
New Revision: 1210432
URL: http://svn.apache.org/viewvc?rev=1210432&view=rev
Log:
CAMEL-4742: TokenizePair as predicate must close input stream. Fixed some tests on slower boxes / Windows.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java?rev=1210432&r1=1210431&r2=1210432&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java Mon Dec 5 12:37:17 2011
@@ -24,6 +24,7 @@ import java.util.Scanner;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Predicate;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
@@ -51,15 +52,42 @@ public class TokenPairExpressionIterator
}
@Override
+ public boolean matches(Exchange exchange) {
+ // as a predicate we must close the stream, as we do not return an iterator that can be used
+ // afterwards to iterate the input stream
+ Object value = doEvaluate(exchange, true);
+ return ObjectHelper.evaluateValuePredicate(value);
+ }
+
+ @Override
public Object evaluate(Exchange exchange) {
+ // as we return an iterator to access the input stream, we should not close it
+ return doEvaluate(exchange, false);
+ }
+
+ /**
+ * Strategy to evaluate the exchange
+ *
+ * @param exchange the exchange
+ * @param closeStream whether to close the stream before returning from this method.
+ * @return the evaluated value
+ */
+ protected Object doEvaluate(Exchange exchange, boolean closeStream) {
+ InputStream in = null;
try {
- InputStream in = exchange.getIn().getMandatoryBody(InputStream.class);
+ in = exchange.getIn().getMandatoryBody(InputStream.class);
// we may read from a file, and want to support custom charset defined on the exchange
String charset = IOHelper.getCharsetName(exchange);
return createIterator(in, charset);
} catch (InvalidPayloadException e) {
exchange.setException(e);
+ // must close input stream
+ IOHelper.close(in);
return null;
+ } finally {
+ if (closeStream) {
+ IOHelper.close(in);
+ }
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java?rev=1210432&r1=1210431&r2=1210432&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSuspendAndResumeTest.java Mon Dec 5 12:37:17 2011
@@ -48,7 +48,7 @@ public class FileConsumerSuspendAndResum
assertMockEndpointsSatisfied();
- Thread.sleep(100);
+ Thread.sleep(250);
// the route is suspended by the policy so we should only receive one
String[] files = new File("target/suspended/").getAbsoluteFile().list();
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java?rev=1210432&r1=1210431&r2=1210432&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/NewFileConsumerTest.java Mon Dec 5 12:37:17 2011
@@ -44,6 +44,8 @@ public class NewFileConsumerTest extends
assertMockEndpointsSatisfied();
oneExchangeDone.matchesMockWaitTime();
+ Thread.sleep(250);
+
assertTrue("Should have invoked postPollCheck", myFile.isPost());
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=1210432&r1=1210431&r2=1210432&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Mon Dec 5 12:37:17 2011
@@ -280,52 +280,6 @@ public class AggregateProcessorTest exte
ap.stop();
}
- public void testAggregateInitialCompletionInterval() throws Exception {
- // camel context must be started
- context.start();
-
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("A+B", "C+D");
- mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval");
-
- Processor done = new SendProcessor(context.getEndpoint("mock:result"));
- Expression corr = header("id");
- AggregationStrategy as = new BodyInAggregatingStrategy();
-
- AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
- ap.setCompletionInterval(2000);
- ap.start();
-
- Exchange e1 = new DefaultExchange(context);
- e1.getIn().setBody("A");
- e1.getIn().setHeader("id", 123);
-
- Exchange e2 = new DefaultExchange(context);
- e2.getIn().setBody("B");
- e2.getIn().setHeader("id", 123);
-
- Exchange e3 = new DefaultExchange(context);
- e3.getIn().setBody("C");
- e3.getIn().setHeader("id", 123);
-
- Exchange e4 = new DefaultExchange(context);
- e4.getIn().setBody("D");
- e4.getIn().setHeader("id", 123);
-
- ap.process(e1);
-
- Thread.sleep(1500L);
- ap.process(e2);
-
- Thread.sleep(500L);
- ap.process(e3);
- ap.process(e4);
-
- assertMockEndpointsSatisfied();
-
- ap.stop();
- }
-
public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("A+C+END");