You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2015/11/26 20:37:23 UTC

[1/3] camel git commit: CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed

Repository: camel
Updated Branches:
  refs/heads/camel-2.15.x 5347e630a -> 2df8ed4e3
  refs/heads/camel-2.16.x 083b89bfa -> 7bb44ce60
  refs/heads/master 3f749f781 -> 2c96cb137


CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2c96cb13
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2c96cb13
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2c96cb13

Branch: refs/heads/master
Commit: 2c96cb1374cccb30525e5330190006a1e1fb79f0
Parents: 3f749f7
Author: Jonathan Anstey <ja...@gmail.com>
Authored: Thu Nov 26 16:05:37 2015 -0330
Committer: Jonathan Anstey <ja...@gmail.com>
Committed: Thu Nov 26 16:05:37 2015 -0330

----------------------------------------------------------------------
 .../netty4/handlers/ClientChannelHandler.java   | 21 +++--
 .../netty4/handlers/ServerChannelHandler.java   |  8 +-
 .../component/netty4/NettyProducerHangTest.java | 93 ++++++++++++++++++++
 3 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2c96cb13/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index dd64cb6..8905540 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -24,6 +24,7 @@ import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.netty4.NettyCamelState;
+import org.apache.camel.component.netty4.NettyConfiguration;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyHelper;
 import org.apache.camel.component.netty4.NettyPayloadHelper;
@@ -47,12 +48,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel open: {}", ctx.channel());
         }
         // to keep track of open sockets
         producer.getAllChannels().add(ctx.channel());
+        
+        super.channelActive(ctx);
     }
 
     @Override
@@ -94,7 +97,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel closed: {}", ctx.channel());
         }
@@ -108,18 +111,23 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
         // to keep track of open sockets
         producer.getAllChannels().remove(ctx.channel());
 
-        if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) {
+        NettyConfiguration configuration = producer.getConfiguration();
+        if (configuration.isSync() && !exceptionHandled) {
             // To avoid call the callback.done twice
             exceptionHandled = true;
             // session was closed but no message received. This could be because the remote server had an internal error
-            // and could not return a response. We should count down to stop waiting for a response
+            // and could not return a response. We should count down to stop waiting for a response            
+            String address = configuration != null ? configuration.getAddress() : "";
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Channel closed but no message received from address: {}", producer.getConfiguration().getAddress());
+                LOG.debug("Channel closed but no message received from address: {}", address);
             }
-            exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange));
+            exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
             // signal callback
             callback.done(false);
         }
+        
+        // make sure the event can be processed by other handlers
+        super.channelInactive(ctx);
     }
 
     @Override
@@ -202,7 +210,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
      * @throws Exception is thrown if error getting the response message
      */
     protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception {
-
         Object body = message;
 
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/2c96cb13/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
index 0bb93fc..0df5ff0 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
@@ -48,21 +48,25 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel open: {}", ctx.channel());
         }
         // to keep track of open sockets
         consumer.getNettyServerBootstrapFactory().addChannel(ctx.channel());
+        
+        super.channelActive(ctx);
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel closed: {}", ctx.channel());
         }
         // to keep track of open sockets
         consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel());
+        
+        super.channelInactive(ctx);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/2c96cb13/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
new file mode 100644
index 0000000..199180d
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class NettyProducerHangTest extends CamelTestSupport {
+
+	private static int PORT = 4093;
+
+    @Test
+    public void nettyProducerHangsOnTheSecondRequestToTheSocketWhichIsClosed() throws Exception {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    acceptReplyAcceptClose();
+                    acceptReplyAcceptClose();
+                } catch (IOException e) {
+                    log.error("Exception occured: " + e.getMessage(), e);
+                }
+             }
+        }).start();
+
+        String response1 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request1", String.class);
+        log.info("Received first response <" + response1 + ">");
+
+        try {
+            // our test server will close the socket now so we should get an error
+            template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request2", String.class);            
+        } catch (Exception e) {
+            assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+        }
+        
+        String response2 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request3", String.class);
+        log.info("Received 2nd response <" + response2 + ">");
+
+        try {
+            // our test server will close the socket now so we should get an error
+            template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request4", String.class);            
+        } catch (Exception e) {
+            assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+        }
+    }
+
+	private void acceptReplyAcceptClose() throws IOException {
+		byte buf[] = new byte[128];
+
+		ServerSocket serverSocket = new ServerSocket(PORT);
+		Socket soc = serverSocket.accept();
+
+		log.info("Open socket and accept data");
+		try (InputStream is = soc.getInputStream();
+				OutputStream os = soc.getOutputStream()) {
+			// read first message
+			is.read(buf);
+						
+			// reply to the first message
+			os.write("response\n".getBytes());
+			
+			// read second message
+			is.read(buf);				
+
+			// do not reply, just close socket (emulate network problem)
+		} finally {
+			soc.close();
+			serverSocket.close();
+		}
+		log.info("Close socket");
+	}
+
+}


