You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/05 11:46:01 UTC
svn commit: r771643 - in /camel/trunk/components:
camel-jms/src/test/java/org/apache/camel/component/jms/tx/
camel-spring-integration/
camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/
camel-spring-integration/src/ma...
Author: davsclaus
Date: Tue May 5 09:46:01 2009
New Revision: 771643
URL: http://svn.apache.org/viewvc?rev=771643&view=rev
Log:
CAMEL-1572: API cleanup. And upgraded spring integration to 1.0.2
Modified:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java
camel/trunk/components/camel-spring-integration/pom.xml
camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java
camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/AbstractTransactionTest.java Tue May 5 09:46:01 2009
@@ -24,7 +24,6 @@
import org.apache.camel.impl.EventDrivenConsumerRoute;
import org.apache.camel.processor.DeadLetterChannel;
import org.apache.camel.processor.DefaultErrorHandler;
-import org.apache.camel.processor.DelegateAsyncProcessor;
import org.apache.camel.processor.DelegateProcessor;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.spring.spi.TransactionErrorHandler;
@@ -94,8 +93,6 @@
if (processor instanceof Channel) {
processor = ((Channel)processor).getNextProcessor();
- } else if (processor instanceof DelegateAsyncProcessor) {
- processor = ((DelegateAsyncProcessor)processor).getProcessor();
} else if (processor instanceof DelegateProcessor) {
// TransactionInterceptor is a DelegateProcessor
processor = ((DelegateProcessor)processor).getProcessor();
Modified: camel/trunk/components/camel-spring-integration/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-integration/pom.xml?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-spring-integration/pom.xml (original)
+++ camel/trunk/components/camel-spring-integration/pom.xml Tue May 5 09:46:01 2009
@@ -33,7 +33,7 @@
<properties>
<camel.osgi.export.pkg>org.apache.camel.component.spring.integration.*</camel.osgi.export.pkg>
- <spring-integration-version>1.0.1.RELEASE</spring-integration-version>
+ <spring-integration-version>1.0.2.RELEASE</spring-integration-version>
</properties>
<repositories>
Modified: camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java (original)
+++ camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationBinding.java Tue May 5 09:46:01 2009
@@ -34,20 +34,18 @@
}
public static org.springframework.integration.core.Message createSpringIntegrationMessage(Exchange exchange) {
- return createSpringIntegrationMessage(exchange, null);
+ return createSpringIntegrationMessage(exchange, exchange.getIn().getHeaders());
}
@SuppressWarnings("unchecked")
public static org.springframework.integration.core.Message createSpringIntegrationMessage(Exchange exchange, Map<String, Object> headers) {
org.apache.camel.Message message = exchange.getIn();
- GenericMessage siMessage = new GenericMessage(message.getBody(), headers);
- return siMessage;
+ return new GenericMessage(message.getBody(), headers);
}
@SuppressWarnings("unchecked")
public static org.springframework.integration.core.Message storeToSpringIntegrationMessage(org.apache.camel.Message message) {
- GenericMessage siMessage = new GenericMessage(message.getBody());
- return siMessage;
+ return new GenericMessage(message.getBody());
}
public static void storeToCamelMessage(org.springframework.integration.core.Message siMessage, org.apache.camel.Message cMessage) {
Modified: camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java (original)
+++ camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/SpringIntegrationProducer.java Tue May 5 09:46:01 2009
@@ -16,17 +16,12 @@
*/
package org.apache.camel.component.spring.integration;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.spring.SpringCamelContext;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,8 +40,9 @@
* should be set for receiving the response message.
* @version $Revision$
*/
-public class SpringIntegrationProducer extends DefaultProducer implements AsyncProcessor {
+public class SpringIntegrationProducer extends DefaultProducer implements Processor {
private static final transient Log LOG = LogFactory.getLog(SpringIntegrationProducer.class);
+
private SpringCamelContext context;
private DirectChannel inputChannel;
private MessageChannel outputChannel;
@@ -65,8 +61,7 @@
outputChannelName = endpoint.getInputChannel();
}
if (ObjectHelper.isEmpty(outputChannelName)) {
- throw new RuntimeCamelException("Can't find the right outputChannelName, "
- + "please check the endpoint uri outputChannel part!");
+ throw new RuntimeCamelException("Cannot find outputChannelName, please check the endpoint uri outputChannel part!");
} else {
outputChannel = channelResolver.resolveChannelName(outputChannelName);
}
@@ -74,15 +69,14 @@
if (endpoint.getMessageChannel() != null) {
outputChannel = endpoint.getMessageChannel();
} else {
- throw new RuntimeCamelException("Can't find the right message channel, please check your configuration.");
+ throw new RuntimeCamelException("Cannot find message channel, please check your configuration.");
}
}
if (endpoint.isInOut()) {
endpoint.setExchangePattern(ExchangePattern.InOut);
// we need to setup right inputChannel for further processing
if (ObjectHelper.isEmpty(endpoint.getInputChannel())) {
- throw new RuntimeCamelException("Can't find the right inputChannel, "
- + "please check the endpoint uri inputChannel part!");
+ throw new RuntimeCamelException("Cannot find inputChannel, please check the endpoint uri inputChannel part!");
} else {
inputChannel = (DirectChannel)channelResolver.resolveChannelName(endpoint.getInputChannel());
}
@@ -91,31 +85,18 @@
}
}
- public void process(Exchange exchange) throws Exception {
-
- AsyncProcessorHelper.process(this, exchange);
-
- }
-
- public boolean process(final Exchange exchange, final AsyncCallback callback) {
- Map<String, Object> headers = new HashMap<String, Object>();
+ public void process(final Exchange exchange) throws Exception {
if (exchange.getPattern().isOutCapable()) {
- headers.put(MessageHeaders.REPLY_CHANNEL , inputChannel);
- inputChannel.subscribe(new MessageHandler() {
+ exchange.getIn().getHeaders().put(MessageHeaders.REPLY_CHANNEL , inputChannel);
+ inputChannel.subscribe(new MessageHandler() {
public void handleMessage(Message<?> message) {
SpringIntegrationBinding.storeToCamelMessage(message, exchange.getOut());
- callback.done(true);
}
});
}
- org.springframework.integration.core.Message siOutmessage = SpringIntegrationBinding.createSpringIntegrationMessage(exchange, headers);
+ org.springframework.integration.core.Message siOutmessage = SpringIntegrationBinding.createSpringIntegrationMessage(exchange);
outputChannel.send(siOutmessage);
- if (!exchange.getPattern().isOutCapable()) {
- callback.done(true);
- }
-
- return true;
}
Modified: camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java?rev=771643&r1=771642&r2=771643&view=diff
==============================================================================
--- camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java (original)
+++ camel/trunk/components/camel-spring-integration/src/main/java/org/apache/camel/component/spring/integration/adapter/CamelSourceAdapter.java Tue May 5 09:46:01 2009
@@ -16,28 +16,22 @@
*/
package org.apache.camel.component.spring.integration.adapter;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.spring.integration.SpringIntegrationBinding;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessageHeaders;
-import org.springframework.integration.gateway.SimpleMessagingGateway;
import org.springframework.integration.message.MessageHandler;
-import org.springframework.integration.transformer.Transformer;
/**
* A CamelContext will be injected into CameSourceAdapter which will
@@ -47,15 +41,15 @@
*
* @version $Revision$
*/
-public class CamelSourceAdapter extends AbstractCamelAdapter implements InitializingBean {
- protected final Object lifecycleMonitor = new Object();
- private final Log logger = LogFactory.getLog(this.getClass());
+public class CamelSourceAdapter extends AbstractCamelAdapter implements InitializingBean, DisposableBean {
+ private final static Log LOG = LogFactory.getLog(CamelSourceAdapter.class);
+
private Consumer consumer;
private Endpoint camelEndpoint;
private MessageChannel requestChannel;
private DirectChannel replyChannel;
- private volatile boolean initialized;
+ private AtomicBoolean initialized = new AtomicBoolean();
public void setRequestChannel(MessageChannel channel) {
requestChannel = channel;
@@ -69,43 +63,35 @@
replyChannel = channel;
}
- protected class ConsumerProcessor implements AsyncProcessor {
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
+ protected class ConsumerProcessor implements Processor {
+
+ public void process(final Exchange exchange) throws Exception {
+ org.springframework.integration.core.Message request = SpringIntegrationBinding.createSpringIntegrationMessage(exchange);
- public boolean process(final Exchange exchange, final AsyncCallback callback) {
- org.springframework.integration.core.Message request =
- SpringIntegrationBinding.createSpringIntegrationMessage(exchange);
- Map<String, Object> headers = new HashMap<String, Object>();
if (exchange.getPattern().isOutCapable()) {
- headers.put(MessageHeaders.REPLY_CHANNEL , replyChannel);
- replyChannel.subscribe(new MessageHandler() {
+ exchange.getIn().getHeaders().put(MessageHeaders.REPLY_CHANNEL , replyChannel);
+ replyChannel.subscribe(new MessageHandler() {
public void handleMessage(Message<?> message) {
//TODO set the corralationID
SpringIntegrationBinding.storeToCamelMessage(message, exchange.getOut());
- callback.done(true);
}
});
}
requestChannel.send(request);
-
- if (!exchange.getPattern().isOutCapable()) {
- callback.done(true);
- }
- return true;
}
}
public final void afterPropertiesSet() throws Exception {
- synchronized (this.lifecycleMonitor) {
- if (this.initialized) {
- return;
- }
+ if (initialized.compareAndSet(false, true)) {
+ initialize();
+ }
+ }
+
+ public void destroy() throws Exception {
+ if (consumer != null) {
+ consumer.stop();
}
- this.initialize();
- this.initialized = true;
}
protected void initialize() throws Exception {
@@ -113,5 +99,6 @@
camelEndpoint = getCamelContext().getEndpoint(getCamelEndpointUri());
consumer = camelEndpoint.createConsumer(new ConsumerProcessor());
consumer.start();
- }
+ }
+
}