You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/06/03 09:56:16 UTC
svn commit: r411403 - in /incubator/servicemix/trunk/servicemix-core/src:
main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java
Author: gnodet
Date: Sat Jun 3 00:56:16 2006
New Revision: 411403
URL: http://svn.apache.org/viewvc?rev=411403&view=rev
Log:
Fix transactions tests: the Async receiver was not async :(
Modified:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=411403&r1=411402&r2=411403&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Sat Jun 3 00:56:16 2006
@@ -367,7 +367,8 @@
if (finished &&
me.getTxLock() == null &&
me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED &&
- me.isPushDelivery() == false) {
+ me.isPushDelivery() == false &&
+ me.getRole() == Role.CONSUMER) {
me.setTransactionContext(null);
}
container.sendExchange(mirror);
Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java?rev=411403&r1=411402&r2=411403&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java Sat Jun 3 00:56:16 2006
@@ -2,6 +2,8 @@
import java.sql.Connection;
+import javax.jbi.JBIException;
+import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.InOut;
@@ -30,7 +32,6 @@
import org.apache.servicemix.client.DefaultServiceMixClient;
import org.apache.servicemix.client.ServiceMixClient;
import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.components.util.OutBinding;
import org.apache.servicemix.jbi.container.JBIContainer;
import org.apache.servicemix.jbi.jaxp.StringSource;
import org.apache.servicemix.jbi.nmr.flow.Flow;
@@ -427,16 +428,40 @@
assertNotNull(store.load(me.getExchangeId()));
}
- protected class Async extends OutBinding {
+ protected class Async extends ComponentSupport implements Runnable {
private boolean sync;
private boolean rollback;
+ private Thread runner;
+ private boolean running;
public Async(boolean sync, boolean rollback) {
this.sync = sync;
this.rollback = rollback;
setService(new QName("service"));
setEndpoint("endpoint");
}
- protected void process(MessageExchange exchange, NormalizedMessage message) throws Exception {
+ public synchronized void start() throws JBIException {
+ if (!running) {
+ running = true;
+ runner = new Thread(this);
+ runner.start();
+ }
+ }
+ public void run() {
+ while (running) {
+ try {
+ DeliveryChannel deliveryChannel = getContext().getDeliveryChannel();
+ MessageExchange messageExchange = deliveryChannel.accept();
+ process(messageExchange);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ public synchronized void stop() throws JBIException {
+ running = false;
+ }
+ protected void process(MessageExchange exchange) throws Exception {
if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
return;
}