[3/3] camel git commit: CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed

Posted by ja...@apache.org.
CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2df8ed4e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2df8ed4e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2df8ed4e

Branch: refs/heads/camel-2.15.x
Commit: 2df8ed4e3f0dba69dad59745c9ebc68600c1ddf4
Parents: 5347e63
Author: Jonathan Anstey <ja...@gmail.com>
Authored: Thu Nov 26 16:05:37 2015 -0330
Committer: Jonathan Anstey <ja...@gmail.com>
Committed: Thu Nov 26 16:06:26 2015 -0330

----------------------------------------------------------------------
 .../netty4/handlers/ClientChannelHandler.java   | 21 +++--
 .../netty4/handlers/ServerChannelHandler.java   |  8 +-
 .../component/netty4/NettyProducerHangTest.java | 93 ++++++++++++++++++++
 3 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2df8ed4e/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index e7d0d13..81c67cc 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -24,6 +24,7 @@ import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.netty4.NettyCamelState;
+import org.apache.camel.component.netty4.NettyConfiguration;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyHelper;
 import org.apache.camel.component.netty4.NettyPayloadHelper;
@@ -47,12 +48,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel open: {}", ctx.channel());
         }
         // to keep track of open sockets
         producer.getAllChannels().add(ctx.channel());
+        
+        super.channelActive(ctx);
     }
 
     @Override
@@ -89,7 +92,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel closed: {}", ctx.channel());
         }
@@ -103,18 +106,23 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
         // to keep track of open sockets
         producer.getAllChannels().remove(ctx.channel());
 
-        if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) {
+        NettyConfiguration configuration = producer.getConfiguration();
+        if (configuration.isSync() && !exceptionHandled) {
             // To avoid call the callback.done twice
             exceptionHandled = true;
             // session was closed but no message received. This could be because the remote server had an internal error
-            // and could not return a response. We should count down to stop waiting for a response
+            // and could not return a response. We should count down to stop waiting for a response            
+            String address = configuration != null ? configuration.getAddress() : "";
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Channel closed but no message received from address: {}", producer.getConfiguration().getAddress());
+                LOG.debug("Channel closed but no message received from address: {}", address);
             }
-            exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange));
+            exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
             // signal callback
             callback.done(false);
         }
+        
+        // make sure the event can be processed by other handlers
+        super.channelInactive(ctx);
     }
 
     @Override
@@ -198,7 +206,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
      * @throws Exception is thrown if error getting the response message
      */
     protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception {
-
         Object body = message;
 
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/2df8ed4e/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
index 0bb93fc..0df5ff0 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
@@ -48,21 +48,25 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel open: {}", ctx.channel());
         }
         // to keep track of open sockets
         consumer.getNettyServerBootstrapFactory().addChannel(ctx.channel());
+        
+        super.channelActive(ctx);
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel closed: {}", ctx.channel());
         }
         // to keep track of open sockets
         consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel());
+        
+        super.channelInactive(ctx);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/2df8ed4e/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
new file mode 100644
index 0000000..199180d
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class NettyProducerHangTest extends CamelTestSupport {
+
+	private static int PORT = 4093;
+
+    @Test
+    public void nettyProducerHangsOnTheSecondRequestToTheSocketWhichIsClosed() throws Exception {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    acceptReplyAcceptClose();
+                    acceptReplyAcceptClose();
+                } catch (IOException e) {
+                    log.error("Exception occured: " + e.getMessage(), e);
+                }
+             }
+        }).start();
+
+        String response1 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request1", String.class);
+        log.info("Received first response <" + response1 + ">");
+
+        try {
+            // our test server will close the socket now so we should get an error
+            template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request2", String.class);            
+        } catch (Exception e) {
+            assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+        }
+        
+        String response2 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request3", String.class);
+        log.info("Received 2nd response <" + response2 + ">");
+
+        try {
+            // our test server will close the socket now so we should get an error
+            template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request4", String.class);            
+        } catch (Exception e) {
+            assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+        }
+    }
+
+	private void acceptReplyAcceptClose() throws IOException {
+		byte buf[] = new byte[128];
+
+		ServerSocket serverSocket = new ServerSocket(PORT);
+		Socket soc = serverSocket.accept();
+
+		log.info("Open socket and accept data");
+		try (InputStream is = soc.getInputStream();
+				OutputStream os = soc.getOutputStream()) {
+			// read first message
+			is.read(buf);
+						
+			// reply to the first message
+			os.write("response\n".getBytes());
+			
+			// read second message
+			is.read(buf);				
+
+			// do not reply, just close socket (emulate network problem)
+		} finally {
+			soc.close();
+			serverSocket.close();
+		}
+		log.info("Close socket");
+	}
+
+}


