You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/03/03 12:02:02 UTC

svn commit: r749580 - in /servicemix/smx4/nmr/trunk/nmr/core/src: main/java/org/apache/servicemix/nmr/core/ChannelImpl.java test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java

Author: gnodet
Date: Tue Mar  3 11:02:02 2009
New Revision: 749580

URL: http://svn.apache.org/viewvc?rev=749580&view=rev
Log:
Make sure exceptions thrown by endpoints when processing exchanges are caught and result in ExchangeListener#exchangeFailed() being called

Modified:
    servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java
    servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java

Modified: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java?rev=749580&r1=749579&r2=749580&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java Tue Mar  3 11:02:02 2009
@@ -32,6 +32,7 @@
 import org.apache.servicemix.nmr.api.NMR;
 import org.apache.servicemix.nmr.api.Pattern;
 import org.apache.servicemix.nmr.api.Role;
+import org.apache.servicemix.nmr.api.Status;
 import org.apache.servicemix.nmr.api.event.ExchangeListener;
 import org.apache.servicemix.nmr.api.internal.InternalChannel;
 import org.apache.servicemix.nmr.api.internal.InternalEndpoint;
@@ -214,25 +215,29 @@
      * @param exchange the exchange to process
      */
     protected void process(InternalExchange exchange) {
-        // Check for aborted exchanges
-        if (exchange.getError() instanceof AbortedException) {
-            return;
-        }
-        // Set destination endpoint
-        if (exchange.getDestination() == null) {
-            exchange.setDestination(endpoint);
-        }
-        // Change role
-        exchange.setRole(exchange.getRole() == Role.Provider ? Role.Consumer : Role.Provider);
-        // Call listeners
-        for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
-            l.exchangeDelivered(exchange);
+        try {
+            // Check for aborted exchanges
+            if (exchange.getError() instanceof AbortedException) {
+                return;
+            }
+            // Set destination endpoint
+            if (exchange.getDestination() == null) {
+                exchange.setDestination(endpoint);
+            }
+            // Change role
+            exchange.setRole(exchange.getRole() == Role.Provider ? Role.Consumer : Role.Provider);
+            // Call listeners
+            for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
+                l.exchangeDelivered(exchange);
+            }
+            // Check if sendSync was used, in which case we need to unblock the other side
+            // rather than delivering the exchange
+            // TODO:
+            // Process exchange
+            endpoint.process(exchange);
+        } catch (RuntimeException e) {
+            handleFailure(exchange, e, false);
         }
-        // Check if sendSync was used, in which case we need to unblock the other side
-        // rather than delivering the exchange
-        // TODO:
-        // Process exchange
-        endpoint.process(exchange);
     }                             
 
     /**
@@ -261,12 +266,43 @@
         // Dispatch in NMR
         try {
             nmr.getFlowRegistry().dispatch(exchange);
-        } catch (RuntimeException t) {
-            exchange.setError(t);
+        } catch (RuntimeException e) {
+            handleFailure(exchange, e, true);
+        }
+    }
+
+    protected void handleFailure(InternalExchange exchange, RuntimeException e, boolean dispatch) {
+        LOG.warn("Error processing exchange " + exchange, e);
+        if (dispatch) {
+            exchange.setError(e);
             for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
                 l.exchangeFailed(exchange);
             }
-            throw t;
+            // Rethrow the exception so that sendSync are unblocked
+            throw e;
+        } else {
+            // If the exchange is active, let's try to send an error on behalf of the endpoint
+            if (exchange.getStatus() == Status.Active) {
+                try {
+                    exchange.setError(e);
+                    send(exchange);
+                } catch (RuntimeException e2) {
+                    for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
+                        l.exchangeFailed(exchange);
+                    }
+                }
+            } else {
+                exchange.setError(e);
+                Semaphore lock = exchange.getRole() == Role.Provider ? exchange.getConsumerLock(false)
+                        : exchange.getProviderLock(false);
+                if (lock != null) {
+                    lock.release();
+                }
+                for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
+                    l.exchangeFailed(exchange);
+                }
+            }
         }
     }
+
 }

Modified: servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java?rev=749580&r1=749579&r2=749580&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java Tue Mar  3 11:02:02 2009
@@ -147,9 +147,33 @@
         verify(listener);
     }
 
+    public void testProcessingFailure() throws Exception {
+        IMocksControl control = EasyMock.createControl();
+        ExchangeListener listener = control.createMock(ExchangeListener.class);
+        control.makeThreadSafe(true);
+        nmr.getListenerRegistry().register(listener, null);
+
+        final Exchange e = ep1.channel.createExchange(Pattern.InOnly);
+
+        listener.exchangeSent(same(e));
+        listener.exchangeDelivered(same(e));
+        listener.exchangeSent(same(e));
+        listener.exchangeDelivered(same(e));
+        replay(listener);
+
+        ep2.throwException = true;
+        e.setTarget(ep1.channel.getNMR().getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, "ep2")));
+        ep1.channel.sendSync(e);
+
+        verify(listener);
+        assertNotNull(ep2.exchange);
+        assertEquals(Status.Error, e.getStatus());
+    }
+
     protected static class MyEndpoint implements Endpoint {
         private Channel channel;
         private Exchange exchange;
+        private boolean throwException;
 
         public void setChannel(Channel channel) {
             this.channel = channel;
@@ -158,6 +182,9 @@
         public synchronized void process(Exchange exchange) {
             this.exchange = exchange;
             this.notifyAll();
+            if (throwException) {
+                throw new RuntimeException("Error");
+            }
         }
 
         public synchronized void done() {