You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2009/05/27 11:38:16 UTC

svn commit: r779072 - in /servicemix/components/engines/servicemix-camel/trunk/src: main/java/org/apache/servicemix/camel/ test/java/org/apache/servicemix/camel/

Author: gertv
Date: Wed May 27 09:38:16 2009
New Revision: 779072

URL: http://svn.apache.org/viewvc?rev=779072&view=rev
Log:
SMXCOMP-495: Allow Camel Exchange to outlive the underlying JBI MessageExchange

Modified:
    servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
    servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java
    servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java
    servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java
    servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java

Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/CamelProviderEndpoint.java Wed May 27 09:38:16 2009
@@ -20,6 +20,7 @@
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.RobustInOnly;
 import javax.xml.namespace.QName;
 
@@ -103,9 +104,9 @@
                     } else {
                         e = new Exception(t);
                     }
-                    fail(exchange, e);
+                    fail(camelExchange, e);
                 } else {
-                    done(exchange);
+                    done(camelExchange);
                 }
             } else {
                 if (logger.isDebugEnabled()) {
@@ -124,16 +125,16 @@
                     } else {
                         e = new Exception(t);
                     }
-                    fail(exchange, e);
+                    fail(camelExchange, e);
                 } else {
                     boolean txSync = exchange.isTransacted() && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
 //                    if (camelExchange.getOut(false) == null) {
 //                        camelExchange.getOut().copyFrom(camelExchange.getIn());
 //                    }
                     if (txSync) {
-                        sendSync(exchange);
+                        sendSync(camelExchange);
                     } else {
-                        send(exchange);
+                        send(camelExchange);
                     }
                 }
             }
@@ -143,4 +144,31 @@
         }
     }
 
+    /**
+     * Detaches the JBI MessageExchange from the Camel JbiExchange, then passes it to the send overload for JBI exchanges
+     */
+    private void send(JbiExchange camelExchange) throws MessagingException {
+        send(camelExchange.detach()); // detach() is evaluated first, so camelExchange has already let go of the JBI exchange
+    }
+
+    /**
+     * Detaches the JBI MessageExchange from the Camel JbiExchange, then passes it to the sendSync overload for JBI exchanges
+     */
+    private void sendSync(JbiExchange camelExchange) throws MessagingException {
+        sendSync(camelExchange.detach()); // detach() is evaluated first, so camelExchange has already let go of the JBI exchange
+    }
+
+    /**
+     * Detaches the JBI MessageExchange from the Camel JbiExchange, then passes it to the done overload for JBI exchanges
+     */
+    private void done(JbiExchange camelExchange) throws MessagingException {
+        done(camelExchange.detach()); // detach() is evaluated first, so camelExchange has already let go of the JBI exchange
+    }
+
+    /**
+     * Detaches the JBI MessageExchange from the Camel JbiExchange, then passes it and the exception to the fail overload
+     */
+    private void fail(JbiExchange camelExchange, Exception e) throws MessagingException {
+        fail(camelExchange.detach(), e); // detach() is evaluated first, so camelExchange has already let go of the JBI exchange
+    }
 }

Modified: servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/main/java/org/apache/servicemix/camel/JbiExchange.java Wed May 27 09:38:16 2009
@@ -45,13 +45,10 @@
         populateProperties();
     }
 
-
-
     public JbiExchange(CamelContext context, JbiBinding binding, MessageExchange messageExchange) {
         super(context);
         this.binding = binding;
         this.messageExchange = messageExchange;
-
         setPattern(ExchangePattern.fromWsdlUri(messageExchange.getPattern().toString()));
         populateProperties();
     }
@@ -82,8 +79,12 @@
     }
 
     @Override
-    public org.apache.camel.Exchange newInstance() {        
-        return new JbiExchange(this.getContext(), this.getBinding(), this.getMessageExchange());
+    public org.apache.camel.Exchange newInstance() {    
+        if (messageExchange == null) { // e.g. after detach() has cleared the field
+            return new JbiExchange(this.getContext(), this.getBinding());
+        } else {
+            // the three-argument constructor calls messageExchange.getPattern(), so it is only safe when non-null
+            return new JbiExchange(this.getContext(), this.getBinding(), this.getMessageExchange());
+        }
     }
     
     @Override
@@ -197,4 +198,11 @@
         }
     }
 
+    public MessageExchange detach() { // returns the JBI exchange and clears this object's reference to it
+        try {
+            return messageExchange; // the return value is captured here, before the finally block runs
+        } finally {
+            messageExchange = null; // clear the field; newInstance() checks it for null
+        }
+    }
 }

Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiExchangeTest.java Wed May 27 09:38:16 2009
@@ -27,6 +27,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.servicemix.jbi.helper.MessageExchangePattern;
+import org.apache.servicemix.jbi.jaxp.StringSource;
 import org.apache.servicemix.jbi.messaging.FaultImpl;
 import org.apache.servicemix.tck.mock.MockMessageExchange;
 import org.apache.servicemix.tck.mock.MockNormalizedMessage;
