You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/06/20 11:08:21 UTC

svn commit: r669832 - in /servicemix/components/engines/servicemix-eip/trunk/src: main/java/org/apache/servicemix/eip/patterns/ test/java/org/apache/servicemix/eip/

Author: gnodet
Date: Fri Jun 20 02:08:21 2008
New Revision: 669832

URL: http://svn.apache.org/viewvc?rev=669832&view=rev
Log:
SM-454: Make the AsyncBridge asynchronous

Added:
    servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java
      - copied, changed from r669702, servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java
Modified:
    servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java
    servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java

Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java?rev=669832&r1=669831&r2=669832&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java Fri Jun 20 02:08:21 2008
@@ -172,76 +172,100 @@
      * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
      */
     public void process(MessageExchange exchange) throws Exception {
-        // Handle an exchange as a PROVIDER
+        // Three exchanges are involved: the first InOut will be called t0,
+        // the InOnly sent will be called t1 and the InOnly received will be called t2
+
         if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
-            // receive the InOut request
-            //   => send the In to the target
+            // Step 1: receive t0 as the first message
             if (exchange instanceof InOut && exchange.getStatus() == ExchangeStatus.ACTIVE) {
-                final String correlationId = (String) requestCorrId.evaluate(exchange, exchange.getMessage("in"));
+                MessageExchange t0 = exchange;
+                MessageExchange t1;
+                final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
                 if (correlationId == null || correlationId.length() == 0) {
                     throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
                 }
-                store.store(correlationId, exchange);
-                MessageExchange tme = useRobustInOnly ? getExchangeFactory().createRobustInOnlyExchange()
-                                                      : getExchangeFactory().createInOnlyExchange();
-                target.configureTarget(tme, getContext());
-                MessageUtil.transferInToIn(exchange, tme);
-                tme.setProperty(responseCorrIdProperty, correlationId);
-                tme.getMessage("in").setProperty(responseCorrIdProperty, correlationId);
-                sendSync(tme);
-                // an error
-                if (tme.getStatus() == ExchangeStatus.ERROR) {
-                    store.load(correlationId);
-                    fail(exchange, tme.getError());
-                    return;
-                // a fault ?
-                } else if (tme.getStatus() == ExchangeStatus.ACTIVE) {
-                    store.load(correlationId);
-                    MessageUtil.transferFaultToFault(tme, exchange);
-                    send(tme);
-                    done(tme);
-                    return;
-                // request sent
-                } else {
-                    Date exchangeTimeout = getTimeout(exchange);
-                    if (exchangeTimeout != null) {
-                        getTimerManager().schedule(new TimerListener() {
-                            public void timerExpired(Timer timer) {
-                                AsyncBridge.this.onTimeout(correlationId);
-                            }
-                        }, exchangeTimeout);
-                    }
-                }
-            // receive the done / error for the InOut request
+                store.store(correlationId + ".t0", t0);
+                t1 = useRobustInOnly ? getExchangeFactory().createRobustInOnlyExchange()
+                                     : getExchangeFactory().createInOnlyExchange();
+                target.configureTarget(t1, getContext());
+                MessageUtil.transferInToIn(t0, t1);
+                t1.setProperty(responseCorrIdProperty, correlationId);
+                t1.getMessage("in").setProperty(responseCorrIdProperty, correlationId);
+                send(t1);
+            // Receive the done / error from t0
             } else if (exchange instanceof InOut && exchange.getStatus() != ExchangeStatus.ACTIVE) {
-                // ignore these exchanges
-            // Receive the response
-            } else if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
-                final String correlationId = (String) responseCorrId.evaluate(exchange, exchange.getMessage("in"));
+                MessageExchange t0 = exchange;
+                MessageExchange t1;
+                MessageExchange t2;
+                final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
+                t1 = (MessageExchange) store.load(correlationId + ".t1");
+                t2 = (MessageExchange) store.load(correlationId + ".t2");
+                if (t1 != null) {
+                    done(t1);
+                }
+                if (t2 != null) {
+                    done(t2);
+                }
+            // Receive the response from t2
+            } else if ((exchange instanceof InOnly || exchange instanceof RobustInOnly) && exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                MessageExchange t0;
+                MessageExchange t2 = exchange;
+                final String correlationId = (String) responseCorrId.evaluate(t2, t2.getMessage("in"));
                 if (correlationId == null || correlationId.length() == 0) {
                     throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
                 }
-                MessageExchange request = (MessageExchange) store.load(correlationId);
+                t0 = (MessageExchange) store.load(correlationId + ".t0");
+                store.store(correlationId + ".t2", t2);
                 // The request is found and has not timed out
-                if (request != null) {
-                    MessageUtil.transferInToOut(exchange, request);
-                    sendSync(request);
+                if (t0 != null) {
+                    MessageUtil.transferInToOut(t2, t0);
+                    send(t0);
                 }
-                done(exchange);
             } else {
                 throw new IllegalStateException();
             }
         // Handle an exchange as a CONSUMER
         } else {
-            throw new IllegalStateException();
+            // Step 2: receive t1 response
+            // If this is an error or a fault, transfer it from t1 to t0 and send,
+            // else, start a timeout to wait for t2
+            MessageExchange t1 = exchange;
+            // an error
+            final String correlationId = (String) t1.getProperty(responseCorrIdProperty);
+            if (t1.getStatus() == ExchangeStatus.ERROR) {
+                MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
+                // The t1 response may arrive after t0 has been handled; t0 is then no longer in the store, so t1 is discarded
+                if (t0 != null) {
+                    fail(t0, t1.getError());
+                }
+            // a fault ?
+            } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
+                // The t1 response may arrive after t0 has been handled; t0 is then no longer in the store, so t1 is discarded
+                if (t0 != null) {
+                    store.store(correlationId + ".t1", t1);
+                    MessageUtil.transferFaultToFault(t1, t0);
+                    send(t0);
+                }
+            // request sent successfully, start the timeout
+            } else {
+                Date exchangeTimeout = getTimeout(t1);
+                if (exchangeTimeout != null) {
+                    getTimerManager().schedule(new TimerListener() {
+                        public void timerExpired(Timer timer) {
+                            AsyncBridge.this.onTimeout(correlationId);
+                        }
+                    }, exchangeTimeout);
+                }
+            }
         }
     }
     
     protected void onTimeout(String correlationId) {
         try {
-            MessageExchange request = (MessageExchange) store.load(correlationId);
-            if (request != null) {
-                fail(request, new TimeoutException());
+            MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
+            if (t0 != null) {
+                fail(t0, new TimeoutException());
             }
         } catch (Exception e) {
             LOG.debug("Exception caught when handling timeout", e);

Copied: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java (from r669702, servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java)
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java?p2=servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java&p1=servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java&r1=669702&r2=669832&rev=669832&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeStreamsTest.java Fri Jun 20 02:08:21 2008
@@ -16,62 +16,12 @@
  */
 package org.apache.servicemix.eip;
 
-import java.util.concurrent.TimeoutException;
+import org.apache.servicemix.id.IdGenerator;
+import org.apache.servicemix.store.memory.MemoryStore;
 
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOut;
-import javax.xml.namespace.QName;
+public class AsyncBridgeStreamsTest extends AsyncBridgeTest {
 
-import org.apache.servicemix.components.util.TraceComponent;
-import org.apache.servicemix.eip.patterns.AsyncBridge;
-import org.apache.servicemix.eip.patterns.WireTap;
-
-public class AsyncBridgeTest extends AbstractEIPTest {
-
-    protected AsyncBridge asyncBridge;
-    
-    protected void setUp() throws Exception {
-        super.setUp();
-
-        asyncBridge = new AsyncBridge();
-        asyncBridge.setTarget(createServiceExchangeTarget(new QName("target")));
-        asyncBridge.setTimeout(2000);
-        configurePattern(asyncBridge);
-        activateComponent(asyncBridge, "asyncBridge");
-    }
-    
-    protected void configureContainer() throws Exception {
-    }
-    
-    public void testInOut() throws Exception {
-        WireTap wireTap = new WireTap();
-        wireTap.setCopyProperties(true);
-        wireTap.setTarget(createServiceExchangeTarget(new QName("asyncBridge")));
-        activateComponent(wireTap, "target");
-        
-        InOut me = client.createInOutExchange();
-        me.setService(new QName("asyncBridge"));
-        me.getInMessage().setContent(createSource("<hello/>"));
-        client.sendSync(me);
-        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
-        assertNotNull(me.getOutMessage());
-        client.done(me);
-        
-        Thread.sleep(100);
-    }
-    
-    public void testInOutWithTimeOut() throws Exception {
-        activateComponent(new TraceComponent(), "target");
-        
-        InOut me = client.createInOutExchange();
-        me.setService(new QName("asyncBridge"));
-        me.getInMessage().setContent(createSource("<hello/>"));
-        client.sendSync(me);
-        assertEquals(ExchangeStatus.ERROR, me.getStatus());
-        assertTrue(me.getError() instanceof TimeoutException);
-        
-        Thread.sleep(100);
+    protected void configurePattern(EIPEndpoint endpoint) {
+        endpoint.setStore(new MemoryStore(new IdGenerator()));
     }
-    
-    
-}
+}
\ No newline at end of file

Modified: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java?rev=669832&r1=669831&r2=669832&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java Fri Jun 20 02:08:21 2008
@@ -19,36 +19,35 @@
 import java.util.concurrent.TimeoutException;
 
 import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
 import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
 import javax.xml.namespace.QName;
 
-import org.apache.servicemix.components.util.TraceComponent;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.components.util.ComponentSupport;
 import org.apache.servicemix.eip.patterns.AsyncBridge;
-import org.apache.servicemix.eip.patterns.WireTap;
+import org.apache.servicemix.jbi.util.MessageUtil;
 
 public class AsyncBridgeTest extends AbstractEIPTest {
 
     protected AsyncBridge asyncBridge;
-    
+
     protected void setUp() throws Exception {
         super.setUp();
-
-        asyncBridge = new AsyncBridge();
-        asyncBridge.setTarget(createServiceExchangeTarget(new QName("target")));
-        asyncBridge.setTimeout(2000);
-        configurePattern(asyncBridge);
-        activateComponent(asyncBridge, "asyncBridge");
     }
-    
+
     protected void configureContainer() throws Exception {
+        // Use seda flow, so don't call base class
     }
     
     public void testInOut() throws Exception {
-        WireTap wireTap = new WireTap();
-        wireTap.setCopyProperties(true);
-        wireTap.setTarget(createServiceExchangeTarget(new QName("asyncBridge")));
-        activateComponent(wireTap, "target");
-        
+        createAsyncbridge(false);
+
+        activateComponent(new TestComponent(false, false, true, false), "target");
+
         InOut me = client.createInOutExchange();
         me.setService(new QName("asyncBridge"));
         me.getInMessage().setContent(createSource("<hello/>"));
@@ -56,22 +55,167 @@
         assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
         assertNotNull(me.getOutMessage());
         client.done(me);
-        
-        Thread.sleep(100);
     }
-    
+
+    public void testInOutWithError() throws Exception {
+        createAsyncbridge(false);
+
+        activateComponent(new TestComponent(false, true, false, false), "target");
+
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("asyncBridge"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ERROR, me.getStatus());
+    }
+
     public void testInOutWithTimeOut() throws Exception {
-        activateComponent(new TraceComponent(), "target");
-        
+        createAsyncbridge(false);
+
+        activateComponent(new TestComponent(false, false, false, false), "target");
+
         InOut me = client.createInOutExchange();
         me.setService(new QName("asyncBridge"));
         me.getInMessage().setContent(createSource("<hello/>"));
         client.sendSync(me);
         assertEquals(ExchangeStatus.ERROR, me.getStatus());
         assertTrue(me.getError() instanceof TimeoutException);
-        
-        Thread.sleep(100);
     }
-    
-    
+
+    public void testInOutWithDoneThenForward() throws Exception {
+        createAsyncbridge(false);
+
+        activateComponent(new TestComponent(false, false, false, true), "target");
+
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("asyncBridge"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getOutMessage());
+        client.done(me);
+    }
+
+    public void testInOutWithRobustInOnly() throws Exception {
+        createAsyncbridge(true);
+
+        activateComponent(new TestComponent(false, false, true, false), "target");
+
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("asyncBridge"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getOutMessage());
+        client.done(me);
+    }
+
+    public void testInOutWithRobustInOnlyAndFault() throws Exception {
+        createAsyncbridge(true);
+
+        activateComponent(new TestComponent(true, false, false, false), "target");
+
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("asyncBridge"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getFault());
+        client.done(me);
+    }
+
+    public void testInOutWithRobustInOnlyAndError() throws Exception {
+        createAsyncbridge(true);
+
+        activateComponent(new TestComponent(false, true, false, false), "target");
+
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("asyncBridge"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ERROR, me.getStatus());
+    }
+
+    public void testInOutWithRobustInOnlyAndTimeout() throws Exception {
+        createAsyncbridge(true);
+
+        activateComponent(new TestComponent(false, false, false, false), "target");
+
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("asyncBridge"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ERROR, me.getStatus());
+        assertTrue(me.getError() instanceof TimeoutException);
+    }
+
+    public void testInOutWithRobustInOnlyAndDoneThenForward() throws Exception {
+        createAsyncbridge(true);
+
+        activateComponent(new TestComponent(false, false, false, true), "target");
+
+        InOut me = client.createInOutExchange();
+        me.setService(new QName("asyncBridge"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.sendSync(me);
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getOutMessage());
+        client.done(me);
+    }
+
+    private void createAsyncbridge(boolean robustInOnly) throws Exception {
+        asyncBridge = new AsyncBridge();
+        asyncBridge.setTarget(createServiceExchangeTarget(new QName("target")));
+        asyncBridge.setTimeout(1000);
+        asyncBridge.setUseRobustInOnly(robustInOnly);
+        configurePattern(asyncBridge);
+        activateComponent(asyncBridge, "asyncBridge");
+    }
+
+    private class TestComponent extends ComponentSupport implements MessageExchangeListener {
+        private boolean sendFault;
+        private boolean sendError;
+        private boolean forward;
+        private boolean doneThenForward;
+
+        public TestComponent(boolean sendFault, boolean sendError, boolean forward, boolean doneThenForward) {
+            this.sendFault = sendFault;
+            this.sendError = sendError;
+            this.forward = forward;
+            this.doneThenForward = doneThenForward;
+        }
+
+        public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+                if (sendFault) {
+                    Fault f = exchange.createFault();
+                    f.setContent(createSource("<fault/>"));
+                    fail(exchange, f);
+                } else if (sendError) {
+                    fail(exchange, new Exception());
+                } else if (forward) {
+                    MessageExchange e = createRobustInOnlyExchange(exchange);
+                    MessageUtil.transferInToIn(exchange, e);
+                    e.setService(new QName("asyncBridge"));
+                    sendSync(e);
+                    done(exchange);
+                } else if (doneThenForward) {
+                    NormalizedMessage in = MessageUtil.copyIn(exchange);
+                    done(exchange);
+                    try {
+                        Thread.sleep(500);
+                    } catch (InterruptedException e) {
+                        throw new MessagingException(e);
+                    }
+                    MessageExchange e = createRobustInOnlyExchange(exchange);
+                    MessageUtil.transferToIn(in, e);
+                    e.setService(new QName("asyncBridge"));
+                    sendSync(e);
+                } else {
+                    done(exchange);
+                }
+            }
+        }
+    }
+
 }