You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/03/03 12:02:02 UTC
svn commit: r749580 - in /servicemix/smx4/nmr/trunk/nmr/core/src:
main/java/org/apache/servicemix/nmr/core/ChannelImpl.java
test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java
Author: gnodet
Date: Tue Mar 3 11:02:02 2009
New Revision: 749580
URL: http://svn.apache.org/viewvc?rev=749580&view=rev
Log:
Make sure exceptions thrown by endpoints when processing exchanges are catched and result in ExchangeListner#exchangeFailed() being called
Modified:
servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java
servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java
Modified: servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java?rev=749580&r1=749579&r2=749580&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/main/java/org/apache/servicemix/nmr/core/ChannelImpl.java Tue Mar 3 11:02:02 2009
@@ -32,6 +32,7 @@
import org.apache.servicemix.nmr.api.NMR;
import org.apache.servicemix.nmr.api.Pattern;
import org.apache.servicemix.nmr.api.Role;
+import org.apache.servicemix.nmr.api.Status;
import org.apache.servicemix.nmr.api.event.ExchangeListener;
import org.apache.servicemix.nmr.api.internal.InternalChannel;
import org.apache.servicemix.nmr.api.internal.InternalEndpoint;
@@ -214,25 +215,29 @@
* @param exchange the exchange to process
*/
protected void process(InternalExchange exchange) {
- // Check for aborted exchanges
- if (exchange.getError() instanceof AbortedException) {
- return;
- }
- // Set destination endpoint
- if (exchange.getDestination() == null) {
- exchange.setDestination(endpoint);
- }
- // Change role
- exchange.setRole(exchange.getRole() == Role.Provider ? Role.Consumer : Role.Provider);
- // Call listeners
- for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
- l.exchangeDelivered(exchange);
+ try {
+ // Check for aborted exchanges
+ if (exchange.getError() instanceof AbortedException) {
+ return;
+ }
+ // Set destination endpoint
+ if (exchange.getDestination() == null) {
+ exchange.setDestination(endpoint);
+ }
+ // Change role
+ exchange.setRole(exchange.getRole() == Role.Provider ? Role.Consumer : Role.Provider);
+ // Call listeners
+ for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
+ l.exchangeDelivered(exchange);
+ }
+ // Check if sendSync was used, in which case we need to unblock the other side
+ // rather than delivering the exchange
+ // TODO:
+ // Process exchange
+ endpoint.process(exchange);
+ } catch (RuntimeException e) {
+ handleFailure(exchange, e, false);
}
- // Check if sendSync was used, in which case we need to unblock the other side
- // rather than delivering the exchange
- // TODO:
- // Process exchange
- endpoint.process(exchange);
}
/**
@@ -261,12 +266,43 @@
// Dispatch in NMR
try {
nmr.getFlowRegistry().dispatch(exchange);
- } catch (RuntimeException t) {
- exchange.setError(t);
+ } catch (RuntimeException e) {
+ handleFailure(exchange, e, true);
+ }
+ }
+
+ protected void handleFailure(InternalExchange exchange, RuntimeException e, boolean dispatch) {
+ LOG.warn("Error processing exchange " + exchange, e);
+ if (dispatch) {
+ exchange.setError(e);
for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
l.exchangeFailed(exchange);
}
- throw t;
+ // Rethrow the exception so that sendSync are unblocked
+ throw e;
+ } else {
+ // If the exchange is active, let's try to send an error on behalf of the endpoint
+ if (exchange.getStatus() == Status.Active) {
+ try {
+ exchange.setError(e);
+ send(exchange);
+ } catch (RuntimeException e2) {
+ for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
+ l.exchangeFailed(exchange);
+ }
+ }
+ } else {
+ exchange.setError(e);
+ Semaphore lock = exchange.getRole() == Role.Provider ? exchange.getConsumerLock(false)
+ : exchange.getProviderLock(false);
+ if (lock != null) {
+ lock.release();
+ }
+ for (ExchangeListener l : nmr.getListenerRegistry().getListeners(ExchangeListener.class)) {
+ l.exchangeFailed(exchange);
+ }
+ }
}
}
+
}
Modified: servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java?rev=749580&r1=749579&r2=749580&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/core/src/test/java/org/apache/servicemix/nmr/core/ChannelImplTest.java Tue Mar 3 11:02:02 2009
@@ -147,9 +147,33 @@
verify(listener);
}
+ public void testProcessingFailure() throws Exception {
+ IMocksControl control = EasyMock.createControl();
+ ExchangeListener listener = control.createMock(ExchangeListener.class);
+ control.makeThreadSafe(true);
+ nmr.getListenerRegistry().register(listener, null);
+
+ final Exchange e = ep1.channel.createExchange(Pattern.InOnly);
+
+ listener.exchangeSent(same(e));
+ listener.exchangeDelivered(same(e));
+ listener.exchangeSent(same(e));
+ listener.exchangeDelivered(same(e));
+ replay(listener);
+
+ ep2.throwException = true;
+ e.setTarget(ep1.channel.getNMR().getEndpointRegistry().lookup(ServiceHelper.createMap(Endpoint.NAME, "ep2")));
+ ep1.channel.sendSync(e);
+
+ verify(listener);
+ assertNotNull(ep2.exchange);
+ assertEquals(Status.Error, e.getStatus());
+ }
+
protected static class MyEndpoint implements Endpoint {
private Channel channel;
private Exchange exchange;
+ private boolean throwException;
public void setChannel(Channel channel) {
this.channel = channel;
@@ -158,6 +182,9 @@
public synchronized void process(Exchange exchange) {
this.exchange = exchange;
this.notifyAll();
+ if (throwException) {
+ throw new RuntimeException("Error");
+ }
}
public synchronized void done() {