You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2008/11/05 08:39:57 UTC

svn commit: r711523 - in /activemq/camel/trunk/components/camel-spring-integration: ./ src/main/java/org/apache/camel/component/spring/integration/ src/main/java/org/apache/camel/component/spring/integration/adapter/ src/main/java/org/apache/camel/comp...

Author: ningjiang
Date: Tue Nov  4 23:39:57 2008
New Revision: 711523

URL: http://svn.apache.org/viewvc?rev=711523&view=rev
Log:
CAMEL-1053 upgraded spring-integration to 1.0.0.RC1

Modified:
    activemq/camel/trunk/components/camel-spring-integration/pom.xml
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationConsumer.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationEndpoint.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/AbstractCamelAdapter.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/config/CamelTargetAdapterParser.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/converter/SpringIntegrationConverter.java
    activemq/camel/trunk/components/camel-spring-integration/src/main/resources/schema/camel-spring-integration.xsd
    activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/HelloWorldService.java
    activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationOneWayConsumerTest.java
    activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationProducerTest.java
    activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationTwoWayConsumerTest.java
    activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapterTest.java
    activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java
    activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/ConfigurationTest.java
    activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml
    activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/producer.xml
    activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/twoWayConsumer.xml

Modified: activemq/camel/trunk/components/camel-spring-integration/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/pom.xml?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/pom.xml (original)
+++ activemq/camel/trunk/components/camel-spring-integration/pom.xml Tue Nov  4 23:39:57 2008
@@ -13,7 +13,7 @@
 
   <properties>
 	<camel.osgi.export.pkg>org.apache.camel.component.spring.integration.*</camel.osgi.export.pkg>
-    <spring-integration-version>1.0.0.M6</spring-integration-version>
+    <spring-integration-version>1.0.0.RC1</spring-integration-version>
   </properties>
 
   <version>2.0-SNAPSHOT</version>

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java Tue Nov  4 23:39:57 2008
@@ -33,26 +33,26 @@
         // Helper class
     }
 
