You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/15 19:32:16 UTC
svn commit: r944686 - in /camel/trunk/components:
camel-mina/src/main/java/org/apache/camel/component/mina/
camel-mina/src/test/java/org/apache/camel/component/mina/
camel-netty/src/main/java/org/apache/camel/component/netty/
camel-netty/src/main/java/...
Author: davsclaus
Date: Sat May 15 17:32:15 2010
New Revision: 944686
URL: http://svn.apache.org/viewvc?rev=944686&view=rev
Log:
CAMEL-2721: Added options on mina/netty to control what the consumer does when there is no reply to send back.
Removed:
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaMulticastTest.java
Modified:
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java Sat May 15 17:32:15 2010
@@ -19,6 +19,7 @@ package org.apache.camel.component.mina;
import java.nio.charset.Charset;
import java.util.List;
+import org.apache.camel.LoggingLevel;
import org.apache.camel.RuntimeCamelException;
import org.apache.mina.common.IoFilter;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
@@ -44,6 +45,8 @@ public class MinaConfiguration implement
private List<IoFilter> filters;
private boolean allowDefaultCodec = true;
private boolean disconnect;
+ private boolean disconnectOnNoReply = true;
+ private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
/**
* Returns a copy of this configuration
@@ -189,7 +192,7 @@ public class MinaConfiguration implement
}
public boolean isDatagramProtocol() {
- return protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast");
+ return protocol.equals("udp");
}
public void setAllowDefaultCodec(boolean allowDefaultCodec) {
@@ -207,4 +210,20 @@ public class MinaConfiguration implement
public void setDisconnect(boolean disconnect) {
this.disconnect = disconnect;
}
+
+ public boolean isDisconnectOnNoReply() {
+ return disconnectOnNoReply;
+ }
+
+ public void setDisconnectOnNoReply(boolean disconnectOnNoReply) {
+ this.disconnectOnNoReply = disconnectOnNoReply;
+ }
+
+ public LoggingLevel getNoReplyLogLevel() {
+ return noReplyLogLevel;
+ }
+
+ public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) {
+ this.noReplyLogLevel = noReplyLogLevel;
+ }
}
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Sat May 15 17:32:15 2010
@@ -22,6 +22,7 @@ import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.Logger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,6 +43,7 @@ public class MinaConsumer extends Defaul
private final SocketAddress address;
private final IoAcceptor acceptor;
private boolean sync;
+ private Logger noReplyLogger;
public MinaConsumer(final MinaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -49,6 +51,7 @@ public class MinaConsumer extends Defaul
this.address = endpoint.getAddress();
this.acceptor = endpoint.getAcceptor();
this.sync = endpoint.getConfiguration().isSync();
+ this.noReplyLogger = new Logger(LOG, endpoint.getConfiguration().getNoReplyLogLevel());
}
@Override
@@ -132,10 +135,15 @@ public class MinaConsumer extends Defaul
}
if (body == null) {
- // must close session if no data to write otherwise client will never receive a response
- // and wait forever (if not timing out)
- LOG.warn("Cannot write body since its null, closing session: " + exchange);
- session.close();
+ noReplyLogger.log("No payload to send as reply for exchange: " + exchange);
+ if (endpoint.getConfiguration().isDisconnectOnNoReply()) {
+ // must close session if no data to write otherwise client will never receive a response
+ // and wait forever (if not timing out)
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing session as no payload to send as reply at address: " + address);
+ }
+ session.close();
+ }
} else {
// we got a response to write
if (LOG.isDebugEnabled()) {
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sat May 15 17:32:15 2010
@@ -25,6 +25,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.processor.Logger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +50,7 @@ public class MinaProducer extends Defaul
private long timeout;
private IoConnector connector;
private boolean sync;
+ private Logger noReplyLogger;
public MinaProducer(MinaEndpoint endpoint) {
super(endpoint);
@@ -56,6 +58,7 @@ public class MinaProducer extends Defaul
this.lazySessionCreation = endpoint.getConfiguration().isLazySessionCreation();
this.timeout = endpoint.getConfiguration().getTimeout();
this.sync = endpoint.getConfiguration().isSync();
+ this.noReplyLogger = new Logger(LOG, endpoint.getConfiguration().getNoReplyLogLevel());
}
@Override
@@ -80,7 +83,7 @@ public class MinaProducer extends Defaul
Object body = MinaPayloadHelper.getIn(endpoint, exchange);
if (body == null) {
- LOG.warn("No payload to send for exchange: " + exchange);
+ noReplyLogger.log("No payload to send for exchange: " + exchange);
return; // exit early since nothing to write
}
Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java Sat May 15 17:32:15 2010
@@ -17,6 +17,7 @@
package org.apache.camel.component.mina;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
@@ -39,12 +40,26 @@ public class MinaInOutWithForcedNoRespon
}
}
+ public void testNoResponseDisconnectOnNoReplyFalse() throws Exception {
+ try {
+ template.requestBody("mina:tcp://localhost:4445?sync=true&timeout=3000", "London");
+ fail("Should throw an exception");
+ } catch (RuntimeCamelException e) {
+ assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+ }
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("mina:tcp://localhost:4444?sync=true")
- .choice()
+ .choice()
+ .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
+ .otherwise().transform(constant(null));
+
+ from("mina:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=OFF")
+ .choice()
.when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
.otherwise().transform(constant(null));
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Sat May 15 17:32:15 2010
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.camel.LoggingLevel;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.util.URISupport;
import org.jboss.netty.channel.ChannelDownstreamHandler;
@@ -61,6 +62,8 @@ public class NettyConfiguration implemen
private boolean disconnect;
private boolean lazyChannelCreation = true;
private boolean transferExchange;
+ private boolean disconnectOnNoReply = true;
+ private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
/**
* Returns a copy of this configuration
@@ -154,6 +157,12 @@ public class NettyConfiguration implemen
if (settings.containsKey("transferExchange")) {
setTransferExchange(Boolean.valueOf((String) settings.get("transferExchange")));
}
+ if (settings.containsKey("disconnectOnNoReply")) {
+ setDisconnectOnNoReply(Boolean.valueOf((String) settings.get("disconnectOnNoReply")));
+ }
+ if (settings.containsKey("noReplyLogLevel")) {
+ setNoReplyLogLevel(LoggingLevel.valueOf((String) settings.get("noReplyLogLevel")));
+ }
}
public String getProtocol() {
@@ -392,6 +401,22 @@ public class NettyConfiguration implemen
this.transferExchange = transferExchange;
}
+ public boolean isDisconnectOnNoReply() {
+ return disconnectOnNoReply;
+ }
+
+ public void setDisconnectOnNoReply(boolean disconnectOnNoReply) {
+ this.disconnectOnNoReply = disconnectOnNoReply;
+ }
+
+ public LoggingLevel getNoReplyLogLevel() {
+ return noReplyLogLevel;
+ }
+
+ public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) {
+ this.noReplyLogLevel = noReplyLogLevel;
+ }
+
public String getAddress() {
return host + ":" + port;
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Sat May 15 17:32:15 2010
@@ -29,6 +29,7 @@ import org.apache.camel.ExchangeTimedOut
import org.apache.camel.ServicePoolAware;
import org.apache.camel.component.netty.handlers.ClientChannelHandler;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.processor.Logger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,12 +59,14 @@ public class NettyProducer extends Defau
private ConnectionlessBootstrap connectionlessClientBootstrap;
private ClientPipelineFactory clientPipelineFactory;
private ChannelPipeline clientPipeline;
+ private Logger noReplyLogger;
public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
super(nettyEndpoint);
this.configuration = configuration;
this.context = this.getEndpoint().getCamelContext();
this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri());
+ this.noReplyLogger = new Logger(LOG, configuration.getNoReplyLogLevel());
}
@Override
@@ -111,7 +114,7 @@ public class NettyProducer extends Defau
Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange);
if (body == null) {
- LOG.warn("No payload to send for exchange: " + exchange);
+ noReplyLogger.log("No payload to send for exchange: " + exchange);
return; // exit early since nothing to write
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Sat May 15 17:32:15 2010
@@ -23,6 +23,7 @@ import org.apache.camel.component.netty.
import org.apache.camel.component.netty.NettyConsumer;
import org.apache.camel.component.netty.NettyHelper;
import org.apache.camel.component.netty.NettyPayloadHelper;
+import org.apache.camel.processor.Logger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,10 +38,12 @@ import org.jboss.netty.channel.SimpleCha
public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
private static final transient Log LOG = LogFactory.getLog(ServerChannelHandler.class);
private NettyConsumer consumer;
-
+ private Logger noReplyLogger;
+
public ServerChannelHandler(NettyConsumer consumer) {
super();
this.consumer = consumer;
+ this.noReplyLogger = new Logger(LOG, consumer.getConfiguration().getNoReplyLogLevel());
}
@Override
@@ -110,10 +113,15 @@ public class ServerChannelHandler extend
}
if (body == null) {
- // must close session if no data to write otherwise client will never receive a response
- // and wait forever (if not timing out)
- LOG.warn("Cannot write body since its null, closing channel: " + exchange);
- NettyHelper.close(messageEvent.getChannel());
+ noReplyLogger.log("No payload to send as reply for exchange: " + exchange);
+ if (consumer.getConfiguration().isDisconnectOnNoReply()) {
+ // must close session if no data to write otherwise client will never receive a response
+ // and wait forever (if not timing out)
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing channel as no payload to send as reply at address: " + messageEvent.getRemoteAddress());
+ }
+ NettyHelper.close(messageEvent.getChannel());
+ }
} else {
// we got a body to write
if (LOG.isDebugEnabled()) {
Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java Sat May 15 17:32:15 2010
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.netty;
+import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
@@ -42,6 +43,16 @@ public class NettyInOutWithForcedNoRespo
}
}
+ @Test
+ public void testNoResponseDisconnectOnNoReplyFalse() throws Exception {
+ try {
+ template.requestBody("netty:tcp://localhost:4445?sync=true&timeout=3000", "London");
+ fail("Should throw an exception");
+ } catch (RuntimeCamelException e) {
+ assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+ }
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -50,6 +61,11 @@ public class NettyInOutWithForcedNoRespo
.choice()
.when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
.otherwise().transform(constant(null));
+
+ from("netty:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=OFF")
+ .choice()
+ .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
+ .otherwise().transform(constant(null));
}
};
}