You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2015/07/23 19:55:04 UTC

[1/2] camel git commit: CAMEL-9003 - Use header for overriding request timeouts instead

Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x e8e89bb08 -> b587d4027
  refs/heads/master 9ce76f97e -> 5187b5dea


CAMEL-9003 - Use header for overriding request timeouts instead


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b587d402
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b587d402
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b587d402

Branch: refs/heads/camel-2.15.x
Commit: b587d4027b38c319fd2fbf9cd111558bfeed96e5
Parents: e8e89bb
Author: Jonathan Anstey <ja...@gmail.com>
Authored: Thu Jul 23 15:23:20 2015 -0230
Committer: Jonathan Anstey <ja...@gmail.com>
Committed: Thu Jul 23 15:23:20 2015 -0230

----------------------------------------------------------------------
 .../camel/component/netty/NettyConstants.java   |  1 +
 .../camel/component/netty/NettyEndpoint.java    | 14 +-----
 .../camel/component/netty/NettyProducer.java    | 14 ++++++
 .../netty/handlers/ClientChannelHandler.java    | 15 +++---
 .../netty/NettyCachedRequestTimeoutTest.java    | 53 --------------------
 .../netty/NettyRequestTimeoutTest.java          | 11 ++++
 .../camel/component/netty4/NettyConstants.java  |  1 +
 .../camel/component/netty4/NettyEndpoint.java   | 12 +----
 .../camel/component/netty4/NettyProducer.java   | 13 +++++
 .../netty4/handlers/ClientChannelHandler.java   | 10 ++--
 .../netty4/NettyCachedRequestTimeoutTest.java   | 53 --------------------
 .../netty4/NettyRequestTimeoutTest.java         | 11 ++++
 12 files changed, 65 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
index 4082c7d..eed8266 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
@@ -28,6 +28,7 @@ public final class NettyConstants {
     public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent";
     public static final String NETTY_REMOTE_ADDRESS = "CamelNettyRemoteAddress";
     public static final String NETTY_LOCAL_ADDRESS = "CamelNettyLocalAddress";
+    public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout";
     public static final String NETTY_SSL_SESSION = "CamelNettySSLSession";
     public static final String NETTY_SSL_CLIENT_CERT_SUBJECT_NAME = "CamelNettySSLClientCertSubjectName";
     public static final String NETTY_SSL_CLIENT_CERT_ISSUER_NAME = "CamelNettySSLClientCertIssuerName";

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
index 81b8648..e8fcbcc 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
@@ -98,20 +98,10 @@ public class NettyEndpoint extends DefaultEndpoint {
     @Override
     protected String createEndpointUri() {
         ObjectHelper.notNull(configuration, "configuration");
-        return "netty:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort() 
-                + ((getConfiguration().getRequestTimeout() > 0) ? "?requestTimeout=" + getConfiguration().getRequestTimeout() : "");
+        return "netty:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort(); 
     }
 
     @Override
-    public String getEndpointUri() {
-        if (getConfiguration().getRequestTimeout() > 0) {
-            return super.getEndpointUri() + "?requestTimeout=" + getConfiguration().getRequestTimeout();   
-        } else {
-            return super.getEndpointUri();    
-        }        
-    }
-    
-    @Override
     protected void doStart() throws Exception {
         ObjectHelper.notNull(timer, "timer");
     }
@@ -177,4 +167,4 @@ public class NettyEndpoint extends DefaultEndpoint {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index bf72284..1bfe547 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -39,9 +39,11 @@ import org.apache.commons.pool.impl.GenericObjectPool;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelConfig;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.SucceededChannelFuture;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -52,6 +54,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
 import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
 import org.jboss.netty.util.ExternalResourceReleasable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -246,6 +249,17 @@ public class NettyProducer extends DefaultAsyncProducer {
             return true;
         }
 
+        if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) {
+            long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
+            ChannelHandler oldHandler = existing.getPipeline().get("timeout");
+            ReadTimeoutHandler newHandler = new ReadTimeoutHandler(getEndpoint().getTimer(), timeoutInMs, TimeUnit.MILLISECONDS);
+            if (oldHandler == null) {
+                existing.getPipeline().addBefore("handler", "timeout", newHandler);
+            } else {
+                existing.getPipeline().replace(oldHandler, "timeout", newHandler);
+            }
+        }
+        
         // need to declare as final
         final Channel channel = existing;
         final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback);

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
index 7a6d31f..475e14b 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
@@ -133,19 +133,18 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
             LOG.trace("Message received: {}", messageEvent);
         }
 
