You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/01/07 17:01:34 UTC
svn commit: r732378 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/util/ test/java/org/apache/camel/processor/
test/java/org/apache/camel/processor/onexception/
Author: davsclaus
Date: Wed Jan 7 08:01:32 2009
New Revision: 732378
URL: http://svn.apache.org/viewvc?rev=732378&view=rev
Log:
CAMEL-1233: Pipeline should honor MEP
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineMEPTest.java (contents, props changed)
- copied, changed from r732302, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/InterceptAlterMessageBeforeRedeliveryTest.java (contents, props changed)
- copied, changed from r732266, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=732378&r1=732377&r2=732378&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Wed Jan 7 08:01:32 2009
@@ -203,7 +203,13 @@
// as the final processor on a pipeline might not
// have created any OUT; such as a mock:endpoint
// so lets assume the last IN is the OUT
- result.getOut(true).copyFrom(source.getIn());
+ if (result.getPattern().isOutCapable()) {
+ // only set OUT if its OUT capable
+ result.getOut(true).copyFrom(source.getIn());
+ } else {
+ // if not replace IN instead to keep the MEP
+ result.getIn().copyFrom(source.getIn());
+ }
}
result.getProperties().clear();
result.getProperties().putAll(source.getProperties());
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java?rev=732378&r1=732377&r2=732378&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java Wed Jan 7 08:01:32 2009
@@ -50,7 +50,8 @@
url = "direct:sequential";
}
- Exchange exchange = template.send(url, new Processor() {
+ // use InOut
+ Exchange exchange = template.request(url, new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody("input");
Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineMEPTest.java (from r732302, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineMEPTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineMEPTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java&r1=732302&r2=732378&rev=732378&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineMEPTest.java Wed Jan 7 08:01:32 2009
@@ -18,146 +18,90 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
-import org.apache.camel.Message;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
/**
+ * Unit test for pipeline keeping the MEP (CAMEL-1233)
+ *
* @version $Revision$
*/
-public class PipelineTest extends ContextTestSupport {
-
- /**
- * Simple processor the copies the in to the out and increments a counter.
- * Used to verify that the pipeline actually takes the output of one stage of
- * the pipe and feeds it in as input into the next stage.
- */
- private final class InToOut implements Processor {
- public void process(Exchange exchange) throws Exception {
- exchange.getOut(true).copyFrom(exchange.getIn());
- Integer counter = exchange.getIn().getHeader("copy-counter", Integer.class);
- if (counter == null) {
- counter = 0;
- }
- exchange.getOut().setHeader("copy-counter", counter + 1);
- }
- }
+public class PipelineMEPTest extends ContextTestSupport {
- /**
- * Simple processor the copies the in to the fault and increments a counter.
- */
- private final class InToFault implements Processor {
- public void process(Exchange exchange) throws Exception {
- exchange.getFault(true).setBody(exchange.getIn().getBody());
- Integer counter = exchange.getIn().getHeader("copy-counter", Integer.class);
- if (counter == null) {
- counter = 0;
- }
- exchange.getFault().setHeader("copy-counter", counter + 1);
- }
- }
+ public void testInOnly() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived(3);
- protected MockEndpoint resultEndpoint;
+ Exchange exchange = context.getEndpoint("direct:a").createExchange(ExchangePattern.InOnly);
+ exchange.getIn().setBody(1);
- public void testSendMessageThroughAPipeline() throws Exception {
- resultEndpoint.expectedBodiesReceived(4);
-
- Exchange results = template.send("direct:a", new Processor() {
- public void process(Exchange exchange) {
- // now lets fire in a message
- Message in = exchange.getIn();
- in.setBody(1);
- in.setHeader("foo", "bar");
- }
- });
+ Exchange out = template.send("direct:a", exchange);
+ assertNotNull(out);
+ assertEquals(ExchangePattern.InOnly, out.getPattern());
- resultEndpoint.assertIsSatisfied();
+ assertMockEndpointsSatisfied();
- assertEquals("Result body", 4, results.getOut().getBody());
+ // should keep MEP as InOnly
+ assertEquals(ExchangePattern.InOnly, mock.getExchanges().get(0).getPattern());
}
-
- public void testResultsReturned() throws Exception {
- Exchange exchange = template.send("direct:b", new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("Hello World");
- }
- });
-
- assertEquals("Hello World", exchange.getOut().getBody());
- assertEquals(3, exchange.getOut().getHeader("copy-counter"));
- }
+ public void testInOut() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived(3);
- /**
- * Disabled for now until we figure out fault processing in the pipeline.
- *
- * @throws Exception
- */
- public void testFaultStopsPipeline() throws Exception {
- Exchange exchange = template.send("direct:c", new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("Fault Message");
- }
- });
-
- // Check the fault..
- assertEquals("Fault Message", exchange.getFault().getBody());
- assertEquals(2, exchange.getFault().getHeader("copy-counter"));
-
- // Check the out Message.. It should have only been processed once.
- // since the fault should stop it from going to the next process.
- assertEquals(1, exchange.getOut().getHeader("copy-counter"));
- }
+ Exchange exchange = context.getEndpoint("direct:a").createExchange(ExchangePattern.InOut);
+ exchange.getIn().setBody(1);
- public void testOnlyProperties() {
- Exchange exchange = template.send("direct:b", new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setHeader("header", "headerValue");
- }
- });
-
- assertEquals("headerValue", exchange.getOut().getHeader("header"));
- assertEquals(3, exchange.getOut().getHeader("copy-counter"));
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
+ Exchange out = template.send("direct:a", exchange);
+ assertNotNull(out);
+ assertEquals(ExchangePattern.InOut, out.getPattern());
+
+ assertMockEndpointsSatisfied();
- resultEndpoint = getMockEndpoint("mock:result");
+ // should keep MEP as InOut
+ assertEquals(ExchangePattern.InOut, mock.getExchanges().get(0).getPattern());
}
protected RouteBuilder createRouteBuilder() {
- final Processor processor = new Processor() {
+ final Processor inProcessor = new Processor() {
+ public void process(Exchange exchange) {
+ Integer number = exchange.getIn().getBody(Integer.class);
+ if (number == null) {
+ number = 0;
+ }
+ number = number + 1;
+ exchange.getIn().setBody(number);
+ }
+ };
+
+ final Processor outProcessor = new Processor() {
public void process(Exchange exchange) {
Integer number = exchange.getIn().getBody(Integer.class);
if (number == null) {
number = 0;
}
- // todo set the endpoint name we were received from
- //exchange.setProperty(exchange.get);
number = number + 1;
+ // this is a bit evil we let you set on OUT body even if the MEP is InOnly
+ // however the result after the routing is correct using APIs to get the result
+ // however the exchange will carry body IN and OUT when the route completes, as
+ // we operate on the original exchange in this processor
+ // (= we are the first node in the route after the from consumer)
exchange.getOut().setBody(number);
}
};
return new RouteBuilder() {
public void configure() {
- // START SNIPPET: example
- from("direct:a").pipeline("direct:x", "direct:y", "direct:z", "mock:result");
- // END SNIPPET: example
-
- from("direct:x").process(processor);
- from("direct:y").process(processor);
- from("direct:z").process(processor);
-
- // Create a route that uses the InToOut processor 3 times. the copy-counter header should be == 3
- from("direct:b").process(new InToOut()).process(new InToOut()).process(new InToOut());
- // Create a route that uses the InToFault processor.. the last InToOut will not be called since the Fault occurs before.
- from("direct:c").process(new InToOut()).process(new InToFault()).process(new InToOut());
+ from("direct:a")
+ .process(outProcessor)
+ // this pipeline is not really needed but is here to have some more routing in there to test with
+ .pipeline("direct:x", "direct:y")
+ .process(inProcessor)
+ .to("mock:result");
}
};
}
-}
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineMEPTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineMEPTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineMEPTest.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java?rev=732378&r1=732377&r2=732378&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/PipelineTest.java Wed Jan 7 08:01:32 2009
@@ -63,7 +63,7 @@
public void testSendMessageThroughAPipeline() throws Exception {
resultEndpoint.expectedBodiesReceived(4);
- Exchange results = template.send("direct:a", new Processor() {
+ Exchange results = template.request("direct:a", new Processor() {
public void process(Exchange exchange) {
// now lets fire in a message
Message in = exchange.getIn();
@@ -79,7 +79,7 @@
public void testResultsReturned() throws Exception {
- Exchange exchange = template.send("direct:b", new Processor() {
+ Exchange exchange = template.request("direct:b", new Processor() {
public void process(Exchange exchange) {
exchange.getIn().setBody("Hello World");
}
@@ -95,7 +95,7 @@
* @throws Exception
*/
public void testFaultStopsPipeline() throws Exception {
- Exchange exchange = template.send("direct:c", new Processor() {
+ Exchange exchange = template.request("direct:c", new Processor() {
public void process(Exchange exchange) {
exchange.getIn().setBody("Fault Message");
}
@@ -111,7 +111,7 @@
}
public void testOnlyProperties() {
- Exchange exchange = template.send("direct:b", new Processor() {
+ Exchange exchange = template.request("direct:b", new Processor() {
public void process(Exchange exchange) {
exchange.getIn().setHeader("header", "headerValue");
}
@@ -124,7 +124,6 @@
@Override
protected void setUp() throws Exception {
super.setUp();
-
resultEndpoint = getMockEndpoint("mock:result");
}
@@ -135,8 +134,6 @@
if (number == null) {
number = 0;
}
- // todo set the endpoint name we were received from
- //exchange.setProperty(exchange.get);
number = number + 1;
exchange.getOut().setBody(number);
}
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java?rev=732378&r1=732377&r2=732378&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java Wed Jan 7 08:01:32 2009
@@ -38,6 +38,7 @@
MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
resultEndpoint.expectedBodiesReceived("James", "Guillaume", "Hiram", "Rob");
+ // InOnly
template.send("direct:seqential", new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
@@ -61,7 +62,7 @@
MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
resultEndpoint.expectedBodiesReceived("James", "Guillaume", "Hiram", "Rob", "Roman");
- Exchange result = template.send("direct:seqential", new Processor() {
+ Exchange result = template.request("direct:seqential", new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody("James,Guillaume,Hiram,Rob,Roman");
@@ -77,7 +78,7 @@
}
public void testEmptyBody() {
- Exchange result = template.send("direct:seqential", new Processor() {
+ Exchange result = template.request("direct:seqential", new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader("foo", "bar");
}
@@ -92,6 +93,7 @@
resultEndpoint.expectsNoDuplicates(body());
resultEndpoint.expectedMessageCount(4);
+ // InOnly
template.send("direct:parallel", new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
@@ -124,7 +126,7 @@
MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
resultEndpoint.expectedMessageCount(5);
- Exchange result = template.send("direct:parallel", new Processor() {
+ Exchange result = template.request("direct:parallel", new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody("James,Guillaume,Hiram,Rob,Roman");
@@ -144,7 +146,7 @@
resultEndpoint.expectedMessageCount(5);
resultEndpoint.expectedBodiesReceivedInAnyOrder("James", "Guillaume", "Hiram", "Rob", "Roman");
- Exchange result = template.send("direct:parallel-streaming", new Processor() {
+ Exchange result = template.request("direct:parallel-streaming", new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody("James,Guillaume,Hiram,Rob,Roman");
@@ -164,7 +166,7 @@
resultEndpoint.expectedMessageCount(5);
resultEndpoint.expectedHeaderReceived("foo", "bar");
- Exchange result = template.send("direct:streaming", new Processor() {
+ template.request("direct:streaming", new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody("James,Guillaume,Hiram,Rob,Roman");
@@ -190,7 +192,7 @@
failedEndpoint.expectedMessageCount(1);
failedEndpoint.expectedHeaderReceived("foo", "bar");
- Exchange result = template.send("direct:exception", new Processor() {
+ Exchange result = template.request("direct:exception", new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody("James,Guillaume,Hiram,Rob,Exception");
Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/InterceptAlterMessageBeforeRedeliveryTest.java (from r732266, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/InterceptAlterMessageBeforeRedeliveryTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/InterceptAlterMessageBeforeRedeliveryTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java&r1=732266&r2=732378&rev=732378&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/InterceptAlterMessageBeforeRedeliveryTest.java Wed Jan 7 08:01:32 2009
@@ -16,73 +16,83 @@
*/
package org.apache.camel.processor.onexception;
-import org.apache.camel.Body;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeException;
-import org.apache.camel.Header;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.component.mock.MockEndpoint;
/**
- * Unit test for the retry until predicate
+ * Unit test for testing possibility to modify exchange before redelivering
*/
-public class OnExceptionRetryUntilTest extends ContextTestSupport {
+public class InterceptAlterMessageBeforeRedeliveryTest extends ContextTestSupport {
- private static int invoked;
+ static int counter;
+
+ public void testInterceptAlterMessageBeforeRedelivery() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World123");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testInterceptAlterMessageWithHeadersBeforeRedelivery() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World123");
+ mock.expectedHeaderReceived("foo", "123");
+
+ template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123");
+
+ assertMockEndpointsSatisfied();
+ }
@Override
- protected JndiRegistry createRegistry() throws Exception {
- JndiRegistry jndi = super.createRegistry();
- jndi.bind("myRetryHandler", new MyRetryBean());
- return jndi;
+ protected void setUp() throws Exception {
+ super.setUp();
+ counter = 0;
}
- public void testRetryUntil() throws Exception {
- context.addRoutes(new RouteBuilder() {
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
@Override
public void configure() throws Exception {
- errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0));
+ // to execute unit test much faster we don't use delay between redeliveries
+ errorHandler(deadLetterChannel("mock:error").delay(0L));
// START SNIPPET: e1
- // we want to use a predicate for retries so we can determine in our bean
- // when retry should stop
- onException(MyFunctionalException.class)
- .retryUntil(bean("myRetryHandler"))
- .handled(true)
- .transform().constant("Sorry");
+ // we configure an interceptor that is triggered when the redelivery flag
+ // has been set on an exchange
+ intercept().when(header("org.apache.camel.Redelivered").isNotNull()).
+ process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ // the message is being redelivered so we can alter it
+
+ // we just append the redelivery counter to the body
+ // you can of course do all kind of stuff instead
+ String body = exchange.getIn().getBody(String.class);
+ int count = exchange.getIn().getHeader("org.apache.camel.RedeliveryCounter", Integer.class);
+
+ exchange.getIn().setBody(body + count);
+ }
+ });
// END SNIPPET: e1
+
from("direct:start").process(new Processor() {
public void process(Exchange exchange) throws Exception {
- throw new MyFunctionalException("Sorry you cannot do this");
+ // force some error so Camel will do redelivery
+ if (++counter <= 3) {
+ throw new MyTechnicalException("Forced by unit test");
+ }
}
- });
- }
- });
+ }).to("mock:result");
- Object out = template.requestBody("direct:start", "Hello World");
- assertEquals("Sorry", out);
- assertEquals(3, invoked);
+ }
+ };
}
- // START SNIPPET: e2
- public class MyRetryBean {
-
- // using bean binding we can bind the information from the exchange to the types we have in our method signature
- public boolean retryUntil(@Header(name = DeadLetterChannel.REDELIVERY_COUNTER) Integer counter, @Body String body, @ExchangeException Exception causedBy) {
- // NOTE: counter is the redelivery attempt, will start from 1
- invoked++;
-
- assertEquals("Hello World", body);
- assertTrue(causedBy instanceof MyFunctionalException);
-
- // we can of course do what ever we want to determine the result but this is a unit test so we end after 3 attempts
- return counter < 3;
- }
- }
- // END SNIPPET: e2
}
\ No newline at end of file
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/InterceptAlterMessageBeforeRedeliveryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/InterceptAlterMessageBeforeRedeliveryTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/InterceptAlterMessageBeforeRedeliveryTest.java
------------------------------------------------------------------------------
svn:mergeinfo =