You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/06/03 09:56:16 UTC

svn commit: r411403 - in /incubator/servicemix/trunk/servicemix-core/src: main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java

Author: gnodet
Date: Sat Jun  3 00:56:16 2006
New Revision: 411403

URL: http://svn.apache.org/viewvc?rev=411403&view=rev
Log:
Fix transactions tests: the Async receiver was not async :(

Modified:
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
    incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=411403&r1=411402&r2=411403&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Sat Jun  3 00:56:16 2006
@@ -367,7 +367,8 @@
             if (finished && 
                 me.getTxLock() == null &&
                 me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED &&
-                me.isPushDelivery() == false) {
+                me.isPushDelivery() == false &&
+                me.getRole() == Role.CONSUMER) {
                 me.setTransactionContext(null);
             }
             container.sendExchange(mirror);

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java?rev=411403&r1=411402&r2=411403&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/TransactionsTest.java Sat Jun  3 00:56:16 2006
@@ -2,6 +2,8 @@
 
 import java.sql.Connection;
 
+import javax.jbi.JBIException;
+import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.ExchangeStatus;
 import javax.jbi.messaging.InOnly;
 import javax.jbi.messaging.InOut;
@@ -30,7 +32,6 @@
 import org.apache.servicemix.client.DefaultServiceMixClient;
 import org.apache.servicemix.client.ServiceMixClient;
 import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.components.util.OutBinding;
 import org.apache.servicemix.jbi.container.JBIContainer;
 import org.apache.servicemix.jbi.jaxp.StringSource;
 import org.apache.servicemix.jbi.nmr.flow.Flow;
@@ -427,16 +428,40 @@
         assertNotNull(store.load(me.getExchangeId()));
    }
     
-    protected class Async extends OutBinding {
+    protected class Async extends ComponentSupport implements Runnable {
         private boolean sync;
         private boolean rollback;
+        private Thread runner;
+        private boolean running;
         public Async(boolean sync, boolean rollback) {
             this.sync = sync;
             this.rollback = rollback;
             setService(new QName("service"));
             setEndpoint("endpoint");
         }
-        protected void process(MessageExchange exchange, NormalizedMessage message) throws Exception {
+        public synchronized void start() throws JBIException {
+            if (!running) {
+                running = true;
+                runner = new Thread(this);
+                runner.start();
+            }
+        }
+        public void run() {
+            while (running) {
+                try {
+                    DeliveryChannel deliveryChannel = getContext().getDeliveryChannel();
+                    MessageExchange messageExchange = deliveryChannel.accept();
+                    process(messageExchange);
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        public synchronized void stop() throws JBIException {
+            running = false;
+        }
+        protected void process(MessageExchange exchange) throws Exception {
             if (exchange.getStatus() != ExchangeStatus.ACTIVE) {
                 return;
             }