You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/05 11:46:01 UTC

svn commit: r771643 - in /camel/trunk/components: camel-jms/src/test/java/org/apache/camel/component/jms/tx/ camel-spring-integration/ camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/ camel-spring-integration/src/ma...

Author: davsclaus
Date: Tue May  5 09:46:01 2009
New Revision: 771643

URL: http://svn.apache.org/viewvc?rev=771643&view=rev
Log:
CAMEL-1572: API cleanup. And upgraded spring integration to 1.0.2

Modified:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java
    camel/trunk/components/camel-spring-integration/pom.xml
    camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java
    camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
    camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java Tue May  5 09:46:01 2009
@@ -24,7 +24,6 @@
 import org.apache.camel.impl.EventDrivenConsumerRoute;
 import org.apache.camel.processor.DeadLetterChannel;
 import org.apache.camel.processor.DefaultErrorHandler;
-import org.apache.camel.processor.DelegateAsyncProcessor;
 import org.apache.camel.processor.DelegateProcessor;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.spring.spi.TransactionErrorHandler;
@@ -94,8 +93,6 @@
 
             if (processor instanceof Channel) {
                 processor = ((Channel)processor).getNextProcessor();
-            } else if (processor instanceof DelegateAsyncProcessor) {
-                processor = ((DelegateAsyncProcessor)processor).getProcessor();
             } else if (processor instanceof DelegateProcessor) {
                 // TransactionInterceptor is a DelegateProcessor
                 processor = ((DelegateProcessor)processor).getProcessor();

Modified: camel/trunk/components/camel-spring-integration/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-integration/pom.xml?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-spring-integration/pom.xml (original)
+++ camel/trunk/components/camel-spring-integration/pom.xml Tue May  5 09:46:01 2009
@@ -33,7 +33,7 @@
 
   <properties>
     <camel.osgi.export.pkg>org.apache.camel.component.spring.integration.*</camel.osgi.export.pkg>
-    <spring-integration-version>1.0.1.RELEASE</spring-integration-version>
+    <spring-integration-version>1.0.2.RELEASE</spring-integration-version>
   </properties>
 
   <repositories>

Modified: camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java (original)
+++ camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java Tue May  5 09:46:01 2009
@@ -34,20 +34,18 @@
     }
 
     public static org.springframework.integration.core.Message createSpringIntegrationMessage(Exchange exchange) {
-        return createSpringIntegrationMessage(exchange, null);
+        return createSpringIntegrationMessage(exchange, exchange.getIn().getHeaders());
     }
 
     @SuppressWarnings("unchecked")
     public static org.springframework.integration.core.Message createSpringIntegrationMessage(Exchange exchange, Map<String, Object> headers) {
         org.apache.camel.Message message = exchange.getIn();
-        GenericMessage siMessage = new GenericMessage(message.getBody(), headers);
-        return siMessage;
+        return new GenericMessage(message.getBody(), headers);
     }
 
     @SuppressWarnings("unchecked")
     public static org.springframework.integration.core.Message storeToSpringIntegrationMessage(org.apache.camel.Message message) {
-        GenericMessage siMessage = new GenericMessage(message.getBody());
-        return siMessage;
+        return new GenericMessage(message.getBody());
     }
 
     public static void storeToCamelMessage(org.springframework.integration.core.Message siMessage, org.apache.camel.Message cMessage) {

Modified: camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java (original)
+++ camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java Tue May  5 09:46:01 2009
@@ -16,17 +16,12 @@
  */
 package org.apache.camel.component.spring.integration;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.spring.SpringCamelContext;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,8 +40,9 @@
  * should be set for receiving the response message.
  * @version $Revision$
  */
-public class SpringIntegrationProducer extends DefaultProducer implements AsyncProcessor {
+public class SpringIntegrationProducer extends DefaultProducer implements Processor {
     private static final transient Log LOG = LogFactory.getLog(SpringIntegrationProducer.class);
+
     private SpringCamelContext context;
     private DirectChannel inputChannel;
     private MessageChannel outputChannel;
@@ -65,8 +61,7 @@
                 outputChannelName = endpoint.getInputChannel();
             }
             if (ObjectHelper.isEmpty(outputChannelName)) {
-                throw new RuntimeCamelException("Can't find the right outputChannelName, "
-                                                + "please check the endpoint uri outputChannel part!");
+                throw new RuntimeCamelException("Cannot find outputChannelName, please check the endpoint uri outputChannel part!");
             } else {
                 outputChannel = channelResolver.resolveChannelName(outputChannelName);
             }
@@ -74,15 +69,14 @@
             if (endpoint.getMessageChannel() != null) {
                 outputChannel = endpoint.getMessageChannel();
             } else {
-                throw new RuntimeCamelException("Can't find the right message channel, please check your configuration.");
+                throw new RuntimeCamelException("Cannot find message channel, please check your configuration.");
             }
         }
         if (endpoint.isInOut()) {
             endpoint.setExchangePattern(ExchangePattern.InOut);
             // we need to setup right inputChannel for further processing
             if (ObjectHelper.isEmpty(endpoint.getInputChannel())) {
-                throw new RuntimeCamelException("Can't find the right inputChannel, "
-                                                + "please check the endpoint uri inputChannel part!");
+                throw new RuntimeCamelException("Cannot find inputChannel, please check the endpoint uri inputChannel part!");
             } else {
                 inputChannel = (DirectChannel)channelResolver.resolveChannelName(endpoint.getInputChannel());
             }
@@ -91,31 +85,18 @@
         }
     }
 
