You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/12/20 11:04:23 UTC
svn commit: r1424387 - in /camel/branches/camel-2.10.x: ./
components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
Author: davsclaus
Date: Thu Dec 20 10:04:23 2012
New Revision: 1424387
URL: http://svn.apache.org/viewvc?rev=1424387&view=rev
Log:
CAMEL-5899: The netty producer should honor the connection timeout while waiting for netty to open the connection, and keep better track of open/closed channels.
Modified:
camel/branches/camel-2.10.x/ (props changed)
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1424386
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1424387&r1=1424386&r2=1424387&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu Dec 20 10:04:23 2012
@@ -18,10 +18,8 @@ package org.apache.camel.component.netty
import java.net.InetSocketAddress;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
@@ -303,7 +301,7 @@ public class NettyProducer extends Defau
}
}
- private ChannelFuture openConnection() throws Exception {
+ protected ChannelFuture openConnection() throws Exception {
ChannelFuture answer;
if (isTcp()) {
@@ -363,20 +361,13 @@ public class NettyProducer extends Defau
}
private Channel openChannel(ChannelFuture channelFuture) throws Exception {
- // wait until until the operation is complete
- final CountDownLatch latch = new CountDownLatch(1);
- channelFuture.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- LOG.trace("Operation complete {}", channelFuture);
- latch.countDown();
- }
- });
// blocking for channel to be done
- LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout());
- latch.await(configuration.getConnectTimeout(), TimeUnit.MILLISECONDS);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout());
+ }
+ channelFuture.awaitUninterruptibly(configuration.getConnectTimeout());
- if (!channelFuture.isSuccess()) {
+ if (!channelFuture.isDone() || !channelFuture.isSuccess()) {
throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
}
Channel answer = channelFuture.getChannel();
Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1424387&r1=1424386&r2=1424387&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu Dec 20 10:04:23 2012
@@ -50,6 +50,9 @@ public class ClientChannelHandler extend
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel open: {}", ctx.getChannel());
+ }
// to keep track of open sockets
producer.getAllChannels().add(channelStateEvent.getChannel());
}
@@ -90,7 +93,9 @@ public class ClientChannelHandler extend
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOG.trace("Channel closed: {}", ctx.getChannel());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Channel closed: {}", ctx.getChannel());
+ }
Exchange exchange = getExchange(ctx);
AsyncCallback callback = getAsyncCallback(ctx);
@@ -98,6 +103,9 @@ public class ClientChannelHandler extend
// remove state
producer.removeState(ctx.getChannel());
+ // to keep track of open sockets
+ producer.getAllChannels().remove(ctx.getChannel());
+
if (producer.getConfiguration().isSync() && !messageReceived && !exceptionHandled) {
// session was closed but no message received. This could be because the remote server had an internal error
// and could not return a response. We should count down to stop waiting for a response