You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2015/11/26 20:37:23 UTC
[1/3] camel git commit: CAMEL-9368 - Netty4 producer hangs when
connection is prematurely closed
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x 5347e630a -> 2df8ed4e3
refs/heads/camel-2.16.x 083b89bfa -> 7bb44ce60
refs/heads/master 3f749f781 -> 2c96cb137
CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2c96cb13
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2c96cb13
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2c96cb13
Branch: refs/heads/master
Commit: 2c96cb1374cccb30525e5330190006a1e1fb79f0
Parents: 3f749f7
Author: Jonathan Anstey <ja...@gmail.com>
Authored: Thu Nov 26 16:05:37 2015 -0330
Committer: Jonathan Anstey <ja...@gmail.com>
Committed: Thu Nov 26 16:05:37 2015 -0330
----------------------------------------------------------------------
.../netty4/handlers/ClientChannelHandler.java | 21 +++--
.../netty4/handlers/ServerChannelHandler.java | 8 +-
.../component/netty4/NettyProducerHangTest.java | 93 ++++++++++++++++++++
3 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2c96cb13/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index dd64cb6..8905540 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -24,6 +24,7 @@ import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.netty4.NettyCamelState;
+import org.apache.camel.component.netty4.NettyConfiguration;
import org.apache.camel.component.netty4.NettyConstants;
import org.apache.camel.component.netty4.NettyHelper;
import org.apache.camel.component.netty4.NettyPayloadHelper;
@@ -47,12 +48,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel open: {}", ctx.channel());
}
// to keep track of open sockets
producer.getAllChannels().add(ctx.channel());
+
+ super.channelActive(ctx);
}
@Override
@@ -94,7 +97,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) {
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel closed: {}", ctx.channel());
}
@@ -108,18 +111,23 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
// to keep track of open sockets
producer.getAllChannels().remove(ctx.channel());
- if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) {
+ NettyConfiguration configuration = producer.getConfiguration();
+ if (configuration.isSync() && !exceptionHandled) {
// To avoid call the callback.done twice
exceptionHandled = true;
// session was closed but no message received. This could be because the remote server had an internal error
- // and could not return a response. We should count down to stop waiting for a response
+ // and could not return a response. We should count down to stop waiting for a response
+ String address = configuration != null ? configuration.getAddress() : "";
if (LOG.isDebugEnabled()) {
- LOG.debug("Channel closed but no message received from address: {}", producer.getConfiguration().getAddress());
+ LOG.debug("Channel closed but no message received from address: {}", address);
}
- exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange));
+ exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
// signal callback
callback.done(false);
}
+
+ // make sure the event can be processed by other handlers
+ super.channelInactive(ctx);
}
@Override
@@ -202,7 +210,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
* @throws Exception is thrown if error getting the response message
*/
protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception {
-
Object body = message;
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/2c96cb13/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
index 0bb93fc..0df5ff0 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
@@ -48,21 +48,25 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel open: {}", ctx.channel());
}
// to keep track of open sockets
consumer.getNettyServerBootstrapFactory().addChannel(ctx.channel());
+
+ super.channelActive(ctx);
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) {
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel closed: {}", ctx.channel());
}
// to keep track of open sockets
consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel());
+
+ super.channelInactive(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/2c96cb13/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
new file mode 100644
index 0000000..199180d
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class NettyProducerHangTest extends CamelTestSupport {
+
+ private static int PORT = 4093;
+
+ @Test
+ public void nettyProducerHangsOnTheSecondRequestToTheSocketWhichIsClosed() throws Exception {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ acceptReplyAcceptClose();
+ acceptReplyAcceptClose();
+ } catch (IOException e) {
+ log.error("Exception occured: " + e.getMessage(), e);
+ }
+ }
+ }).start();
+
+ String response1 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request1", String.class);
+ log.info("Received first response <" + response1 + ">");
+
+ try {
+ // our test server will close the socket now so we should get an error
+ template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request2", String.class);
+ } catch (Exception e) {
+ assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+ }
+
+ String response2 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request3", String.class);
+ log.info("Received 2nd response <" + response2 + ">");
+
+ try {
+ // our test server will close the socket now so we should get an error
+ template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request4", String.class);
+ } catch (Exception e) {
+ assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+ }
+ }
+
+ private void acceptReplyAcceptClose() throws IOException {
+ byte buf[] = new byte[128];
+
+ ServerSocket serverSocket = new ServerSocket(PORT);
+ Socket soc = serverSocket.accept();
+
+ log.info("Open socket and accept data");
+ try (InputStream is = soc.getInputStream();
+ OutputStream os = soc.getOutputStream()) {
+ // read first message
+ is.read(buf);
+
+ // reply to the first message
+ os.write("response\n".getBytes());
+
+ // read second message
+ is.read(buf);
+
+ // do not reply, just close socket (emulate network problem)
+ } finally {
+ soc.close();
+ serverSocket.close();
+ }
+ log.info("Close socket");
+ }
+
+}
[3/3] camel git commit: CAMEL-9368 - Netty4 producer hangs when
connection is prematurely closed
Posted by ja...@apache.org.
CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2df8ed4e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2df8ed4e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2df8ed4e
Branch: refs/heads/camel-2.15.x
Commit: 2df8ed4e3f0dba69dad59745c9ebc68600c1ddf4
Parents: 5347e63
Author: Jonathan Anstey <ja...@gmail.com>
Authored: Thu Nov 26 16:05:37 2015 -0330
Committer: Jonathan Anstey <ja...@gmail.com>
Committed: Thu Nov 26 16:06:26 2015 -0330
----------------------------------------------------------------------
.../netty4/handlers/ClientChannelHandler.java | 21 +++--
.../netty4/handlers/ServerChannelHandler.java | 8 +-
.../component/netty4/NettyProducerHangTest.java | 93 ++++++++++++++++++++
3 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2df8ed4e/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index e7d0d13..81c67cc 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -24,6 +24,7 @@ import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.netty4.NettyCamelState;
+import org.apache.camel.component.netty4.NettyConfiguration;
import org.apache.camel.component.netty4.NettyConstants;
import org.apache.camel.component.netty4.NettyHelper;
import org.apache.camel.component.netty4.NettyPayloadHelper;
@@ -47,12 +48,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel open: {}", ctx.channel());
}
// to keep track of open sockets
producer.getAllChannels().add(ctx.channel());
+
+ super.channelActive(ctx);
}
@Override
@@ -89,7 +92,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) {
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel closed: {}", ctx.channel());
}
@@ -103,18 +106,23 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
// to keep track of open sockets
producer.getAllChannels().remove(ctx.channel());
- if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) {
+ NettyConfiguration configuration = producer.getConfiguration();
+ if (configuration.isSync() && !exceptionHandled) {
// To avoid call the callback.done twice
exceptionHandled = true;
// session was closed but no message received. This could be because the remote server had an internal error
- // and could not return a response. We should count down to stop waiting for a response
+ // and could not return a response. We should count down to stop waiting for a response
+ String address = configuration != null ? configuration.getAddress() : "";
if (LOG.isDebugEnabled()) {
- LOG.debug("Channel closed but no message received from address: {}", producer.getConfiguration().getAddress());
+ LOG.debug("Channel closed but no message received from address: {}", address);
}
- exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange));
+ exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
// signal callback
callback.done(false);
}
+
+ // make sure the event can be processed by other handlers
+ super.channelInactive(ctx);
}
@Override
@@ -198,7 +206,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
* @throws Exception is thrown if error getting the response message
*/
protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception {
-
Object body = message;
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/2df8ed4e/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
index 0bb93fc..0df5ff0 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
@@ -48,21 +48,25 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel open: {}", ctx.channel());
}
// to keep track of open sockets
consumer.getNettyServerBootstrapFactory().addChannel(ctx.channel());
+
+ super.channelActive(ctx);
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) {
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel closed: {}", ctx.channel());
}
// to keep track of open sockets
consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel());
+
+ super.channelInactive(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/2df8ed4e/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
new file mode 100644
index 0000000..199180d
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class NettyProducerHangTest extends CamelTestSupport {
+
+ private static int PORT = 4093;
+
+ @Test
+ public void nettyProducerHangsOnTheSecondRequestToTheSocketWhichIsClosed() throws Exception {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ acceptReplyAcceptClose();
+ acceptReplyAcceptClose();
+ } catch (IOException e) {
+ log.error("Exception occured: " + e.getMessage(), e);
+ }
+ }
+ }).start();
+
+ String response1 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request1", String.class);
+ log.info("Received first response <" + response1 + ">");
+
+ try {
+ // our test server will close the socket now so we should get an error
+ template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request2", String.class);
+ } catch (Exception e) {
+ assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+ }
+
+ String response2 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request3", String.class);
+ log.info("Received 2nd response <" + response2 + ">");
+
+ try {
+ // our test server will close the socket now so we should get an error
+ template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request4", String.class);
+ } catch (Exception e) {
+ assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+ }
+ }
+
+ private void acceptReplyAcceptClose() throws IOException {
+ byte buf[] = new byte[128];
+
+ ServerSocket serverSocket = new ServerSocket(PORT);
+ Socket soc = serverSocket.accept();
+
+ log.info("Open socket and accept data");
+ try (InputStream is = soc.getInputStream();
+ OutputStream os = soc.getOutputStream()) {
+ // read first message
+ is.read(buf);
+
+ // reply to the first message
+ os.write("response\n".getBytes());
+
+ // read second message
+ is.read(buf);
+
+ // do not reply, just close socket (emulate network problem)
+ } finally {
+ soc.close();
+ serverSocket.close();
+ }
+ log.info("Close socket");
+ }
+
+}
[2/3] camel git commit: CAMEL-9368 - Netty4 producer hangs when
connection is prematurely closed
Posted by ja...@apache.org.
CAMEL-9368 - Netty4 producer hangs when connection is prematurely closed
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7bb44ce6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7bb44ce6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7bb44ce6
Branch: refs/heads/camel-2.16.x
Commit: 7bb44ce60dbb827ec72ff30adc40778f18a450bd
Parents: 083b89b
Author: Jonathan Anstey <ja...@gmail.com>
Authored: Thu Nov 26 16:05:37 2015 -0330
Committer: Jonathan Anstey <ja...@gmail.com>
Committed: Thu Nov 26 16:06:08 2015 -0330
----------------------------------------------------------------------
.../netty4/handlers/ClientChannelHandler.java | 21 +++--
.../netty4/handlers/ServerChannelHandler.java | 8 +-
.../component/netty4/NettyProducerHangTest.java | 93 ++++++++++++++++++++
3 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/7bb44ce6/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index dd64cb6..8905540 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -24,6 +24,7 @@ import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.netty4.NettyCamelState;
+import org.apache.camel.component.netty4.NettyConfiguration;
import org.apache.camel.component.netty4.NettyConstants;
import org.apache.camel.component.netty4.NettyHelper;
import org.apache.camel.component.netty4.NettyPayloadHelper;
@@ -47,12 +48,14 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel open: {}", ctx.channel());
}
// to keep track of open sockets
producer.getAllChannels().add(ctx.channel());
+
+ super.channelActive(ctx);
}
@Override
@@ -94,7 +97,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) {
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel closed: {}", ctx.channel());
}
@@ -108,18 +111,23 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
// to keep track of open sockets
producer.getAllChannels().remove(ctx.channel());
- if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) {
+ NettyConfiguration configuration = producer.getConfiguration();
+ if (configuration.isSync() && !exceptionHandled) {
// To avoid call the callback.done twice
exceptionHandled = true;
// session was closed but no message received. This could be because the remote server had an internal error
- // and could not return a response. We should count down to stop waiting for a response
+ // and could not return a response. We should count down to stop waiting for a response
+ String address = configuration != null ? configuration.getAddress() : "";
if (LOG.isDebugEnabled()) {
- LOG.debug("Channel closed but no message received from address: {}", producer.getConfiguration().getAddress());
+ LOG.debug("Channel closed but no message received from address: {}", address);
}
- exchange.setException(new CamelExchangeException("No response received from remote server: " + producer.getConfiguration().getAddress(), exchange));
+ exchange.setException(new CamelExchangeException("No response received from remote server: " + address, exchange));
// signal callback
callback.done(false);
}
+
+ // make sure the event can be processed by other handlers
+ super.channelInactive(ctx);
}
@Override
@@ -202,7 +210,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
* @throws Exception is thrown if error getting the response message
*/
protected Message getResponseMessage(Exchange exchange, ChannelHandlerContext ctx, Object message) throws Exception {
-
Object body = message;
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/7bb44ce6/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
index 0bb93fc..0df5ff0 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
@@ -48,21 +48,25 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
}
@Override
- public void channelActive(ChannelHandlerContext ctx) {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel open: {}", ctx.channel());
}
// to keep track of open sockets
consumer.getNettyServerBootstrapFactory().addChannel(ctx.channel());
+
+ super.channelActive(ctx);
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) {
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel closed: {}", ctx.channel());
}
// to keep track of open sockets
consumer.getNettyServerBootstrapFactory().removeChannel(ctx.channel());
+
+ super.channelInactive(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/7bb44ce6/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
new file mode 100644
index 0000000..199180d
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyProducerHangTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class NettyProducerHangTest extends CamelTestSupport {
+
+ private static int PORT = 4093;
+
+ @Test
+ public void nettyProducerHangsOnTheSecondRequestToTheSocketWhichIsClosed() throws Exception {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ acceptReplyAcceptClose();
+ acceptReplyAcceptClose();
+ } catch (IOException e) {
+ log.error("Exception occured: " + e.getMessage(), e);
+ }
+ }
+ }).start();
+
+ String response1 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request1", String.class);
+ log.info("Received first response <" + response1 + ">");
+
+ try {
+ // our test server will close the socket now so we should get an error
+ template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request2", String.class);
+ } catch (Exception e) {
+ assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+ }
+
+ String response2 = template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request3", String.class);
+ log.info("Received 2nd response <" + response2 + ">");
+
+ try {
+ // our test server will close the socket now so we should get an error
+ template.requestBody("netty4:tcp://localhost:" + PORT + "?textline=true&sync=true", "request4", String.class);
+ } catch (Exception e) {
+ assertStringContains(e.getCause().getMessage(), "No response received from remote server");
+ }
+ }
+
+ private void acceptReplyAcceptClose() throws IOException {
+ byte buf[] = new byte[128];
+
+ ServerSocket serverSocket = new ServerSocket(PORT);
+ Socket soc = serverSocket.accept();
+
+ log.info("Open socket and accept data");
+ try (InputStream is = soc.getInputStream();
+ OutputStream os = soc.getOutputStream()) {
+ // read first message
+ is.read(buf);
+
+ // reply to the first message
+ os.write("response\n".getBytes());
+
+ // read second message
+ is.read(buf);
+
+ // do not reply, just close socket (emulate network problem)
+ } finally {
+ soc.close();
+ serverSocket.close();
+ }
+ log.info("Close socket");
+ }
+
+}