You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/15 19:32:16 UTC

svn commit: r944686 - in /camel/trunk/components: camel-mina/src/main/java/org/apache/camel/component/mina/ camel-mina/src/test/java/org/apache/camel/component/mina/ camel-netty/src/main/java/org/apache/camel/component/netty/ camel-netty/src/main/java/...

Author: davsclaus
Date: Sat May 15 17:32:15 2010
New Revision: 944686

URL: http://svn.apache.org/viewvc?rev=944686&view=rev
Log:
CAMEL-2721: Added options on mina/netty to control what to do in consumer if no reply to send back.

Removed:
    camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaMulticastTest.java
Modified:
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConfiguration.java Sat May 15 17:32:15 2010
@@ -19,6 +19,7 @@ package org.apache.camel.component.mina;
 import java.nio.charset.Charset;
 import java.util.List;
 
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.mina.common.IoFilter;
 import org.apache.mina.filter.codec.ProtocolCodecFactory;
@@ -44,6 +45,8 @@ public class MinaConfiguration implement
     private List<IoFilter> filters;
     private boolean allowDefaultCodec = true;
     private boolean disconnect;
+    private boolean disconnectOnNoReply = true;
+    private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
 
     /**
      * Returns a copy of this configuration
@@ -189,7 +192,7 @@ public class MinaConfiguration implement
     }
 
     public boolean isDatagramProtocol() {
-        return protocol.equals("udp") || protocol.equals("mcast") || protocol.equals("multicast");
+        return protocol.equals("udp");
     }
     
     public void setAllowDefaultCodec(boolean allowDefaultCodec) {
@@ -207,4 +210,20 @@ public class MinaConfiguration implement
     public void setDisconnect(boolean disconnect) {
         this.disconnect = disconnect;
     }
+
+    public boolean isDisconnectOnNoReply() {
+        return disconnectOnNoReply;
+    }
+
+    public void setDisconnectOnNoReply(boolean disconnectOnNoReply) {
+        this.disconnectOnNoReply = disconnectOnNoReply;
+    }
+
+    public LoggingLevel getNoReplyLogLevel() {
+        return noReplyLogLevel;
+    }
+
+    public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) {
+        this.noReplyLogLevel = noReplyLogLevel;
+    }
 }

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Sat May 15 17:32:15 2010
@@ -22,6 +22,7 @@ import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.processor.Logger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +43,7 @@ public class MinaConsumer extends Defaul
     private final SocketAddress address;
     private final IoAcceptor acceptor;
     private boolean sync;
+    private Logger noReplyLogger;
 
     public MinaConsumer(final MinaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -49,6 +51,7 @@ public class MinaConsumer extends Defaul
         this.address = endpoint.getAddress();
         this.acceptor = endpoint.getAcceptor();
         this.sync = endpoint.getConfiguration().isSync();
+        this.noReplyLogger = new Logger(LOG, endpoint.getConfiguration().getNoReplyLogLevel());
     }
 
     @Override
@@ -132,10 +135,15 @@ public class MinaConsumer extends Defaul
                 }
 
                 if (body == null) {
-                    // must close session if no data to write otherwise client will never receive a response
-                    // and wait forever (if not timing out)
-                    LOG.warn("Cannot write body since its null, closing session: " + exchange);
-                    session.close();
+                    noReplyLogger.log("No payload to send as reply for exchange: " + exchange);
+                    if (endpoint.getConfiguration().isDisconnectOnNoReply()) {
+                        // must close session if no data to write otherwise client will never receive a response
+                        // and wait forever (if not timing out)
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Closing session as no payload to send as reply at address: " + address);
+                        }
+                        session.close();
+                    }
                 } else {
                     // we got a response to write
                     if (LOG.isDebugEnabled()) {

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sat May 15 17:32:15 2010
@@ -25,6 +25,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.processor.Logger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,6 +50,7 @@ public class MinaProducer extends Defaul
     private long timeout;
     private IoConnector connector;
     private boolean sync;
+    private Logger noReplyLogger;
 
     public MinaProducer(MinaEndpoint endpoint) {
         super(endpoint);
@@ -56,6 +58,7 @@ public class MinaProducer extends Defaul
         this.lazySessionCreation = endpoint.getConfiguration().isLazySessionCreation();
         this.timeout = endpoint.getConfiguration().getTimeout();
         this.sync = endpoint.getConfiguration().isSync();
+        this.noReplyLogger = new Logger(LOG, endpoint.getConfiguration().getNoReplyLogLevel());
     }
 
     @Override
@@ -80,7 +83,7 @@ public class MinaProducer extends Defaul
 
         Object body = MinaPayloadHelper.getIn(endpoint, exchange);
         if (body == null) {
-            LOG.warn("No payload to send for exchange: " + exchange);
+            noReplyLogger.log("No payload to send for exchange: " + exchange);
             return; // exit early since nothing to write
         }
 

Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaInOutWithForcedNoResponseTest.java Sat May 15 17:32:15 2010
@@ -17,6 +17,7 @@
 package org.apache.camel.component.mina;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
 
@@ -39,12 +40,26 @@ public class MinaInOutWithForcedNoRespon
         }
     }
 
+    public void testNoResponseDisconnectOnNoReplyFalse() throws Exception {
+        try {
+            template.requestBody("mina:tcp://localhost:4445?sync=true&timeout=3000", "London");
+            fail("Should throw an exception");
+        } catch (RuntimeCamelException e) {
+            assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+        }
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
                 from("mina:tcp://localhost:4444?sync=true")
-                        .choice()
+                    .choice()
+                        .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
+                        .otherwise().transform(constant(null));
+
+                from("mina:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=OFF")
+                    .choice()
                         .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
                         .otherwise().transform(constant(null));
             }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Sat May 15 17:32:15 2010
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.util.URISupport;
 import org.jboss.netty.channel.ChannelDownstreamHandler;
@@ -61,6 +62,8 @@ public class NettyConfiguration implemen
     private boolean disconnect;
     private boolean lazyChannelCreation = true;
     private boolean transferExchange;
+    private boolean disconnectOnNoReply = true;
+    private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
 
     /**
      * Returns a copy of this configuration
@@ -154,6 +157,12 @@ public class NettyConfiguration implemen
         if (settings.containsKey("transferExchange")) {
             setTransferExchange(Boolean.valueOf((String) settings.get("transferExchange")));
         }
+        if (settings.containsKey("disconnectOnNoReply")) {
+            setDisconnectOnNoReply(Boolean.valueOf((String) settings.get("disconnectOnNoReply")));
+        }
+        if (settings.containsKey("noReplyLogLevel")) {
+            setNoReplyLogLevel(LoggingLevel.valueOf((String) settings.get("noReplyLogLevel")));
+        }
     }
 
     public String getProtocol() {
@@ -392,6 +401,22 @@ public class NettyConfiguration implemen
         this.transferExchange = transferExchange;
     }
 
+    public boolean isDisconnectOnNoReply() {
+        return disconnectOnNoReply;
+    }
+
+    public void setDisconnectOnNoReply(boolean disconnectOnNoReply) {
+        this.disconnectOnNoReply = disconnectOnNoReply;
+    }
+
+    public LoggingLevel getNoReplyLogLevel() {
+        return noReplyLogLevel;
+    }
+
+    public void setNoReplyLogLevel(LoggingLevel noReplyLogLevel) {
+        this.noReplyLogLevel = noReplyLogLevel;
+    }
+
     public String getAddress() {
         return host + ":" + port;
     }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Sat May 15 17:32:15 2010
@@ -29,6 +29,7 @@ import org.apache.camel.ExchangeTimedOut
 import org.apache.camel.ServicePoolAware;
 import org.apache.camel.component.netty.handlers.ClientChannelHandler;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.processor.Logger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,12 +59,14 @@ public class NettyProducer extends Defau
     private ConnectionlessBootstrap connectionlessClientBootstrap;
     private ClientPipelineFactory clientPipelineFactory;
     private ChannelPipeline clientPipeline;
+    private Logger noReplyLogger;
 
     public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
         super(nettyEndpoint);
         this.configuration = configuration;
         this.context = this.getEndpoint().getCamelContext();
         this.allChannels = new DefaultChannelGroup("NettyProducer-" + nettyEndpoint.getEndpointUri());
+        this.noReplyLogger = new Logger(LOG, configuration.getNoReplyLogLevel());
     }
 
     @Override
@@ -111,7 +114,7 @@ public class NettyProducer extends Defau
 
         Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange);
         if (body == null) {
-            LOG.warn("No payload to send for exchange: " + exchange);
+            noReplyLogger.log("No payload to send for exchange: " + exchange);
             return; // exit early since nothing to write
         }
 

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Sat May 15 17:32:15 2010
@@ -23,6 +23,7 @@ import org.apache.camel.component.netty.
 import org.apache.camel.component.netty.NettyConsumer;
 import org.apache.camel.component.netty.NettyHelper;
 import org.apache.camel.component.netty.NettyPayloadHelper;
+import org.apache.camel.processor.Logger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,10 +38,12 @@ import org.jboss.netty.channel.SimpleCha
 public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
     private static final transient Log LOG = LogFactory.getLog(ServerChannelHandler.class);
     private NettyConsumer consumer;
-    
+    private Logger noReplyLogger;
+
     public ServerChannelHandler(NettyConsumer consumer) {
         super();
         this.consumer = consumer;    
+        this.noReplyLogger = new Logger(LOG, consumer.getConfiguration().getNoReplyLogLevel());
     }
 
     @Override
@@ -110,10 +113,15 @@ public class ServerChannelHandler extend
         }
 
         if (body == null) {
-            // must close session if no data to write otherwise client will never receive a response
-            // and wait forever (if not timing out)
-            LOG.warn("Cannot write body since its null, closing channel: " + exchange);
-            NettyHelper.close(messageEvent.getChannel());
+            noReplyLogger.log("No payload to send as reply for exchange: " + exchange);
+            if (consumer.getConfiguration().isDisconnectOnNoReply()) {
+                // must close session if no data to write otherwise client will never receive a response
+                // and wait forever (if not timing out)
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Closing channel as no payload to send as reply at address: " + messageEvent.getRemoteAddress());
+                }
+                NettyHelper.close(messageEvent.getChannel());
+            }
         } else {
             // we got a body to write
             if (LOG.isDebugEnabled()) {

Modified: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java?rev=944686&r1=944685&r2=944686&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java Sat May 15 17:32:15 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.netty;
 
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -42,6 +43,16 @@ public class NettyInOutWithForcedNoRespo
         }
     }
 
+    @Test
+    public void testNoResponseDisconnectOnNoReplyFalse() throws Exception {
+        try {
+            template.requestBody("netty:tcp://localhost:4445?sync=true&timeout=3000", "London");
+            fail("Should throw an exception");
+        } catch (RuntimeCamelException e) {
+            assertIsInstanceOf(ExchangeTimedOutException.class, e.getCause());
+        }
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -50,6 +61,11 @@ public class NettyInOutWithForcedNoRespo
                     .choice()
                         .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
                         .otherwise().transform(constant(null));
+
+                from("netty:tcp://localhost:4445?sync=true&disconnectOnNoReply=false&noReplyLogLevel=OFF")
+                    .choice()
+                        .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
+                        .otherwise().transform(constant(null));
             }
         };
     }