You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/06/20 11:08:21 UTC
svn commit: r669832 - in
/servicemix/components/engines/servicemix-eip/trunk/src:
main/java/org/apache/servicemix/eip/patterns/
test/java/org/apache/servicemix/eip/
Author: gnodet
Date: Fri Jun 20 02:08:21 2008
New Revision: 669832
URL: http://svn.apache.org/viewvc?rev=669832&view=rev
Log:
SM-454: Make the AsyncBridge asynchronous
Added:
servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java
- copied, changed from r669702, servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java
servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java
Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java?rev=669832&r1=669831&r2=669832&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java Fri Jun 20 02:08:21 2008
@@ -172,76 +172,100 @@
* @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
*/
public void process(MessageExchange exchange) throws Exception {
- // Handle an exchange as a PROVIDER
+ // Three exchanges are involved: the first InOut will be called t0,
+ // the InOnly sent will be called t1 and the InOnly received will be called t2
+
if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
- // receive the InOut request
- // => send the In to the target
+ // Step 1: receive t0 as the first message
if (exchange instanceof InOut && exchange.getStatus() == ExchangeStatus.ACTIVE) {
- final String correlationId = (String) requestCorrId.evaluate(exchange, exchange.getMessage("in"));
+ MessageExchange t0 = exchange;
+ MessageExchange t1;
+ final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
if (correlationId == null || correlationId.length() == 0) {
throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
}
- store.store(correlationId, exchange);
- MessageExchange tme = useRobustInOnly ? getExchangeFactory().createRobustInOnlyExchange()
- : getExchangeFactory().createInOnlyExchange();
- target.configureTarget(tme, getContext());
- MessageUtil.transferInToIn(exchange, tme);
- tme.setProperty(responseCorrIdProperty, correlationId);
- tme.getMessage("in").setProperty(responseCorrIdProperty, correlationId);
- sendSync(tme);
- // an error
- if (tme.getStatus() == ExchangeStatus.ERROR) {
- store.load(correlationId);
- fail(exchange, tme.getError());
- return;
- // a fault ?
- } else if (tme.getStatus() == ExchangeStatus.ACTIVE) {
- store.load(correlationId);
- MessageUtil.transferFaultToFault(tme, exchange);
- send(tme);
- done(tme);
- return;
- // request sent
- } else {
- Date exchangeTimeout = getTimeout(exchange);
- if (exchangeTimeout != null) {
- getTimerManager().schedule(new TimerListener() {
- public void timerExpired(Timer timer) {
- AsyncBridge.this.onTimeout(correlationId);
- }
- }, exchangeTimeout);
- }
- }
- // receive the done / error for the InOut request
+ store.store(correlationId + ".t0", t0);
+ t1 = useRobustInOnly ? getExchangeFactory().createRobustInOnlyExchange()
+ : getExchangeFactory().createInOnlyExchange();
+ target.configureTarget(t1, getContext());
+ MessageUtil.transferInToIn(t0, t1);
+ t1.setProperty(responseCorrIdProperty, correlationId);
+ t1.getMessage("in").setProperty(responseCorrIdProperty, correlationId);
+ send(t1);
+ // Receive the done / error from t0
} else if (exchange instanceof InOut && exchange.getStatus() != ExchangeStatus.ACTIVE) {
- // ignore these exchanges
- // Receive the response
- } else if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
- final String correlationId = (String) responseCorrId.evaluate(exchange, exchange.getMessage("in"));
+ MessageExchange t0 = exchange;
+ MessageExchange t1;
+ MessageExchange t2;
+ final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
+ t1 = (MessageExchange) store.load(correlationId + ".t1");
+ t2 = (MessageExchange) store.load(correlationId + ".t2");
+ if (t1 != null) {
+ done(t1);
+ }
+ if (t2 != null) {
+ done(t2);
+ }
+ // Receive the response from t2
+ } else if ((exchange instanceof InOnly || exchange instanceof RobustInOnly) && exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ MessageExchange t0;
+ MessageExchange t2 = exchange;
+ final String correlationId = (String) responseCorrId.evaluate(t2, t2.getMessage("in"));
if (correlationId == null || correlationId.length() == 0) {
throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
}
- MessageExchange request = (MessageExchange) store.load(correlationId);
+ t0 = (MessageExchange) store.load(correlationId + ".t0");
+ store.store(correlationId + ".t2", t2);
// The request is found and has not timed out
- if (request != null) {
- MessageUtil.transferInToOut(exchange, request);
- sendSync(request);
+ if (t0 != null) {
+ MessageUtil.transferInToOut(t2, t0);
+ send(t0);
}
- done(exchange);
} else {
throw new IllegalStateException();
}
// Handle an exchange as a CONSUMER
} else {
- throw new IllegalStateException();
+ // Step 2: receive t1 response
+ // If this is an error or a fault, transfer it from t1 to t0 and send,
+ // else, start a timeout to wait for t2
+ MessageExchange t1 = exchange;
+ // an error
+ final String correlationId = (String) t1.getProperty(responseCorrIdProperty);
+ if (t1.getStatus() == ExchangeStatus.ERROR) {
+ MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
+ // t1 response may come after t0, so in case this happens, we need to discard t1
+ if (t0 != null) {
+ fail(t0, t1.getError());
+ }
+ // a fault ?
+ } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
+ // t1 response may come after t0, so in case this happens, we need to discard t1
+ if (t0 != null) {
+ store.store(correlationId + ".t1", t1);
+ MessageUtil.transferFaultToFault(t1, t0);
+ send(t0);
+ }
+ // request sent successfully, start the timeout
+ } else {
+ Date exchangeTimeout = getTimeout(t1);
+ if (exchangeTimeout != null) {
+ getTimerManager().schedule(new TimerListener() {
+ public void timerExpired(Timer timer) {
+ AsyncBridge.this.onTimeout(correlationId);
+ }
+ }, exchangeTimeout);
+ }
+ }
}
}
protected void onTimeout(String correlationId) {
try {
- MessageExchange request = (MessageExchange) store.load(correlationId);
- if (request != null) {
- fail(request, new TimeoutException());
+ MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
+ if (t0 != null) {
+ fail(t0, new TimeoutException());
}
} catch (Exception e) {
LOG.debug("Exception caught when handling timeout", e);
Copied: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java (from r669702, servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java)
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java?p2=servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java&p1=servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java&r1=669702&r2=669832&rev=669832&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java Fri Jun 20 02:08:21 2008
@@ -16,62 +16,12 @@
*/
package org.apache.servicemix.eip;
-import java.util.concurrent.TimeoutException;
+import org.apache.servicemix.id.IdGenerator;
+import org.apache.servicemix.store.memory.MemoryStore;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOut;
-import javax.xml.namespace.QName;
+public class AsyncBridgeStreamsTest extends AsyncBridgeTest {
-import org.apache.servicemix.components.util.TraceComponent;
-import org.apache.servicemix.eip.patterns.AsyncBridge;
-import org.apache.servicemix.eip.patterns.WireTap;
-
-public class AsyncBridgeTest extends AbstractEIPTest {
-
- protected AsyncBridge asyncBridge;
-
- protected void setUp() throws Exception {
- super.setUp();
-
- asyncBridge = new AsyncBridge();
- asyncBridge.setTarget(createServiceExchangeTarget(new QName("target")));
- asyncBridge.setTimeout(2000);
- configurePattern(asyncBridge);
- activateComponent(asyncBridge, "asyncBridge");
- }
-
- protected void configureContainer() throws Exception {
- }
-
- public void testInOut() throws Exception {
- WireTap wireTap = new WireTap();
- wireTap.setCopyProperties(true);
- wireTap.setTarget(createServiceExchangeTarget(new QName("asyncBridge")));
- activateComponent(wireTap, "target");
-
- InOut me = client.createInOutExchange();
- me.setService(new QName("asyncBridge"));
- me.getInMessage().setContent(createSource("<hello/>"));
- client.sendSync(me);
- assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
- assertNotNull(me.getOutMessage());
- client.done(me);
-
- Thread.sleep(100);
- }
-
- public void testInOutWithTimeOut() throws Exception {
- activateComponent(new TraceComponent(), "target");
-
- InOut me = client.createInOutExchange();
- me.setService(new QName("asyncBridge"));
- me.getInMessage().setContent(createSource("<hello/>"));
- client.sendSync(me);
- assertEquals(ExchangeStatus.ERROR, me.getStatus());
- assertTrue(me.getError() instanceof TimeoutException);
-
- Thread.sleep(100);
+ protected void configurePattern(EIPEndpoint endpoint) {
+ endpoint.setStore(new MemoryStore(new IdGenerator()));
}
-
-
-}
+}
\ No newline at end of file
Modified: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java?rev=669832&r1=669831&r2=669832&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java Fri Jun 20 02:08:21 2008
@@ -19,36 +19,35 @@
import java.util.concurrent.TimeoutException;
import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
import javax.xml.namespace.QName;
-import org.apache.servicemix.components.util.TraceComponent;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.eip.patterns.AsyncBridge;
-import org.apache.servicemix.eip.patterns.WireTap;
+import org.apache.servicemix.jbi.util.MessageUtil;
public class AsyncBridgeTest extends AbstractEIPTest {
protected AsyncBridge asyncBridge;
-
+
protected void setUp() throws Exception {
super.setUp();
-
- asyncBridge = new AsyncBridge();
- asyncBridge.setTarget(createServiceExchangeTarget(new QName("target")));
- asyncBridge.setTimeout(2000);
- configurePattern(asyncBridge);
- activateComponent(asyncBridge, "asyncBridge");
}
-
+
protected void configureContainer() throws Exception {
+ // Use seda flow, so don't call base class
}
public void testInOut() throws Exception {
- WireTap wireTap = new WireTap();
- wireTap.setCopyProperties(true);
- wireTap.setTarget(createServiceExchangeTarget(new QName("asyncBridge")));
- activateComponent(wireTap, "target");
-
+ createAsyncbridge(false);
+
+ activateComponent(new TestComponent(false, false, true, false), "target");
+
InOut me = client.createInOutExchange();
me.setService(new QName("asyncBridge"));
me.getInMessage().setContent(createSource("<hello/>"));
@@ -56,22 +55,167 @@
assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
assertNotNull(me.getOutMessage());
client.done(me);
-
- Thread.sleep(100);
}
-
+
+ public void testInOutWithError() throws Exception {
+ createAsyncbridge(false);
+
+ activateComponent(new TestComponent(false, true, false, false), "target");
+
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("asyncBridge"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ }
+
public void testInOutWithTimeOut() throws Exception {
- activateComponent(new TraceComponent(), "target");
-
+ createAsyncbridge(false);
+
+ activateComponent(new TestComponent(false, false, false, false), "target");
+
InOut me = client.createInOutExchange();
me.setService(new QName("asyncBridge"));
me.getInMessage().setContent(createSource("<hello/>"));
client.sendSync(me);
assertEquals(ExchangeStatus.ERROR, me.getStatus());
assertTrue(me.getError() instanceof TimeoutException);
-
- Thread.sleep(100);
}
-
-
+
+ public void testInOutWithDoneThenForward() throws Exception {
+ createAsyncbridge(false);
+
+ activateComponent(new TestComponent(false, false, false, true), "target");
+
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("asyncBridge"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertNotNull(me.getOutMessage());
+ client.done(me);
+ }
+
+ public void testInOutWithRobustInOnly() throws Exception {
+ createAsyncbridge(true);
+
+ activateComponent(new TestComponent(false, false, true, false), "target");
+
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("asyncBridge"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertNotNull(me.getOutMessage());
+ client.done(me);
+ }
+
+ public void testInOutWithRobustInOnlyAndFault() throws Exception {
+ createAsyncbridge(true);
+
+ activateComponent(new TestComponent(true, false, false, false), "target");
+
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("asyncBridge"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertNotNull(me.getFault());
+ client.done(me);
+ }
+
+ public void testInOutWithRobustInOnlyAndError() throws Exception {
+ createAsyncbridge(true);
+
+ activateComponent(new TestComponent(false, true, false, false), "target");
+
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("asyncBridge"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ }
+
+ public void testInOutWithRobustInOnlyAndTimeout() throws Exception {
+ createAsyncbridge(true);
+
+ activateComponent(new TestComponent(false, false, false, false), "target");
+
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("asyncBridge"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ assertTrue(me.getError() instanceof TimeoutException);
+ }
+
+ public void testInOutWithRobustInOnlyAndDoneThenForward() throws Exception {
+ createAsyncbridge(true);
+
+ activateComponent(new TestComponent(false, false, false, true), "target");
+
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("asyncBridge"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertNotNull(me.getOutMessage());
+ client.done(me);
+ }
+
+ private void createAsyncbridge(boolean robustInOnly) throws Exception {
+ asyncBridge = new AsyncBridge();
+ asyncBridge.setTarget(createServiceExchangeTarget(new QName("target")));
+ asyncBridge.setTimeout(1000);
+ asyncBridge.setUseRobustInOnly(robustInOnly);
+ configurePattern(asyncBridge);
+ activateComponent(asyncBridge, "asyncBridge");
+ }
+
+ private class TestComponent extends ComponentSupport implements MessageExchangeListener {
+ private boolean sendFault;
+ private boolean sendError;
+ private boolean forward;
+ private boolean doneThenForward;
+
+ public TestComponent(boolean sendFault, boolean sendError, boolean forward, boolean doneThenForward) {
+ this.sendFault = sendFault;
+ this.sendError = sendError;
+ this.forward = forward;
+ this.doneThenForward = doneThenForward;
+ }
+
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ if (sendFault) {
+ Fault f = exchange.createFault();
+ f.setContent(createSource("<fault/>"));
+ fail(exchange, f);
+ } else if (sendError) {
+ fail(exchange, new Exception());
+ } else if (forward) {
+ MessageExchange e = createRobustInOnlyExchange(exchange);
+ MessageUtil.transferInToIn(exchange, e);
+ e.setService(new QName("asyncBridge"));
+ sendSync(e);
+ done(exchange);
+ } else if (doneThenForward) {
+ NormalizedMessage in = MessageUtil.copyIn(exchange);
+ done(exchange);
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new MessagingException(e);
+ }
+ MessageExchange e = createRobustInOnlyExchange(exchange);
+ MessageUtil.transferToIn(in, e);
+ e.setService(new QName("asyncBridge"));
+ sendSync(e);
+ } else {
+ done(exchange);
+ }
+ }
+ }
+ }
+
}