@@ -87,6 +88,22 @@
         assertSame(fault, exchange.getFaultMessage());
     }
     
+    public void testDetach() throws Exception { // detach() returns the JBI exchange; Camel-side data must survive
+        MessageExchange exchange = createMockExchange();
+        StringSource body = new StringSource("<question>Will this still be there?</question>");
+        exchange.setMessage(exchange.createMessage(), "in");
+        exchange.getMessage("in").setContent(body);
+        exchange.getMessage("in").setProperty("key", "value");
+        JbiExchange camelExchange = new JbiExchange(new DefaultCamelContext(), new JbiBinding(), exchange);
+        assertEquals(body, camelExchange.getIn().getBody()); // sanity check while still attached
+        // now detach the Camel Exchange from the underlying JBI MessageExchange
+        assertSame(exchange, camelExchange.detach());
+        assertNull(camelExchange.getMessageExchange());
+        // and make sure that all the data is still there
+        assertEquals(body, camelExchange.getIn().getBody());
+        assertEquals("value", camelExchange.getIn().getHeader("key")); // the JBI message property is read as a Camel header
+    }
+    
     private MessageExchange createMockExchange() {
         return new MockMessageExchange() {
             @Override

Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOnlyTest.java Wed May 27 09:38:16 2009
@@ -25,12 +25,12 @@
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.converter.jaxp.StringSource;
 import org.apache.camel.Processor;
 import org.apache.camel.Exchange;
 import org.apache.servicemix.client.DefaultServiceMixClient;
 import org.apache.servicemix.client.ServiceMixClient;
 import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.jaxp.StringSource;
 
 /**
  * Tests on handling JBI InOnly exchanges by Camel 
@@ -48,6 +48,20 @@
         exchange.setService(new QName("urn:test", "in-only"));
         exchange.getInMessage().setContent(new StringSource(MESSAGE));
         client.send(exchange);
+        client.receive(1000);
+        assertEquals(ExchangeStatus.DONE, exchange.getStatus());
+        done.assertIsSatisfied();
+    }
+    
+    public void testInOnlyExchangeForwardAndConvertBody() throws Exception {
+        MockEndpoint done = getMockEndpoint("mock:done");
+        done.expectedBodiesReceived(MESSAGE); // mock:done sits behind convertBodyTo(String.class) in configure()
+        
+        ServiceMixClient client = new DefaultServiceMixClient(jbiContainer);
+        InOnly exchange = client.createInOnlyExchange();
+        exchange.setService(new QName("urn:test", "forward")); // the forward route relays to urn:test:in-only
+        exchange.getInMessage().setContent(new StringSource(MESSAGE));
+        client.sendSync(exchange);
         
         done.assertIsSatisfied();
     }
@@ -69,6 +83,19 @@
         client.sendSync(exchange);
         assertEquals(ExchangeStatus.ERROR, exchange.getStatus());
     }
+    
+    public void testInOnlyToAggregator() throws Exception { // 50 InOnly exchanges with one "key" value -> one message expected
+        ServiceMixClient smxClient = getServicemixClient();
+        getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+        for (int i = 0; i < 50; i++) {
+            InOnly exchange = smxClient.createInOnlyExchange();
+            exchange.setService(new QName("urn:test", "in-only-aggregator"));
+            exchange.getInMessage().setProperty("key", "aggregate-this"); // same value every time; the route aggregates on header("key")
+            exchange.getInMessage().setContent(new StringSource("<request>Could you please aggregate this?</request>"));
+            smxClient.send(exchange); // send(), not sendSync(): the loop does not wait on each exchange
+        }
+        getMockEndpoint("mock:aggregated").assertIsSatisfied();
+    }
 
     @Override
     protected void appendJbiActivationSpecs(List<ActivationSpec> activationSpecList) {
@@ -81,14 +108,18 @@
 
             @Override
             public void configure() throws Exception {
+                from("jbi:service:urn:test:forward").to("jbi:service:urn:test:in-only?mep=in-only"); // relay route for the forward test
                 from("jbi:service:urn:test:in-only").convertBodyTo(String.class).to("mock:done");
                 from("jbi:service:urn:test:in-only-error").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         throw new Exception("Error");
                     }
                 });
+                from("jbi:service:urn:test:in-only-aggregator") // route exercised by testInOnlyToAggregator
+                    .aggregator(header("key"))
+                    .setHeader("aggregated").constant(true)
+                    .to("mock:aggregated");
             }
-            
         };
     }
 }

Modified: servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java?rev=779072&r1=779071&r2=779072&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java (original)
+++ servicemix/components/engines/servicemix-camel/trunk/src/test/java/org/apache/servicemix/camel/JbiInOptionalOutCamelTest.java Wed May 27 09:38:16 2009
@@ -39,6 +39,8 @@
         client.sendBody("direct:in-optional-out", new StringSource("<request>Does this MEP confuse you?</request>"));
         
         inonly.assertIsSatisfied();
+        // let's wait for a moment to ensure that all pending Exchanges are handled
+        Thread.sleep(500);
     }
     
     public void testInOptionalOutFromJbiWithDone() throws Exception {