You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2007/09/21 19:33:56 UTC
svn commit: r578206 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/component/seda/
main/java/org/apache/camel/component/vm/
test/java/org/apache/camel/component/file/
Author: chirino
Date: Fri Sep 21 10:33:54 2007
New Revision: 578206
URL: http://svn.apache.org/viewvc?rev=578206&view=rev
Log:
Seda exchange once again completes an exchange when a producer enqueues it's message on the seda queue.. use the thread(x) DSL method if you want to do async processing of an exchange that will be completed later
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java Fri Sep 21 10:33:54 2007
@@ -31,8 +31,8 @@
* @version $Revision: 1.1 $
*/
public class SedaComponent extends DefaultComponent {
- public BlockingQueue<SedaEndpoint.Entry> createQueue() {
- return new LinkedBlockingQueue<SedaEndpoint.Entry>(1000);
+ public BlockingQueue<Exchange> createQueue() {
+ return new LinkedBlockingQueue<Exchange>(1000);
}
@Override
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri Sep 21 10:33:54 2007
@@ -51,30 +51,17 @@
public void run() {
while (!isStopping()) {
- final SedaEndpoint.Entry entry;
+ final Exchange exchange;
try {
- entry = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+ exchange = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
break;
}
- if (entry != null && !isStopping()) {
- processor.process(entry.getExchange(), new AsyncCallback() {
+ if (exchange != null && !isStopping()) {
+ processor.process(exchange, new AsyncCallback() {
public void done(boolean sync) {
- if (entry.getCallback() != null) {
- entry.getCallback().done(false);
- } else {
- Throwable e = entry.getExchange().getException();
- if (e != null) {
- if (e instanceof AlreadyStoppedException) {
- LOG.debug("Ignoring failed message due to shutdown: " + e, e);
- } else {
- LOG.error(e);
- }
- }
- }
}
});
-
}
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Fri Sep 21 10:33:54 2007
@@ -39,47 +39,24 @@
* @version $Revision: 519973 $
*/
public class SedaEndpoint extends DefaultEndpoint<Exchange> {
-
- static public class Entry {
- Exchange exchange;
- AsyncCallback callback;
- public Entry(Exchange exchange, AsyncCallback callback) {
- this.exchange = exchange;
- this.callback = callback;
- }
-
- public Exchange getExchange() {
- return exchange;
- }
- public void setExchange(Exchange exchange) {
- this.exchange = exchange;
- }
- public AsyncCallback getCallback() {
- return callback;
- }
- public void setCallback(AsyncCallback callback) {
- this.callback = callback;
- }
-
- }
-
private final class SedaProducer extends DefaultProducer implements AsyncProcessor {
private SedaProducer(Endpoint endpoint) {
super(endpoint);
}
public void process(Exchange exchange) {
- queue.add(new Entry(createExchange(exchange), null));
+ queue.add(exchange.copy());
}
public boolean process(Exchange exchange, AsyncCallback callback) {
- queue.add(new Entry(createExchange(exchange), callback));
- return false;
+ queue.add(exchange.copy());
+ callback.done(true);
+ return true;
}
}
- private BlockingQueue<Entry> queue;
+ private BlockingQueue<Exchange> queue;
- public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Entry> queue) {
+ public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) {
super(endpointUri, component);
this.queue = queue;
}
@@ -96,7 +73,7 @@
return new SedaConsumer(this, processor);
}
- public BlockingQueue<Entry> getQueue() {
+ public BlockingQueue<Exchange> getQueue() {
return queue;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java Fri Sep 21 10:33:54 2007
@@ -24,7 +24,6 @@
import org.apache.camel.Exchange;
import org.apache.camel.component.seda.SedaComponent;
import org.apache.camel.component.seda.SedaEndpoint;
-import org.apache.camel.component.seda.SedaEndpoint.Entry;
/**
* An implementation of the <a href="http://activemq.apache.org/camel/vm.html">VM components</a>
@@ -39,13 +38,13 @@
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception {
- BlockingQueue<SedaEndpoint.Entry> blockingQueue = getBlockingQueue(uri);
+ BlockingQueue<Exchange> blockingQueue = getBlockingQueue(uri);
return new SedaEndpoint(uri, this, blockingQueue);
}
- protected BlockingQueue<Entry> getBlockingQueue(String uri) {
+ protected BlockingQueue<Exchange> getBlockingQueue(String uri) {
synchronized (queues) {
- BlockingQueue<Entry> answer = queues.get(uri);
+ BlockingQueue<Exchange> answer = queues.get(uri);
if (answer == null) {
answer = createQueue();
queues.put(uri, answer);
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java?rev=578206&r1=578205&r2=578206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAsyncRouteTest.java Fri Sep 21 10:33:54 2007
@@ -76,8 +76,8 @@
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from(uri).to("seda:a");
- from("seda:a").process(new Processor() {
+ from(uri).thread(1).to("direct:a");
+ from("direct:a").process(new Processor() {
public void process(Exchange exchange) throws Exception {
file.set((File)exchange.getIn().getBody());
// Simulate a processing delay..