-        if (producer.getConfiguration().getRequestTimeout() > 0) {
-            ChannelHandler handler = ctx.getPipeline().get("timeout");
-            if (handler != null) {
-                LOG.trace("Removing timeout channel as we received message");
-                ctx.getPipeline().remove(handler);
-            }
+        ChannelHandler handler = ctx.getPipeline().get("timeout");
+        if (handler != null) {
+            LOG.trace("Removing timeout channel as we received message");
+            ctx.getPipeline().remove(handler);
         }
-
+        
         Exchange exchange = getExchange(ctx);
         if (exchange == null) {
             // we just ignore the received message as the channel is closed
             return;
-        }
+        }     
+
         AsyncCallback callback = getAsyncCallback(ctx);
 
         Message message;

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java
deleted file mode 100644
index fcb7e37..0000000
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.junit.Test;
-
-/**
- * @version 
- */
-public class NettyCachedRequestTimeoutTest extends BaseNettyTest {
-
-    @Test
-    public void testRequestTimeoutKeyInProducerCache() throws Exception {        
-        assertEquals(0, template.getCurrentCacheSize());
-        String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
-        assertEquals("Bye World", out);
-        out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
-        assertEquals("Bye World", out);       
-        assertEquals(1, template.getCurrentCacheSize());
-        
-        template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1001", "Hello Camel", String.class);
-        assertEquals(2, template.getCurrentCacheSize());
-        template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1002", "Hello Camel", String.class);
-        assertEquals(3, template.getCurrentCacheSize());        
-    }
-    
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
-                    .transform().constant("Bye World");
-
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
index 94f9e79..8b499ce 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
@@ -46,6 +46,17 @@ public class NettyRequestTimeoutTest extends BaseNettyTest {
     }
 
     @Test
+    public void testRequestTimeoutViaHeader() throws Exception {
+        try {
+            template.requestBodyAndHeader("netty:tcp://localhost:{{port}}?textline=true&sync=true", "Hello Camel", NettyConstants.NETTY_REQUEST_TIMEOUT, 1000, String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+            assertNotNull(cause);
+        }
+    }
+    
+    @Test
     public void testRequestTimeoutAndOk() throws Exception {
         try {
             template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
index e381e61..5466c2a 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
@@ -34,6 +34,7 @@ public final class NettyConstants {
     public static final String NETTY_SSL_CLIENT_CERT_SERIAL_NO = "CamelNettySSLClientCertSerialNumber";
     public static final String NETTY_SSL_CLIENT_CERT_NOT_BEFORE = "CamelNettySSLClientCertNotBefore";
     public static final String NETTY_SSL_CLIENT_CERT_NOT_AFTER = "CamelNettySSLClientCertNotAfter";
+    public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout";
 
     private NettyConstants() {
         // Utility class

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
index 5669e9d..1b57b11 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
@@ -89,18 +89,8 @@ public class NettyEndpoint extends DefaultEndpoint {
     @Override
     protected String createEndpointUri() {
         ObjectHelper.notNull(configuration, "configuration");
-        return "netty4:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort() 
-                + ((getConfiguration().getRequestTimeout() > 0) ? "?requestTimeout=" + getConfiguration().getRequestTimeout() : "");
+        return "netty4:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort(); 
     }
-
-    @Override
-    public String getEndpointUri() {
-        if (getConfiguration().getRequestTimeout() > 0) {
-            return super.getEndpointUri() + "?requestTimeout=" + getConfiguration().getRequestTimeout();   
-        } else {
-            return super.getEndpointUri();    
-        }        
-    }    
     
     protected SSLSession getSSLSession(ChannelHandlerContext ctx) {
         final SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index 965397f..14dab4b 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -28,6 +28,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.group.ChannelGroup;
@@ -35,6 +36,7 @@ import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 
 import org.apache.camel.AsyncCallback;
@@ -216,6 +218,17 @@ public class NettyProducer extends DefaultAsyncProducer {
             return true;
         }
 
+        if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) {
+            long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
+            ChannelHandler oldHandler = existing.pipeline().get("timeout");
+            ReadTimeoutHandler newHandler = new ReadTimeoutHandler(timeoutInMs, TimeUnit.MILLISECONDS);
+            if (oldHandler == null) {
+                existing.pipeline().addBefore("handler", "timeout", newHandler);
+            } else {
+                existing.pipeline().replace(oldHandler, "timeout", newHandler);
+            }
+        }
+        
         // need to declare as final
         final Channel channel = existing;
         final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback);

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index 92e9851..e7d0d13 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -126,12 +126,10 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             LOG.trace("Message received: {}", msg);
         }
 
-        if (producer.getConfiguration().getRequestTimeout() > 0) {
-            ChannelHandler handler = ctx.pipeline().get("timeout");
-            if (handler != null) {
-                LOG.trace("Removing timeout channel as we received message");
-                ctx.pipeline().remove(handler);
-            }
+        ChannelHandler handler = ctx.pipeline().get("timeout");
+        if (handler != null) {
+            LOG.trace("Removing timeout channel as we received message");
+            ctx.pipeline().remove(handler);
         }
 
         Exchange exchange = getExchange(ctx);

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java
deleted file mode 100644
index 93765cd..0000000
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.junit.Test;
-
-/**
- * @version 
- */
-public class NettyCachedRequestTimeoutTest extends BaseNettyTest {
-
-    @Test
-    public void testRequestTimeoutKeyInProducerCache() throws Exception {        
-        assertEquals(0, template.getCurrentCacheSize());
-        String out = template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
-        assertEquals("Bye World", out);
-        out = template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
-        assertEquals("Bye World", out);       
-        assertEquals(1, template.getCurrentCacheSize());
-        
-        template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1001", "Hello Camel", String.class);
-        assertEquals(2, template.getCurrentCacheSize());
-        template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1002", "Hello Camel", String.class);
-        assertEquals(3, template.getCurrentCacheSize());        
-    }
-    
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("netty4:tcp://localhost:{{port}}?textline=true&sync=true")
-                    .transform().constant("Bye World");
-
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/b587d402/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java
index fe98395..770fab5 100644
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java
@@ -46,6 +46,17 @@ public class NettyRequestTimeoutTest extends BaseNettyTest {
     }
 
     @Test
+    public void testRequestTimeoutViaHeader() throws Exception {
+        try {
+            template.requestBodyAndHeader("netty4:tcp://localhost:{{port}}?textline=true&sync=true", "Hello Camel", NettyConstants.NETTY_REQUEST_TIMEOUT, 1000, String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+            assertNotNull(cause);
+        }
+    }
+    
+    @Test
     public void testRequestTimeoutAndOk() throws Exception {
         try {
             template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);


[2/2] camel git commit: CAMEL-9003 - Use header for overriding request timeouts instead

Posted by ja...@apache.org.
CAMEL-9003 - Use header for overriding request timeouts instead


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5187b5de
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5187b5de
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5187b5de

Branch: refs/heads/master
Commit: 5187b5dea17062ab05afd2e9e40615ef6609d27f
Parents: 9ce76f9
Author: Jonathan Anstey <ja...@gmail.com>
Authored: Thu Jul 23 15:23:20 2015 -0230
Committer: Jonathan Anstey <ja...@gmail.com>
Committed: Thu Jul 23 15:23:48 2015 -0230

----------------------------------------------------------------------
 .../camel/component/netty/NettyConstants.java   |  1 +
 .../camel/component/netty/NettyEndpoint.java    | 14 +-----
 .../camel/component/netty/NettyProducer.java    | 14 ++++++
 .../netty/handlers/ClientChannelHandler.java    | 15 +++---
 .../netty/NettyCachedRequestTimeoutTest.java    | 53 --------------------
 .../netty/NettyRequestTimeoutTest.java          | 11 ++++
 .../camel/component/netty4/NettyConstants.java  |  1 +
 .../camel/component/netty4/NettyEndpoint.java   | 12 +----
 .../camel/component/netty4/NettyProducer.java   | 13 +++++
 .../netty4/handlers/ClientChannelHandler.java   | 10 ++--
 .../netty4/NettyCachedRequestTimeoutTest.java   | 53 --------------------
 .../netty4/NettyRequestTimeoutTest.java         | 11 ++++
 12 files changed, 65 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
index 4082c7d..eed8266 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
@@ -28,6 +28,7 @@ public final class NettyConstants {
     public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent";
     public static final String NETTY_REMOTE_ADDRESS = "CamelNettyRemoteAddress";
     public static final String NETTY_LOCAL_ADDRESS = "CamelNettyLocalAddress";
+    public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout";
     public static final String NETTY_SSL_SESSION = "CamelNettySSLSession";
     public static final String NETTY_SSL_CLIENT_CERT_SUBJECT_NAME = "CamelNettySSLClientCertSubjectName";
     public static final String NETTY_SSL_CLIENT_CERT_ISSUER_NAME = "CamelNettySSLClientCertIssuerName";

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
index 81b8648..e8fcbcc 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
@@ -98,20 +98,10 @@ public class NettyEndpoint extends DefaultEndpoint {
     @Override
     protected String createEndpointUri() {
         ObjectHelper.notNull(configuration, "configuration");
-        return "netty:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort() 
-                + ((getConfiguration().getRequestTimeout() > 0) ? "?requestTimeout=" + getConfiguration().getRequestTimeout() : "");
+        return "netty:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort(); 
     }
 
     @Override
-    public String getEndpointUri() {
-        if (getConfiguration().getRequestTimeout() > 0) {
-            return super.getEndpointUri() + "?requestTimeout=" + getConfiguration().getRequestTimeout();   
-        } else {
-            return super.getEndpointUri();    
-        }        
-    }
-    
-    @Override
     protected void doStart() throws Exception {
         ObjectHelper.notNull(timer, "timer");
     }
@@ -177,4 +167,4 @@ public class NettyEndpoint extends DefaultEndpoint {
         }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index bf72284..1bfe547 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -39,9 +39,11 @@ import org.apache.commons.pool.impl.GenericObjectPool;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelConfig;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.SucceededChannelFuture;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -52,6 +54,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
 import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
 import org.jboss.netty.util.ExternalResourceReleasable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -246,6 +249,17 @@ public class NettyProducer extends DefaultAsyncProducer {
             return true;
         }
 
+        if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) {
+            long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
+            ChannelHandler oldHandler = existing.getPipeline().get("timeout");
+            ReadTimeoutHandler newHandler = new ReadTimeoutHandler(getEndpoint().getTimer(), timeoutInMs, TimeUnit.MILLISECONDS);
+            if (oldHandler == null) {
+                existing.getPipeline().addBefore("handler", "timeout", newHandler);
+            } else {
+                existing.getPipeline().replace(oldHandler, "timeout", newHandler);
+            }
+        }
+        
         // need to declare as final
         final Channel channel = existing;
         final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback);

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
index c314e39..8988ad5 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
@@ -138,19 +138,18 @@ public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
             LOG.trace("Message received: {}", messageEvent);
         }
 
-        if (producer.getConfiguration().getRequestTimeout() > 0) {
-            ChannelHandler handler = ctx.getPipeline().get("timeout");
-            if (handler != null) {
-                LOG.trace("Removing timeout channel as we received message");
-                ctx.getPipeline().remove(handler);
-            }
+        ChannelHandler handler = ctx.getPipeline().get("timeout");
+        if (handler != null) {
+            LOG.trace("Removing timeout channel as we received message");
+            ctx.getPipeline().remove(handler);
         }
-
+        
         Exchange exchange = getExchange(ctx);
         if (exchange == null) {
             // we just ignore the received message as the channel is closed
             return;
-        }
+        }     
+
         AsyncCallback callback = getAsyncCallback(ctx);
 
         Message message;

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java
deleted file mode 100644
index fcb7e37..0000000
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCachedRequestTimeoutTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.junit.Test;
-
-/**
- * @version 
- */
-public class NettyCachedRequestTimeoutTest extends BaseNettyTest {
-
-    @Test
-    public void testRequestTimeoutKeyInProducerCache() throws Exception {        
-        assertEquals(0, template.getCurrentCacheSize());
-        String out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
-        assertEquals("Bye World", out);
-        out = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
-        assertEquals("Bye World", out);       
-        assertEquals(1, template.getCurrentCacheSize());
-        
-        template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1001", "Hello Camel", String.class);
-        assertEquals(2, template.getCurrentCacheSize());
-        template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1002", "Hello Camel", String.class);
-        assertEquals(3, template.getCurrentCacheSize());        
-    }
-    
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
-                    .transform().constant("Bye World");
-
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
index 94f9e79..8b499ce 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyRequestTimeoutTest.java
@@ -46,6 +46,17 @@ public class NettyRequestTimeoutTest extends BaseNettyTest {
     }
 
     @Test
+    public void testRequestTimeoutViaHeader() throws Exception {
+        try {
+            template.requestBodyAndHeader("netty:tcp://localhost:{{port}}?textline=true&sync=true", "Hello Camel", NettyConstants.NETTY_REQUEST_TIMEOUT, 1000, String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+            assertNotNull(cause);
+        }
+    }
+    
+    @Test
     public void testRequestTimeoutAndOk() throws Exception {
         try {
             template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
index e381e61..5466c2a 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
@@ -34,6 +34,7 @@ public final class NettyConstants {
     public static final String NETTY_SSL_CLIENT_CERT_SERIAL_NO = "CamelNettySSLClientCertSerialNumber";
     public static final String NETTY_SSL_CLIENT_CERT_NOT_BEFORE = "CamelNettySSLClientCertNotBefore";
     public static final String NETTY_SSL_CLIENT_CERT_NOT_AFTER = "CamelNettySSLClientCertNotAfter";
+    public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout";
 
     private NettyConstants() {
         // Utility class

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
index 5669e9d..1b57b11 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
@@ -89,18 +89,8 @@ public class NettyEndpoint extends DefaultEndpoint {
     @Override
     protected String createEndpointUri() {
         ObjectHelper.notNull(configuration, "configuration");
-        return "netty4:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort() 
-                + ((getConfiguration().getRequestTimeout() > 0) ? "?requestTimeout=" + getConfiguration().getRequestTimeout() : "");
+        return "netty4:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort(); 
     }
-
-    @Override
-    public String getEndpointUri() {
-        if (getConfiguration().getRequestTimeout() > 0) {
-            return super.getEndpointUri() + "?requestTimeout=" + getConfiguration().getRequestTimeout();   
-        } else {
-            return super.getEndpointUri();    
-        }        
-    }    
     
     protected SSLSession getSSLSession(ChannelHandlerContext ctx) {
         final SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index 965397f..14dab4b 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -28,6 +28,7 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.group.ChannelGroup;
@@ -35,6 +36,7 @@ import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.ReadTimeoutHandler;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 
 import org.apache.camel.AsyncCallback;
@@ -216,6 +218,17 @@ public class NettyProducer extends DefaultAsyncProducer {
             return true;
         }
 
+        if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) {
+            long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
+            ChannelHandler oldHandler = existing.pipeline().get("timeout");
+            ReadTimeoutHandler newHandler = new ReadTimeoutHandler(timeoutInMs, TimeUnit.MILLISECONDS);
+            if (oldHandler == null) {
+                existing.pipeline().addBefore("handler", "timeout", newHandler);
+            } else {
+                existing.pipeline().replace(oldHandler, "timeout", newHandler);
+            }
+        }
+        
         // need to declare as final
         final Channel channel = existing;
         final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback);

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index 28d6e22..dd64cb6 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -130,12 +130,10 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             LOG.trace("Message received: {}", msg);
         }
 
-        if (producer.getConfiguration().getRequestTimeout() > 0) {
-            ChannelHandler handler = ctx.pipeline().get("timeout");
-            if (handler != null) {
-                LOG.trace("Removing timeout channel as we received message");
-                ctx.pipeline().remove(handler);
-            }
+        ChannelHandler handler = ctx.pipeline().get("timeout");
+        if (handler != null) {
+            LOG.trace("Removing timeout channel as we received message");
+            ctx.pipeline().remove(handler);
         }
 
         Exchange exchange = getExchange(ctx);

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java
deleted file mode 100644
index 93765cd..0000000
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyCachedRequestTimeoutTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import org.apache.camel.builder.RouteBuilder;
-import org.junit.Test;
-
-/**
- * @version 
- */
-public class NettyCachedRequestTimeoutTest extends BaseNettyTest {
-
-    @Test
-    public void testRequestTimeoutKeyInProducerCache() throws Exception {        
-        assertEquals(0, template.getCurrentCacheSize());
-        String out = template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
-        assertEquals("Bye World", out);
-        out = template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);
-        assertEquals("Bye World", out);       
-        assertEquals(1, template.getCurrentCacheSize());
-        
-        template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1001", "Hello Camel", String.class);
-        assertEquals(2, template.getCurrentCacheSize());
-        template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1002", "Hello Camel", String.class);
-        assertEquals(3, template.getCurrentCacheSize());        
-    }
-    
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("netty4:tcp://localhost:{{port}}?textline=true&sync=true")
-                    .transform().constant("Bye World");
-
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/5187b5de/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java
index fe98395..770fab5 100644
--- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyRequestTimeoutTest.java
@@ -46,6 +46,17 @@ public class NettyRequestTimeoutTest extends BaseNettyTest {
     }
 
     @Test
+    public void testRequestTimeoutViaHeader() throws Exception {
+        try {
+            template.requestBodyAndHeader("netty4:tcp://localhost:{{port}}?textline=true&sync=true", "Hello Camel", NettyConstants.NETTY_REQUEST_TIMEOUT, 1000, String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            ReadTimeoutException cause = assertIsInstanceOf(ReadTimeoutException.class, e.getCause());
+            assertNotNull(cause);
+        }
+    }
+    
+    @Test
     public void testRequestTimeoutAndOk() throws Exception {
         try {
             template.requestBody("netty4:tcp://localhost:{{port}}?textline=true&sync=true&requestTimeout=1000", "Hello Camel", String.class);