You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/04 16:47:52 UTC

[3/3] git commit: CAMEL-5819: Added requestTimeout option to netty producer. As well options to control logging level on netty consumer, so its less noisy by default about channel closed, when clients disconnect abruptly.

CAMEL-5819: Added requestTimeout option to netty producer. As well options to control logging level on netty consumer, so its less noisy by default about channel closed, when clients disconnect abruptly.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/048601dc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/048601dc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/048601dc

Branch: refs/heads/camel-2.11.x
Commit: 048601dc5aa28d432f3751f23be8dd53ecd1426a
Parents: bcb0f00
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jun 4 16:46:00 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jun 4 16:47:30 2013 +0200

----------------------------------------------------------------------
 .../netty/DefaultClientPipelineFactory.java        |   11 ++
 .../camel/component/netty/NettyConfiguration.java  |   27 +++++
 .../camel/component/netty/NettyConsumer.java       |    1 +
 .../netty/NettyConsumerExceptionHandler.java       |   66 +++++++++++
 .../netty/handlers/ServerChannelHandler.java       |    8 +-
 .../component/netty/NettyRequestTimeoutTest.java   |   84 +++++++++++++++
 6 files changed, 193 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
index cc7cc05..9503fac 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.netty;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
@@ -27,6 +28,7 @@ import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +77,15 @@ public class DefaultClientPipelineFactory extends ClientPipelineFactory  {
             addToPipeline("encoder-" + x, channelPipeline, encoder);
         }
 
+        // do we use request timeout?
+        if (producer.getConfiguration().getRequestTimeout() > 0) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Using request timeout {} millis", producer.getConfiguration().getRequestTimeout());
+            }
+            ChannelHandler timeout = new ReadTimeoutHandler(NettyComponent.getTimer(), producer.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS);
+            addToPipeline("timeout", channelPipeline, timeout);
+        }
+
         // our handler must be added last
         addToPipeline("handler", channelPipeline, new ClientChannelHandler(producer));
 

