You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/04 16:47:52 UTC
[3/3] git commit: CAMEL-5819: Added requestTimeout option to netty
producer. As well as options to control logging level on netty consumer,
so it's less noisy by default about channel closed,
when clients disconnect abruptly.
CAMEL-5819: Added requestTimeout option to netty producer. As well as options to control logging level on netty consumer, so it's less noisy by default about channel closed, when clients disconnect abruptly.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/048601dc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/048601dc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/048601dc
Branch: refs/heads/camel-2.11.x
Commit: 048601dc5aa28d432f3751f23be8dd53ecd1426a
Parents: bcb0f00
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jun 4 16:46:00 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jun 4 16:47:30 2013 +0200
----------------------------------------------------------------------
.../netty/DefaultClientPipelineFactory.java | 11 ++
.../camel/component/netty/NettyConfiguration.java | 27 +++++
.../camel/component/netty/NettyConsumer.java | 1 +
.../netty/NettyConsumerExceptionHandler.java | 66 +++++++++++
.../netty/handlers/ServerChannelHandler.java | 8 +-
.../component/netty/NettyRequestTimeoutTest.java | 84 +++++++++++++++
6 files changed, 193 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
index cc7cc05..9503fac 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.netty;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -27,6 +28,7 @@ import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,6 +77,15 @@ public class DefaultClientPipelineFactory extends ClientPipelineFactory {
addToPipeline("encoder-" + x, channelPipeline, encoder);
}
+ // do we use request timeout?
+ if (producer.getConfiguration().getRequestTimeout() > 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Using request timeout {} millis", producer.getConfiguration().getRequestTimeout());
+ }
+ ChannelHandler timeout = new ReadTimeoutHandler(NettyComponent.getTimer(), producer.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS);
+ addToPipeline("timeout", channelPipeline, timeout);
+ }
+
// our handler must be added last
addToPipeline("handler", channelPipeline, new ClientChannelHandler(producer));
http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
index ddbc58c..af86443 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
@@ -47,6 +47,7 @@ public class NettyConfiguration implements Cloneable {
private boolean tcpNoDelay = true;
private boolean broadcast;
private long connectTimeout = 10000;
+ private long requestTimeout;
private boolean reuseAddress = true;
private boolean sync = true;
private boolean textline;
@@ -74,6 +75,8 @@ public class NettyConfiguration implements Cloneable {
private boolean transferExchange;
private boolean disconnectOnNoReply = true;
private LoggingLevel noReplyLogLevel = LoggingLevel.WARN;
+ private LoggingLevel serverExceptionCaughtLogLevel = LoggingLevel.WARN;
+ private LoggingLevel serverClosedChannelExceptionCaughtLogLevel = LoggingLevel.DEBUG;
private boolean allowDefaultCodec = true;
private ClientPipelineFactory clientPipelineFactory;
private ServerPipelineFactory serverPipelineFactory;
@@ -275,6 +278,14 @@ public class NettyConfiguration implements Cloneable {
this.connectTimeout = connectTimeout;
}
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public void setRequestTimeout(long requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
public boolean isReuseAddress() {
return reuseAddress;
}
@@ -515,6 +526,22 @@ public class NettyConfiguration implements Cloneable {
this.noReplyLogLevel = noReplyLogLevel;
}
+ public LoggingLevel getServerExceptionCaughtLogLevel() {
+ return serverExceptionCaughtLogLevel;
+ }
+
+ public void setServerExceptionCaughtLogLevel(LoggingLevel serverExceptionCaughtLogLevel) {
+ this.serverExceptionCaughtLogLevel = serverExceptionCaughtLogLevel;
+ }
+
+ public LoggingLevel getServerClosedChannelExceptionCaughtLogLevel() {
+ return serverClosedChannelExceptionCaughtLogLevel;
+ }
+
+ public void setServerClosedChannelExceptionCaughtLogLevel(LoggingLevel serverClosedChannelExceptionCaughtLogLevel) {
+ this.serverClosedChannelExceptionCaughtLogLevel = serverClosedChannelExceptionCaughtLogLevel;
+ }
+
public boolean isAllowDefaultCodec() {
return allowDefaultCodec;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
index 595e587..46dbb5b 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
@@ -56,6 +56,7 @@ public class NettyConsumer extends DefaultConsumer {
this.context = this.getEndpoint().getCamelContext();
this.configuration = configuration;
this.allChannels = new DefaultChannelGroup("NettyConsumer-" + nettyEndpoint.getEndpointUri());
+ setExceptionHandler(new NettyConsumerExceptionHandler(this));
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
new file mode 100644
index 0000000..845b189
--- /dev/null
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumerExceptionHandler.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.nio.channels.ClosedChannelException;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyConsumerExceptionHandler implements ExceptionHandler {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
+ private final CamelLogger logger;
+ private final LoggingLevel closedLoggingLevel;
+
+ public NettyConsumerExceptionHandler(NettyConsumer consumer) {
+ this.logger = new CamelLogger(LOG, consumer.getConfiguration().getServerExceptionCaughtLogLevel());
+ this.closedLoggingLevel = consumer.getConfiguration().getServerClosedChannelExceptionCaughtLogLevel();
+ }
+
+ @Override
+ public void handleException(Throwable exception) {
+ handleException(null, null, exception);
+ }
+
+ @Override
+ public void handleException(String message, Throwable exception) {
+ handleException(message, null, exception);
+ }
+
+ @Override
+ public void handleException(String message, Exchange exchange, Throwable exception) {
+ try {
+ String msg = CamelExchangeException.createExceptionMessage(message, exchange, exception);
+ boolean closed = ObjectHelper.getException(ClosedChannelException.class, exception) != null;
+ if (closed) {
+ logger.log(msg, exception, closedLoggingLevel);
+ } else {
+ logger.log(msg, exception);
+ }
+ } catch (Throwable e) {
+ // the logging exception handler must not cause new exceptions to occur
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index 8e0b4b0..317d377 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -41,8 +41,8 @@ import org.slf4j.LoggerFactory;
public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
// use NettyConsumer as logger to make it easier to read the logs as this is part of the consumer
private static final transient Logger LOG = LoggerFactory.getLogger(NettyConsumer.class);
- private NettyConsumer consumer;
- private CamelLogger noReplyLogger;
+ private final NettyConsumer consumer;
+ private final CamelLogger noReplyLogger;
public ServerChannelHandler(NettyConsumer consumer) {
this.consumer = consumer;
@@ -71,8 +71,8 @@ public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
// only close if we are still allowed to run
if (consumer.isRunAllowed()) {
- LOG.warn("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
-
+ // let the exception handler deal with it
+ consumer.getExceptionHandler().handleException("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
// close channel in case an exception was thrown
NettyHelper.close(exceptionEvent.getChannel());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/048601dc/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
new file mode 100644
index 0000000..94f9e79
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class NettyRequestTimeoutTest extends BaseNettyTest {
+
+ @Test
+ public void testRequestTimeoutOK() throws Exception {
+ String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=5000", "Hello Camel", String.class);
+ assertEquals("Bye World", out);
+ }
+
+ @Test
+ public void testRequestTimeout() throws Exception {
+ try {
+ template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
+ fail("Should have thrown exception");
+ } catch (CamelExecutionException e) {
+ ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+ assertNotNull(cause);
+ }
+ }
+
+ @Test
+ public void testRequestTimeoutAndOk() throws Exception {
+ try {
+ template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
+ fail("Should have thrown exception");
+ } catch (CamelExecutionException e) {
+ ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+ assertNotNull(cause);
+ }
+
+ // now we try again but this time there is no delay on server and thus faster
+ String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello World", String.class);
+ assertEquals("Bye World", out);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ String body = exchange.getIn().getBody(String.class);
+
+ if (body.contains("Camel")) {
+ Thread.sleep(3000);
+ }
+ }
+ })
+ .transform().constant("Bye World");
+
+ }
+ };
+ }
+}