-    public void process(Exchange exchange) throws Exception {
-        
-        AsyncProcessorHelper.process(this, exchange);       
-        
-    }
-
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        Map<String, Object> headers = new HashMap<String, Object>();
+    public void process(final Exchange exchange) throws Exception {
         if (exchange.getPattern().isOutCapable()) {
-            headers.put(MessageHeaders.REPLY_CHANNEL , inputChannel);
-            inputChannel.subscribe(new MessageHandler() {                
+            exchange.getIn().getHeaders().put(MessageHeaders.REPLY_CHANNEL , inputChannel);
+            inputChannel.subscribe(new MessageHandler() {
                 public void handleMessage(Message<?> message) {                    
                     SpringIntegrationBinding.storeToCamelMessage(message, exchange.getOut());
-                    callback.done(true);
                 }
             });
         }
-        org.springframework.integration.core.Message siOutmessage = SpringIntegrationBinding.createSpringIntegrationMessage(exchange, headers);
+        org.springframework.integration.core.Message siOutmessage = SpringIntegrationBinding.createSpringIntegrationMessage(exchange);
         
         outputChannel.send(siOutmessage);
-        if (!exchange.getPattern().isOutCapable()) {
-            callback.done(true);
-        }
-        
-        return true;
     }
 
 

Modified: camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java (original)
+++ camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java Tue May  5 09:46:01 2009
@@ -16,28 +16,22 @@
  */
 package org.apache.camel.component.spring.integration.adapter;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.spring.integration.SpringIntegrationBinding;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.channel.PollableChannel;
 import org.springframework.integration.core.Message;
 import org.springframework.integration.core.MessageChannel;
 import org.springframework.integration.core.MessageHeaders;
-import org.springframework.integration.gateway.SimpleMessagingGateway;
 import org.springframework.integration.message.MessageHandler;
-import org.springframework.integration.transformer.Transformer;
 
 /**
  * A CamelContext will be injected into CameSourceAdapter which will
@@ -47,15 +41,15 @@
  *
  * @version $Revision$
  */
-public class CamelSourceAdapter extends AbstractCamelAdapter implements InitializingBean {
-    protected final Object lifecycleMonitor = new Object();
-    private final Log logger = LogFactory.getLog(this.getClass());
+public class CamelSourceAdapter extends AbstractCamelAdapter implements InitializingBean, DisposableBean {
+    private final static Log LOG = LogFactory.getLog(CamelSourceAdapter.class);
+
     private Consumer consumer;
     private Endpoint camelEndpoint;
     private MessageChannel requestChannel;
     private DirectChannel replyChannel;
     
-    private volatile boolean initialized;
+    private AtomicBoolean initialized = new AtomicBoolean();
 
     public void setRequestChannel(MessageChannel channel) {
         requestChannel = channel;        
@@ -69,43 +63,35 @@
         replyChannel = channel;
     }
 
-    protected class ConsumerProcessor implements AsyncProcessor {
-        public void process(Exchange exchange) throws Exception {
-            AsyncProcessorHelper.process(this, exchange);      
-        }
+    protected class ConsumerProcessor implements Processor {
+
+        public void process(final Exchange exchange) throws Exception {
+            org.springframework.integration.core.Message request = SpringIntegrationBinding.createSpringIntegrationMessage(exchange);
 
-        public boolean process(final Exchange exchange, final AsyncCallback callback) {
-            org.springframework.integration.core.Message request =
-                SpringIntegrationBinding.createSpringIntegrationMessage(exchange);
-            Map<String, Object> headers = new HashMap<String, Object>();
             if (exchange.getPattern().isOutCapable()) {
-                headers.put(MessageHeaders.REPLY_CHANNEL , replyChannel);
-                replyChannel.subscribe(new MessageHandler() {                
+                exchange.getIn().getHeaders().put(MessageHeaders.REPLY_CHANNEL , replyChannel);
+                replyChannel.subscribe(new MessageHandler() {
                     public void handleMessage(Message<?> message) {
                         //TODO set the corralationID
                         SpringIntegrationBinding.storeToCamelMessage(message, exchange.getOut());
-                        callback.done(true);
                     }
                 });
             }
                  
             requestChannel.send(request);
-            
-            if (!exchange.getPattern().isOutCapable()) {
-                callback.done(true);
-            }            
-            return true;
         }
     }
 
     public final void afterPropertiesSet() throws Exception {
-        synchronized (this.lifecycleMonitor) {
-            if (this.initialized) {
-                return;
-            }
+        if (initialized.compareAndSet(false, true)) {
+            initialize();
+        }
+    }
+
+    public void destroy() throws Exception {
+        if (consumer != null) {
+            consumer.stop();
         }
-        this.initialize();
-        this.initialized = true;
     }
 
     protected void initialize() throws Exception {
@@ -113,5 +99,6 @@
         camelEndpoint = getCamelContext().getEndpoint(getCamelEndpointUri());
         consumer = camelEndpoint.createConsumer(new ConsumerProcessor());
         consumer.start();
-    }   
+    }
+
 }