http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
index ddbc58c..af86443 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
@@ -47,6 +47,7 @@ public class NettyConfiguration implements Cloneable {
     private boolean tcpNoDelay = true;
     private boolean broadcast;
     private long connectTimeout = 10000;
+    private long requestTimeout;
     private boolean reuseAddress = true;
     private boolean sync = true;
     private boolean textline;
@@ -74,6 +75,8 @@ public class NettyConfiguration implements Cloneable {
     private boolean transferExchange;
     private boolean disconnectOnNoReply = true;
     private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
+    private LoggingLevel serverExceptionCaughtLogLevel = LoggingLevel.WARN;
+    private LoggingLevel serverClosedChannelExceptionCaughtLogLevel = LoggingLevel.DEBUG;
     private boolean allowDefaultCodec = true;
     private ClientPipelineFactory clientPipelineFactory;
     private ServerPipelineFactory serverPipelineFactory;
@@ -275,6 +278,14 @@ public class NettyConfiguration implements Cloneable {
         this.connectTimeout = connectTimeout;
     }
 
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public void setRequestTimeout(long requestTimeout) {
+        this.requestTimeout = requestTimeout;
+    }
+
     public boolean isReuseAddress() {
         return reuseAddress;
     }
@@ -515,6 +526,22 @@ public class NettyConfiguration implements Cloneable {
         this.noReplyLogLevel = noReplyLogLevel;
     }
 
+    public LoggingLevel getServerExceptionCaughtLogLevel() {
+        return serverExceptionCaughtLogLevel;
+    }
+
+    public void setServerExceptionCaughtLogLevel(LoggingLevel serverExceptionCaughtLogLevel) {
+        this.serverExceptionCaughtLogLevel = serverExceptionCaughtLogLevel;
+    }
+
+    public LoggingLevel getServerClosedChannelExceptionCaughtLogLevel() {
+        return serverClosedChannelExceptionCaughtLogLevel;
+    }
+
+    public void setServerClosedChannelExceptionCaughtLogLevel(LoggingLevel serverClosedChannelExceptionCaughtLogLevel) {
+        this.serverClosedChannelExceptionCaughtLogLevel = serverClosedChannelExceptionCaughtLogLevel;
+    }
+
     public boolean isAllowDefaultCodec() {
         return allowDefaultCodec;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
index 595e587..46dbb5b 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
@@ -56,6 +56,7 @@ public class NettyConsumer extends DefaultConsumer {
         this.context = this.getEndpoint().getCamelContext();
         this.configuration = configuration;
         this.allChannels = new DefaultChannelGroup("NettyConsumer-" + nettyEndpoint.getEndpointUri());
+        setExceptionHandler(new NettyConsumerExceptionHandler(this));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
new file mode 100644
index 0000000..845b189
--- /dev/null
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyConsumerExceptionHandler implements ExceptionHandler {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
+    private final CamelLogger logger;
+    private final LoggingLevel closedLoggingLevel;
+
+    public NettyConsumerExceptionHandler(NettyConsumer consumer) {
+        this.logger = new CamelLogger(LOG, consumer.getConfiguration().getServerExceptionCaughtLogLevel());
+        this.closedLoggingLevel = consumer.getConfiguration().getServerClosedChannelExceptionCaughtLogLevel();
+    }
+
+    @Override
+    public void handleException(Throwable exception) {
+        handleException(null, null, exception);
+    }
+
+    @Override
+    public void handleException(String message, Throwable exception) {
+        handleException(message, null, exception);
+    }
+
+    @Override
+    public void handleException(String message, Exchange exchange, Throwable exception) {
+        try {
+            String msg = CamelExchangeException.createExceptionMessage(message, exchange, exception);
+            boolean closed = ObjectHelper.getException(ClosedChannelException.class, exception) != null;
+            if (closed) {
+                logger.log(msg, exception, closedLoggingLevel);
+            } else {
+                logger.log(msg, exception);
+            }
+        } catch (Throwable e) {
+            // the logging exception handler must not cause new exceptions to occur
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index 8e0b4b0..317d377 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -41,8 +41,8 @@ import org.slf4j.LoggerFactory;
 public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
     // use NettyConsumer as logger to make it easier to read the logs as this is part of the consumer
     private static final transient Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
-    private NettyConsumer consumer;
-    private CamelLogger noReplyLogger;
+    private final NettyConsumer consumer;
+    private final CamelLogger noReplyLogger;
 
     public ServerChannelHandler(NettyConsumer consumer) {
         this.consumer = consumer;    
@@ -71,8 +71,8 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
         // only close if we are still allowed to run
         if (consumer.isRunAllowed()) {
-            LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
-
+            // let the exception handler deal with it
+            consumer.getExceptionHandler().handleException("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
             // close channel in case an exception was thrown
             NettyHelper.close(exceptionEvent.getChannel());
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
new file mode 100644
index 0000000..94f9e79
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class NettyRequestTimeoutTest extends BaseNettyTest {
+
+    @Test
+    public void testRequestTimeoutOK() throws Exception {
+        String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=5000", "Hello Camel", String.class);
+        assertEquals("Bye World", out);
+    }
+
+    @Test
+    public void testRequestTimeout() throws Exception {
+        try {
+            template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+            assertNotNull(cause);
+        }
+    }
+
+    @Test
+    public void testRequestTimeoutAndOk() throws Exception {
+        try {
+            template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+            assertNotNull(cause);
+        }
+
+        // now we try again but this time the is no delay on server and thus faster
+        String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello World", String.class);
+        assertEquals("Bye World", out);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            String body = exchange.getIn().getBody(String.class);
+
+                            if (body.contains("Camel")) {
+                                Thread.sleep(3000);
+                            }
+                        }
+                    })
+                    .transform().constant("Bye World");
+
+            }
+        };
+    }
+}