-    public static org.springframework.integration.message.Message createSpringIntegrationMessage(Exchange exchange) {
+    public static org.springframework.integration.core.Message createSpringIntegrationMessage(Exchange exchange) {
         return createSpringIntegrationMessage(exchange, null);
     }
 
     @SuppressWarnings("unchecked")
-    public static org.springframework.integration.message.Message createSpringIntegrationMessage(Exchange exchange, Map<String, Object> headers) {
+    public static org.springframework.integration.core.Message createSpringIntegrationMessage(Exchange exchange, Map<String, Object> headers) {
         org.apache.camel.Message message = exchange.getIn();
         GenericMessage siMessage = new GenericMessage(message.getBody(), headers);
         return siMessage;
     }
 
     @SuppressWarnings("unchecked")
-    public static org.springframework.integration.message.Message storeToSpringIntegrationMessage(org.apache.camel.Message message) {
+    public static org.springframework.integration.core.Message storeToSpringIntegrationMessage(org.apache.camel.Message message) {
         GenericMessage siMessage = new GenericMessage(message.getBody());
         return siMessage;
     }
 
-    public static void storeToCamelMessage(org.springframework.integration.message.Message siMessage, org.apache.camel.Message cMessage) {
+    public static void storeToCamelMessage(org.springframework.integration.core.Message siMessage, org.apache.camel.Message cMessage) {
         cMessage.setBody(siMessage.getPayload());
-        //TODO copy the message header
+        cMessage.setHeaders(siMessage.getHeaders());
     }
 
 }

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationConsumer.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationConsumer.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationConsumer.java Tue Nov  4 23:39:57 2008
@@ -19,13 +19,18 @@
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.ScheduledPollConsumer;
 import org.apache.camel.spring.SpringCamelContext;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.springframework.integration.channel.AbstractPollableChannel;
-import org.springframework.integration.channel.ChannelRegistry;
-import org.springframework.integration.channel.MessageChannel;
-import org.springframework.integration.config.MessageBusParser;
+import org.springframework.integration.channel.BeanFactoryChannelResolver;
+import org.springframework.integration.channel.ChannelResolver;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.Message;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.message.MessageHandler;
 
 /**
  * A consumer of exchanges for the Spring Integration
@@ -35,12 +40,12 @@
  *
  * @version $Revision$
  */
-public class SpringIntegrationConsumer  extends ScheduledPollConsumer<SpringIntegrationExchange> {
+public class SpringIntegrationConsumer  extends DefaultConsumer<SpringIntegrationExchange> implements MessageHandler {
     private SpringCamelContext context;
-    private AbstractPollableChannel inputChannel;
+    private DirectChannel inputChannel;
     private MessageChannel outputChannel;
     private String inputChannelName;
-    private ChannelRegistry channelRegistry;
+    private ChannelResolver channelResolver;
     private SpringIntegrationEndpoint endpoint;
 
     public SpringIntegrationConsumer(SpringIntegrationEndpoint endpoint, Processor processor) {
@@ -48,20 +53,20 @@
         this.endpoint = endpoint;
         context = (SpringCamelContext) endpoint.getCamelContext();
         if (context != null && endpoint.getMessageChannel() == null) {
-            channelRegistry = (ChannelRegistry) context.getApplicationContext().getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
+            channelResolver = new BeanFactoryChannelResolver(context.getApplicationContext());
             inputChannelName = endpoint.getDefaultChannel();
             if (ObjectHelper.isNullOrBlank(inputChannelName)) {
                 inputChannelName = endpoint.getInputChannel();
             }
             if (!ObjectHelper.isNullOrBlank(inputChannelName)) {
-                inputChannel = (AbstractPollableChannel) channelRegistry.lookupChannel(inputChannelName);
+                inputChannel = (DirectChannel) channelResolver.resolveChannelName(inputChannelName);
                 ObjectHelper.notNull(inputChannel, "The inputChannel with the name [" + inputChannelName + "]");
             } else {
                 throw new RuntimeCamelException("Can't find the right inputChannelName, please check your configuration.");
             }
         } else {
             if (endpoint.getMessageChannel() != null) {
-                inputChannel = (AbstractPollableChannel)endpoint.getMessageChannel();
+                inputChannel = (DirectChannel)endpoint.getMessageChannel();
             } else {
                 throw new RuntimeCamelException("Can't find the right message channel, please check your configuration.");
             }
@@ -71,16 +76,30 @@
         }
 
     }
+    
+    protected void doStop() throws Exception {
+        inputChannel.unsubscribe(this);
+        super.doStop();
+    }
 
-    @Override
-    protected void poll() throws Exception {
-        org.springframework.integration.message.Message siInMessage = inputChannel.receive(this.getDelay());
+    protected void doStart() throws Exception {
+        super.doStart();
+        inputChannel.subscribe(this);
+    }
+    
+    public void handleMessage(org.springframework.integration.core.Message<?> siInMessage) {        
         SpringIntegrationExchange  exchange = getEndpoint().createExchange();
         exchange.setIn(new SpringIntegrationMessage(siInMessage));
-        getProcessor().process(exchange);
+        try {
+            getProcessor().process(exchange);
+        } catch (Exception e) {
+            //TODO find a better way to handle this exception;
+            //for now it is rethrown wrapped in a RuntimeCamelException
+            throw new RuntimeCamelException(e);
+        }
         if (endpoint.isInOut()) {
             // get the output channel from message header
-            Object returnAddress = siInMessage.getHeaders().getReturnAddress();
+            Object returnAddress = siInMessage.getHeaders().getReplyChannel();
             MessageChannel reply = null;
 
             if (returnAddress != null) {
@@ -95,7 +114,7 @@
                     reply = outputChannel;
                 } else {
                     if (ObjectHelper.isNullOrBlank(endpoint.getOutputChannel())) {
-                        outputChannel = (MessageChannel) channelRegistry.lookupChannel(endpoint.getOutputChannel());
+                        outputChannel = (MessageChannel) channelResolver.resolveChannelName(endpoint.getOutputChannel());
                         ObjectHelper.notNull(inputChannel, "The outputChannel with the name [" + endpoint.getOutputChannel() + "]");
                         reply = outputChannel;
                     } else {
@@ -104,14 +123,10 @@
                 }
             }
             // put the message back the outputChannel if we need
-            org.springframework.integration.message.Message siOutMessage =
+            org.springframework.integration.core.Message siOutMessage =
                 SpringIntegrationBinding.storeToSpringIntegrationMessage(exchange.getOut());
             reply.send(siOutMessage);
-        }
-
-
-    }
-
-    //TODO We need to clean the channel when shutdown the endpoint
+        }        
+    }   
 
 }

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationEndpoint.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationEndpoint.java Tue Nov  4 23:39:57 2008
@@ -25,7 +25,7 @@
 import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.springframework.integration.channel.MessageChannel;
+import org.springframework.integration.core.MessageChannel;
 
 /**
  * Defines the <a href="http://activemq.apache.org/camel/springIntergration.html">Spring Intergration Endpoint</a>

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationMessage.java Tue Nov  4 23:39:57 2008
@@ -20,7 +20,7 @@
 import java.util.Map;
 
 import org.apache.camel.impl.DefaultMessage;
-import org.springframework.integration.message.MessageHeaders;
+import org.springframework.integration.core.MessageHeaders;
 
 /**
  * The Message {@link DefaultMessage} implementation
@@ -29,9 +29,9 @@
  * @version $Revision$
  */
 public class SpringIntegrationMessage extends DefaultMessage {
-    private org.springframework.integration.message.Message siMessage;
+    private org.springframework.integration.core.Message siMessage;
 
-    public SpringIntegrationMessage(org.springframework.integration.message.Message message) {
+    public SpringIntegrationMessage(org.springframework.integration.core.Message message) {
         siMessage = message;
     }
 
@@ -39,11 +39,11 @@
 
     }
 
-    public void setMessage(org.springframework.integration.message.Message message) {
+    public void setMessage(org.springframework.integration.core.Message message) {
         siMessage = message;
     }
 
-    public org.springframework.integration.message.Message getMessage() {
+    public org.springframework.integration.core.Message getMessage() {
         return siMessage;
     }
 

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java Tue Nov  4 23:39:57 2008
@@ -19,19 +19,24 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.spring.SpringCamelContext;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.springframework.integration.channel.AbstractPollableChannel;
-import org.springframework.integration.channel.ChannelRegistry;
-import org.springframework.integration.channel.MessageChannel;
-import org.springframework.integration.config.MessageBusParser;
-import org.springframework.integration.message.MessageHeaders;
+import org.springframework.integration.channel.BeanFactoryChannelResolver;
+import org.springframework.integration.channel.ChannelResolver;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.Message;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.core.MessageHeaders;
+import org.springframework.integration.message.MessageHandler;
 
 /**
  * A producer of exchanges for the Spring Integration
@@ -40,13 +45,13 @@
  * should be set for receiving the response message.
  * @version $Revision$
  */
-public class SpringIntegrationProducer extends DefaultProducer<SpringIntegrationExchange> {
+public class SpringIntegrationProducer extends DefaultProducer<SpringIntegrationExchange> implements AsyncProcessor {
     private static final transient Log LOG = LogFactory.getLog(SpringIntegrationProducer.class);
     private SpringCamelContext context;
-    private AbstractPollableChannel inputChannel;
+    private DirectChannel inputChannel;
     private MessageChannel outputChannel;
     private String outputChannelName;
-    private ChannelRegistry channelRegistry;
+    private ChannelResolver channelResolver;
     private SpringIntegrationEndpoint endpoint;
 
     public SpringIntegrationProducer(SpringIntegrationEndpoint endpoint) {
@@ -55,7 +60,7 @@
         context = (SpringCamelContext) endpoint.getCamelContext();
         if (context != null && endpoint.getMessageChannel() == null) {
             outputChannelName = endpoint.getDefaultChannel();
-            channelRegistry = (ChannelRegistry) context.getApplicationContext().getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
+            channelResolver = new BeanFactoryChannelResolver(context.getApplicationContext());
             if (ObjectHelper.isNullOrBlank(outputChannelName)) {
                 outputChannelName = endpoint.getInputChannel();
             }
@@ -63,7 +68,7 @@
                 throw new RuntimeCamelException("Can't find the right outputChannelName, "
                                                 + "please check the endpoint uri outputChannel part!");
             } else {
-                outputChannel = (AbstractPollableChannel) channelRegistry.lookupChannel(outputChannelName);
+                outputChannel = channelResolver.resolveChannelName(outputChannelName);
             }
         } else {
             if (endpoint.getMessageChannel() != null) {
@@ -79,24 +84,38 @@
                 throw new RuntimeCamelException("Can't find the right inputChannel, "
                                                 + "please check the endpoint uri inputChannel part!");
             } else {
-                inputChannel = (AbstractPollableChannel) channelRegistry.lookupChannel(endpoint.getInputChannel());
+                inputChannel = (DirectChannel)channelResolver.resolveChannelName(endpoint.getInputChannel());
             }
+        } else {
+            endpoint.setExchangePattern(ExchangePattern.InOnly);
         }
     }
 
     public void process(Exchange exchange) throws Exception {
+        
+        AsyncProcessorHelper.process(this, exchange);       
+        
+    }
+
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
         Map<String, Object> headers = new HashMap<String, Object>();
-        if (exchange.getPattern().isInCapable()) {
-            headers.put(MessageHeaders.RETURN_ADDRESS , inputChannel);
+        if (exchange.getPattern().isOutCapable()) {
+            headers.put(MessageHeaders.REPLY_CHANNEL , inputChannel);
+            inputChannel.subscribe(new MessageHandler() {                
+                public void handleMessage(Message<?> message) {                    
+                    SpringIntegrationBinding.storeToCamelMessage(message, exchange.getOut());
+                    callback.done(true);
+                }
+            });
         }
-        org.springframework.integration.message.Message siOutmessage = SpringIntegrationBinding.createSpringIntegrationMessage(exchange, headers);
-
+        org.springframework.integration.core.Message siOutmessage = SpringIntegrationBinding.createSpringIntegrationMessage(exchange, headers);
+        
         outputChannel.send(siOutmessage);
-        if (exchange.getPattern().isInCapable()) {
-            org.springframework.integration.message.Message siInMessage = inputChannel.receive();
-            SpringIntegrationBinding.storeToCamelMessage(siInMessage, exchange.getOut());
+        if (!exchange.getPattern().isOutCapable()) {
+            callback.done(true);
         }
-
+        
+        return true;
     }
 
 

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/AbstractCamelAdapter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/AbstractCamelAdapter.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/AbstractCamelAdapter.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/AbstractCamelAdapter.java Tue Nov  4 23:39:57 2008
@@ -20,7 +20,6 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
-import org.springframework.integration.handler.MessageHandler;
 
 /**
  * The Abstract class for the Spring Integration Camel Adapter

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java Tue Nov  4 23:39:57 2008
@@ -16,22 +16,28 @@
  */
 package org.apache.camel.component.spring.integration.adapter;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.spring.integration.SpringIntegrationBinding;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.beans.factory.InitializingBean;
-import org.springframework.integration.ConfigurationException;
-import org.springframework.integration.bus.MessageBus;
-import org.springframework.integration.bus.MessageBusAware;
-import org.springframework.integration.channel.MessageChannel;
+import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.channel.PollableChannel;
+import org.springframework.integration.core.Message;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.core.MessageHeaders;
 import org.springframework.integration.gateway.SimpleMessagingGateway;
-import org.springframework.integration.handler.MessageHandler;
-import org.springframework.integration.message.Message;
+import org.springframework.integration.message.MessageHandler;
+import org.springframework.integration.transformer.Transformer;
 
 /**
  * A CamelContext will be injected into CameSourceAdapter which will
@@ -41,59 +47,55 @@
  *
  * @version $Revision$
  */
-public class CamelSourceAdapter extends AbstractCamelAdapter implements MessageHandler, InitializingBean, MessageBusAware {
+public class CamelSourceAdapter extends AbstractCamelAdapter implements InitializingBean {
     protected final Object lifecycleMonitor = new Object();
     private final Log logger = LogFactory.getLog(this.getClass());
     private Consumer consumer;
     private Endpoint camelEndpoint;
     private MessageChannel requestChannel;
-    private SimpleMessagingGateway messageGateway = new SimpleMessagingGateway();
-
+    private DirectChannel replyChannel;
+    
     private volatile boolean initialized;
 
     public void setRequestChannel(MessageChannel channel) {
-        requestChannel = channel;
-        messageGateway.setRequestChannel(requestChannel);
+        requestChannel = channel;        
     }
 
     public MessageChannel getChannel() {
         return requestChannel;
     }
 
-    public void setReplyChannel(PollableChannel channel) {
-        messageGateway.setReplyChannel(channel);
-    }
-
-    public void setRequestTimeout(long requestTimeout) {
-        this.messageGateway.setRequestTimeout(requestTimeout);
-    }
-
-    public void setReplyTimeout(long replyTimeout) {
-        this.messageGateway.setReplyTimeout(replyTimeout);
+    public void setReplyChannel(DirectChannel channel) {        
+        replyChannel = channel;
     }
 
-    private void incoming(Exchange exchange) {
-        org.springframework.integration.message.Message request =
-            SpringIntegrationBinding.createSpringIntegrationMessage(exchange);
-
-        org.springframework.integration.message.Message response = handle(request);
-        if (response != null) {
-            // TODO How to deal with the fault message
-            SpringIntegrationBinding.storeToCamelMessage(response, exchange.getOut());
+    protected class ConsumerProcessor implements AsyncProcessor {
+        public void process(Exchange exchange) throws Exception {
+            AsyncProcessorHelper.process(this, exchange);      
         }
-    }
 
-    protected class ConsumerProcessor implements Processor {
-        public void process(Exchange exchange) {
-            try {
-                incoming(exchange);
-            } catch (Throwable ex) {
-                ex.printStackTrace();
-                logger.warn("Failed to process incoming message : " + ex);
-                //TODO Maybe we should set the exception as the fault message
+        public boolean process(final Exchange exchange, final AsyncCallback callback) {
+            org.springframework.integration.core.Message request =
+                SpringIntegrationBinding.createSpringIntegrationMessage(exchange);
+            Map<String, Object> headers = new HashMap<String, Object>();
+            if (exchange.getPattern().isOutCapable()) {
+                headers.put(MessageHeaders.REPLY_CHANNEL , replyChannel);
+                replyChannel.subscribe(new MessageHandler() {                
+                    public void handleMessage(Message<?> message) {
+                        //TODO set the correlationID
+                        SpringIntegrationBinding.storeToCamelMessage(message, exchange.getOut());
+                        callback.done(true);
+                    }
+                });
             }
+                 
+            requestChannel.send(request);
+            
+            if (!exchange.getPattern().isOutCapable()) {
+                callback.done(true);
+            }            
+            return true;
         }
-
     }
 
     public final void afterPropertiesSet() throws Exception {
@@ -111,27 +113,5 @@
         camelEndpoint = getCamelContext().getEndpoint(getCamelEndpointUri());
         consumer = camelEndpoint.createConsumer(new ConsumerProcessor());
         consumer.start();
-    }
-
-    public final Message<?> handle(Message<?> message) {
-        if (!this.initialized) {
-            try {
-                this.afterPropertiesSet();
-            } catch (Exception e) {
-                throw new ConfigurationException("unable to initialize " + this.getClass().getName(), e);
-            }
-        }
-        if (!isExpectReply()) {
-            messageGateway.send(message);
-            return null;
-        }
-        return messageGateway.sendAndReceiveMessage(message);
-    }
-
-    public void setMessageBus(MessageBus bus) {
-        messageGateway.setMessageBus(bus);
-    }
-
-
-
+    }   
 }

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapter.java Tue Nov  4 23:39:57 2008
@@ -17,7 +17,6 @@
 package org.apache.camel.component.spring.integration.adapter;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ProducerTemplate;
@@ -26,14 +25,12 @@
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.springframework.integration.bus.MessageBus;
-import org.springframework.integration.bus.MessageBusAware;
-import org.springframework.integration.channel.MessageChannel;
-import org.springframework.integration.message.Message;
+import org.springframework.integration.core.Message;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.core.MessageHeaders;
 import org.springframework.integration.message.MessageDeliveryException;
-import org.springframework.integration.message.MessageHeaders;
+import org.springframework.integration.message.MessageHandler;
 import org.springframework.integration.message.MessageRejectedException;
-import org.springframework.integration.message.MessageTarget;
 
 /**
  * CamelTargeAdapter will redirect the Spring Integration message to the Camel context.
@@ -43,7 +40,7 @@
  *
  * @version $Revision$
  */
-public class CamelTargetAdapter extends AbstractCamelAdapter implements MessageTarget {
+public class CamelTargetAdapter extends AbstractCamelAdapter implements MessageHandler {
 
     private final Log logger = LogFactory.getLog(this.getClass());
     private ProducerTemplate<Exchange> camelTemplate;
@@ -88,7 +85,7 @@
             //Check the message header for the return address
             response = SpringIntegrationBinding.storeToSpringIntegrationMessage(outExchange.getOut());
             if (replyChannel == null) {
-                MessageChannel messageReplyChannel = (MessageChannel) message.getHeaders().get(MessageHeaders.RETURN_ADDRESS);
+                MessageChannel messageReplyChannel = (MessageChannel) message.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
                 if (messageReplyChannel != null) {
                     result = messageReplyChannel.send(response);
                 } else {
@@ -101,4 +98,8 @@
         return result;
     }
 
+    public void handleMessage(Message<?> message) {
+        send(message);        
+    }
+
 }

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/config/CamelTargetAdapterParser.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/config/CamelTargetAdapterParser.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/config/CamelTargetAdapterParser.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/config/CamelTargetAdapterParser.java Tue Nov  4 23:39:57 2008
@@ -26,8 +26,6 @@
 import org.springframework.beans.factory.support.RootBeanDefinition;
 import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser;
 import org.springframework.beans.factory.xml.ParserContext;
-import org.springframework.integration.ConfigurationException;
-import org.springframework.integration.endpoint.MessagingBridge;
 import org.springframework.util.StringUtils;
 
 /**

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/converter/SpringIntegrationConverter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/converter/SpringIntegrationConverter.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/converter/SpringIntegrationConverter.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/converter/SpringIntegrationConverter.java Tue Nov  4 23:39:57 2008
@@ -16,15 +16,13 @@
  */
 package org.apache.camel.component.spring.integration.converter;
 
-import java.util.Map;
-
 import org.apache.camel.Converter;
 import org.apache.camel.Endpoint;
 import org.apache.camel.component.spring.integration.SpringIntegrationEndpoint;
 import org.apache.camel.component.spring.integration.SpringIntegrationMessage;
-import org.springframework.integration.channel.MessageChannel;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.core.MessageHeaders;
 import org.springframework.integration.message.GenericMessage;
-import org.springframework.integration.message.MessageHeaders;
 
 /**
  * The <a href="http://activemq.apache.org/camel/type-converter.html">Type Converters</a>
@@ -57,10 +55,10 @@
 
     @SuppressWarnings("unchecked")
     @Converter
-    public static org.springframework.integration.message.Message toSpringMessage(final org.apache.camel.Message camelMessage) throws Exception {
+    public static org.springframework.integration.core.Message toSpringMessage(final org.apache.camel.Message camelMessage) throws Exception {
         if (camelMessage instanceof SpringIntegrationMessage) {
             SpringIntegrationMessage siMessage = (SpringIntegrationMessage)camelMessage;
-            org.springframework.integration.message.Message message =  siMessage.getMessage();
+            org.springframework.integration.core.Message message =  siMessage.getMessage();
             if (message != null) {
                 return message;
             }
@@ -72,7 +70,7 @@
     }
 
     @Converter
-    public static org.apache.camel.Message toCamelMessage(final org.springframework.integration.message.Message springMessage) throws Exception {
+    public static org.apache.camel.Message toCamelMessage(final org.springframework.integration.core.Message springMessage) throws Exception {
         return new SpringIntegrationMessage(springMessage);
     }
 

Modified: activemq/camel/trunk/components/camel-spring-integration/src/main/resources/schema/camel-spring-integration.xsd
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/main/resources/schema/camel-spring-integration.xsd?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/main/resources/schema/camel-spring-integration.xsd (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/main/resources/schema/camel-spring-integration.xsd Tue Nov  4 23:39:57 2008
@@ -49,9 +49,7 @@
 		</xsd:annotation>
         <xsd:complexContent>
           <xsd:extension base="camelEndpointType">
-           <xsd:attribute name="requestChannel" type="xsd:string" use="required" />
-	       <xsd:attribute name="requestTimeout" type="xsd:long"/>
-		   <xsd:attribute name="replyTimeout" type="xsd:long"/>
+           <xsd:attribute name="requestChannel" type="xsd:string" use="required" />	       
 		  </xsd:extension>
 		</xsd:complexContent>
      </xsd:complexType>

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/HelloWorldService.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/HelloWorldService.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/HelloWorldService.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/HelloWorldService.java Tue Nov  4 23:39:57 2008
@@ -23,11 +23,20 @@
 public class HelloWorldService {
 
     private int count;
+    private String greetName;
 
     public String sayHello(String name) {
         count++;
         return "Hello " + name;
     }
+    
+    public void greet(String name) {        
+        greetName = name;
+    }
+    
+    public String getGreetName() {
+        return greetName;
+    }
 
     public int getCount() {
         return count;

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationOneWayConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationOneWayConsumerTest.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationOneWayConsumerTest.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationOneWayConsumerTest.java Tue Nov  4 23:39:57 2008
@@ -20,8 +20,7 @@
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spring.SpringTestSupport;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.integration.channel.MessageChannel;
-import org.springframework.integration.message.Message;
+import org.springframework.integration.core.MessageChannel;
 import org.springframework.integration.message.StringMessage;
 
 

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationProducerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationProducerTest.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationProducerTest.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationProducerTest.java Tue Nov  4 23:39:57 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.spring.integration;
 
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.spring.SpringTestSupport;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 
@@ -26,8 +27,14 @@
     }
 
     public void testSendingTwoWayMessage() throws Exception {
-        String result = (String) template.sendBody("direct:start", "Willem");
+        String result = (String) template.sendBody("direct:twowayMessage", ExchangePattern.InOut, "Willem");
         assertEquals("Can't get the right response", result, "Hello Willem");
     }
+    
+    public void testSendingOneWayMessage() throws Exception {
+        template.sendBody("direct:onewayMessage", "Greet");
+        HelloWorldService service = (HelloWorldService)applicationContext.getBean("helloService");
+        assertEquals("We should call the service", service.getGreetName(), "Greet");        
+    }
 
 }

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationTwoWayConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationTwoWayConsumerTest.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationTwoWayConsumerTest.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/SpringIntegrationTwoWayConsumerTest.java Tue Nov  4 23:39:57 2008
@@ -20,40 +20,35 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spring.SpringTestSupport;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 import org.springframework.integration.channel.AbstractPollableChannel;
-import org.springframework.integration.channel.MessageChannel;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.Message;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.core.MessageHeaders;
 import org.springframework.integration.message.GenericMessage;
-import org.springframework.integration.message.Message;
-import org.springframework.integration.message.MessageHeaders;
-import org.springframework.integration.message.StringMessage;
+import org.springframework.integration.message.MessageHandler;
 
 
 public class SpringIntegrationTwoWayConsumerTest extends SpringTestSupport {
-    private static final String MESSAGE_BODY = "Request message";
-    public void testDelyConfiguration() throws Exception {
-        SpringIntegrationEndpoint endpoint = (SpringIntegrationEndpoint)resolveMandatoryEndpoint("spring-integration://requestChannel?outputChannel=responseChannel&inOut=true&consumer.delay=5000");
-        Map map = endpoint.getConsumerProperties();
-        assertEquals("There should have a delay property ", map.size(), 1);
-        assertEquals("The delay value is not right", map.get("delay"), "5000");
-    }
+    private static final String MESSAGE_BODY = "Request message";    
 
     public void testSendingTwoWayMessage() throws Exception {
-
+        
         MessageChannel requestChannel = (MessageChannel) applicationContext.getBean("requestChannel");
         Map<String, Object> maps = new HashMap<String, Object>();
-        maps.put(MessageHeaders.RETURN_ADDRESS, "responseChannel");
+        maps.put(MessageHeaders.REPLY_CHANNEL, "responseChannel");
         Message<String> message = new GenericMessage<String>(MESSAGE_BODY, maps);
-
-        requestChannel.send(message);
-
-        AbstractPollableChannel responseChannel = (AbstractPollableChannel) applicationContext.getBean("responseChannel");
-        Message responseMessage = responseChannel.receive();
-        String result = (String) responseMessage.getPayload();
-
-        assertEquals("Get the wrong result", MESSAGE_BODY + " is processed",  result);
+        DirectChannel responseChannel = (DirectChannel) applicationContext.getBean("responseChannel");
+        responseChannel.subscribe(new MessageHandler() {
+            public void handleMessage(Message<?> message) {
+                String result = (String) message.getPayload();
+                assertEquals("Get the wrong result", MESSAGE_BODY + " is processed",  result);                
+            }             
+        });
+        requestChannel.send(message);        
+        
     }
 
     public ClassPathXmlApplicationContext createApplicationContext() {

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapterTest.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapterTest.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapterTest.java Tue Nov  4 23:39:57 2008
@@ -16,23 +16,28 @@
  */
 package org.apache.camel.component.spring.integration.adapter;
 
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.spring.integration.HelloWorldService;
 import org.apache.camel.spring.SpringTestSupport;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.channel.PollableChannel;
-import org.springframework.integration.message.Message;
+import org.springframework.integration.core.Message;
+import org.springframework.integration.message.MessageHandler;
 
 public class CamelSourceAdapterTest extends SpringTestSupport {
     public void testSendingOneWayMessage() throws Exception {
-        PollableChannel channelA = (PollableChannel) applicationContext.getBean("channelA");
+        DirectChannel channelA = (DirectChannel) applicationContext.getBean("channelA");
+        channelA.subscribe(new MessageHandler() {
+            public void handleMessage(Message<?> message) {
+                assertEquals("We should get the message from channelA", message.getPayload(), "Willem");             
+            }            
+        });
         template.sendBody("direct:OneWay", "Willem");
-        Message message = channelA.receive();
-        assertEquals("We should get the message from channelA", message.getPayload(), "Willem");
-
     }
 
     public void testSendingTwoWayMessage() throws Exception {
-        String result = (String) template.sendBody("direct:TwoWay", "Willem");
+        String result = (String) template.sendBody("direct:TwoWay", ExchangePattern.InOut, "Willem");
         assertEquals("Can't get the right response", result, "Hello Willem");
     }
 

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/CamelTargetAdapterTest.java Tue Nov  4 23:39:57 2008
@@ -22,11 +22,13 @@
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spring.SpringTestSupport;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.integration.channel.MessageChannel;
+import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.channel.PollableChannel;
+import org.springframework.integration.core.Message;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.core.MessageHeaders;
 import org.springframework.integration.message.GenericMessage;
-import org.springframework.integration.message.Message;
-import org.springframework.integration.message.MessageHeaders;
+import org.springframework.integration.message.MessageHandler;
 import org.springframework.integration.message.StringMessage;
 
 public class CamelTargetAdapterTest extends SpringTestSupport {
@@ -44,28 +46,31 @@
 
         MessageChannel requestChannel = (MessageChannel) applicationContext.getBean("channelB");
         Message message = new StringMessage(MESSAGE_BODY);
+        // Subscribe a handler to the responseChannel before sending the request
+        DirectChannel responseChannel = (DirectChannel) applicationContext.getBean("channelC");
+        responseChannel.subscribe(new MessageHandler() {
+            public void handleMessage(Message<?> message) {
+                String result = (String) message.getPayload();
+                assertEquals("Get the wrong result", MESSAGE_BODY + " is processed",  result);                
+            }            
+        });
         requestChannel.send(message);
-
-        PollableChannel responseChannel = (PollableChannel) applicationContext.getBean("channelC");
-        Message responseMessage = responseChannel.receive();
-        String result = (String) responseMessage.getPayload();
-
-        assertEquals("Get the wrong result", MESSAGE_BODY + " is processed",  result);
     }
 
     public void testSendingTwoWayMessageWithMessageAddress() throws Exception {
 
         MessageChannel requestChannel = (MessageChannel) applicationContext.getBean("channelD");
-        PollableChannel responseChannel = (PollableChannel) applicationContext.getBean("channelC");
+        DirectChannel responseChannel = (DirectChannel) applicationContext.getBean("channelC");
         Map<String, Object> headers = new HashMap<String, Object>();
-        headers.put(MessageHeaders.RETURN_ADDRESS, responseChannel);
+        headers.put(MessageHeaders.REPLY_CHANNEL, responseChannel);
         GenericMessage<String> message = new GenericMessage<String>(MESSAGE_BODY, headers);
-        requestChannel.send(message);
-
-        Message responseMessage = responseChannel.receive();
-        String result = (String) responseMessage.getPayload();
-
-        assertEquals("Get the wrong result", MESSAGE_BODY + " is processed",  result);
+        responseChannel.subscribe(new MessageHandler() {
+            public void handleMessage(Message<?> message) {
+                String result = (String) message.getPayload();
+                assertEquals("Get the wrong result", MESSAGE_BODY + " is processed",  result);                
+            }            
+        });
+        requestChannel.send(message);        
     }
 
     @Override

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/ConfigurationTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/ConfigurationTest.java?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/ConfigurationTest.java (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/java/org/apache/camel/component/spring/integration/adapter/ConfigurationTest.java Tue Nov  4 23:39:57 2008
@@ -19,7 +19,6 @@
 import junit.framework.TestCase;
 import org.springframework.context.support.AbstractXmlApplicationContext;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.integration.endpoint.MessagingBridge;
 
 public class ConfigurationTest extends TestCase {
     private AbstractXmlApplicationContext context;

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/adapter/CamelTarget.xml Tue Nov  4 23:39:57 2008
@@ -33,10 +33,10 @@
  <!-- END SNIPPET: header -->
    	<message-bus/>
 
-    <channel-adapter id="channelA" target="camelTargetA"/>
-   	<channel-adapter id="channelB" target="camelTargetB"/>
+    <outbound-channel-adapter id="channelA" ref="camelTargetA"/>
+   	<outbound-channel-adapter id="channelB" ref="camelTargetB"/>
     <channel id="channelC"/>
-    <channel-adapter id="channelD" target="camelTargetD"/>
+    <outbound-channel-adapter id="channelD" ref="camelTargetD"/>
    <!-- START SNIPPET: example -->
    <!-- Create the camel context here -->
    <camelContext id="camelTargetContext" xmlns="http://activemq.apache.org/camel/schema/spring">
@@ -67,3 +67,4 @@
    <!-- END SNIPPET: example -->
 </beans:beans>
 
+

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/producer.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/producer.xml?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/producer.xml (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/producer.xml Tue Nov  4 23:39:57 2008
@@ -29,20 +29,29 @@
 	<message-bus/>
 	<channel id="inputChannel"/>
    	<channel id="outputChannel"/>
+   	<channel id="onewayChannel"/>
 
-	<service-activator input-channel="inputChannel"
-	          output-channel="outputChannel"
+	<service-activator input-channel="inputChannel"	          
 	          ref="helloService"
 	          method="sayHello"/>
-
+	          	      
+	<service-activator input-channel="onewayChannel"	          
+	          ref="helloService"
+	          method="greet"/>
+	          
 	<beans:bean id="helloService" class="org.apache.camel.component.spring.integration.HelloWorldService"/>
+    
     <!-- START SNIPPET: example -->
     <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
       <route>
-        <from uri="direct:start"/>
+        <from uri="direct:twowayMessage"/>
         <!-- Using the &amp; as the separator of & -->
         <to uri="spring-integration:inputChannel?inOut=true&amp;inputChannel=outputChannel"/>
       </route>
+      <route>      
+        <from uri="direct:onewayMessage"/>
+        <to uri="spring-integration:onewayChannel?inOut=false"/>
+      </route>
     </camelContext>
     <!-- END SNIPPET: example -->
 </beans:beans>

Modified: activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/twoWayConsumer.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/twoWayConsumer.xml?rev=711523&r1=711522&r2=711523&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/twoWayConsumer.xml (original)
+++ activemq/camel/trunk/components/camel-spring-integration/src/test/resources/org/apache/camel/component/spring/integration/twoWayConsumer.xml Tue Nov  4 23:39:57 2008
@@ -36,7 +36,7 @@
     <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
       <route>
         <!-- Using the &amp; as the separator of & -->
-        <from uri="spring-integration://requestChannel?outputChannel=responseChannel&amp;inOut=true&amp;consumer.delay=5000"/>
+        <from uri="spring-integration://requestChannel?outputChannel=responseChannel&amp;inOut=true"/>
         <process ref="myProcessor"/>
       </route>
     </camelContext>