[2/3] camel git commit: CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed

Posted by ja...@apache.org.
CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7bb44ce6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7bb44ce6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7bb44ce6

Branch: refs/heads/camel-2.16.x
Commit: 7bb44ce60dbb827ec72ff30adc40778f18a450bd
Parents: 083b89b
Author: Jonathan Anstey <ja...@gmail.com>
Authored: Thu Nov 26 16:05:37 2015 -0330
Committer: Jonathan Anstey <ja...@gmail.com>
Committed: Thu Nov 26 16:06:08 2015 -0330

----------------------------------------------------------------------
 .../netty4/handlers/ClientChannelHandler.java   | 21 +++--
 .../netty4/handlers/ServerChannelHandler.java   |  8 +-
 .../component/netty4/NettyProducerHangTest.java | 93 ++++++++++++++++++++
 3 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7bb44ce6/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index dd64cb6..8905540 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -24,6 +24,7 @@ import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.component.netty4.NettyCamelState;
+import org.apache.camel.component.netty4.NettyConfiguration;
 import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyHelper;
 import org.apache.camel.component.netty4.NettyPayloadHelper;
@@ -47,12 +48,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel open: {}", ctx.channel());
         }
         // to keep track of open sockets
         producer.getAllChannels().add(ctx.channel());
+        
+        super.channelActive(ctx);
     }
 
     @Override
@@ -94,7 +97,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel closed: {}", ctx.channel());
         }
@@ -108,18 +111,23 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
         // to keep track of open sockets
         producer.getAllChannels().remove(ctx.channel());
 
-        if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) {
+        NettyConfiguration configuration = producer.getConfiguration();
+        if (configuration.isSync() && !exceptionHandled) {
             // To avoid call the callback.done twice
             exceptionHandled = true;
             // session was closed but no message received. This could be because the remote server had an internal error
-            // and could not return a response. We should count down to stop waiting for a response
+            // and could not return a response. We should count down to stop waiting for a response            
+            String address = configuration != null ? configuration.getAddress() : "";
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Channel closed but no message received from address: {}", producer.getConfiguration().getAddress());
+                LOG.debug("Channel closed but no message received from address: {}", address);
             }
-            exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange));
+            exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
             // signal callback
             callback.done(false);
         }
+        
+        // make sure the event can be processed by other handlers
+        super.channelInactive(ctx);
     }
 
     @Override
@@ -202,7 +210,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
      * @throws Exception is thrown if error getting the response message
      */
     protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception {
-
         Object body = message;
 
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/7bb44ce6/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
index 0bb93fc..0df5ff0 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
@@ -48,21 +48,25 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
     }
 
     @Override
-    public void channelActive(ChannelHandlerContext ctx) {
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel open: {}", ctx.channel());
         }
         // to keep track of open sockets
         consumer.getNettyServerBootstrapFactory().addChannel(ctx.channel());
+        
+        super.channelActive(ctx);
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Channel closed: {}", ctx.channel());
         }
         // to keep track of open sockets
         consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel());
+        
+        super.channelInactive(ctx);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/7bb44ce6/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
new file mode 100644
index 0000000..199180d
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class NettyProducerHangTest extends CamelTestSupport {
+
+	private static int PORT = 4093;
+
+    @Test
+    public void nettyProducerHangsOnTheSecondRequestToTheSocketWhichIsClosed() throws Exception {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    acceptReplyAcceptClose();
+                    acceptReplyAcceptClose();
+                } catch (IOException e) {
+                    log.error("Exception occured: " + e.getMessage(), e);
+                }
+             }
+        }).start();
+
+        String response1 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request1", String.class);
+        log.info("Received first response <" + response1 + ">");
+
+        try {
+            // our test server will close the socket now so we should get an error
+            template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request2", String.class);            
+        } catch (Exception e) {
+            assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+        }
+        
+        String response2 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request3", String.class);
+        log.info("Received 2nd response <" + response2 + ">");
+
+        try {
+            // our test server will close the socket now so we should get an error
+            template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request4", String.class);            
+        } catch (Exception e) {
+            assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+        }
+    }
+
+	private void acceptReplyAcceptClose() throws IOException {
+		byte buf[] = new byte[128];
+
+		ServerSocket serverSocket = new ServerSocket(PORT);
+		Socket soc = serverSocket.accept();
+
+		log.info("Open socket and accept data");
+		try (InputStream is = soc.getInputStream();
+				OutputStream os = soc.getOutputStream()) {
+			// read first message
+			is.read(buf);
+						
+			// reply to the first message
+			os.write("response\n".getBytes());
+			
+			// read second message
+			is.read(buf);				
+
+			// do not reply, just close socket (emulate network problem)
+		} finally {
+			soc.close();
+			serverSocket.close();
+		}
+		log.info("Close socket");
+	}
+
+}