You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/24 11:28:26 UTC
camel git commit: CAMEL-9637: camel-netty - Allow to reuse previous
Channel in next call
Repository: camel
Updated Branches:
refs/heads/master 25fb00796 -> dc9144ecb
CAMEL-9637: camel-netty - Allow to reuse previous Channel in next call
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc9144ec
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc9144ec
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc9144ec
Branch: refs/heads/master
Commit: dc9144ecb92da6bcf9e14828b12c646a5a30aabd
Parents: 25fb007
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Feb 24 11:24:32 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Feb 24 11:28:11 2016 +0100
----------------------------------------------------------------------
.../component/netty4/NettyConfiguration.java | 27 +++++-
.../camel/component/netty4/NettyConstants.java | 1 +
.../camel/component/netty4/NettyProducer.java | 68 +++++++++++++--
.../netty4/handlers/ClientChannelHandler.java | 3 +-
.../component/netty4/NettyReuseChannelTest.java | 90 ++++++++++++++++++++
5 files changed, 179 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
index cf8dac1..69f9cf9 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
@@ -24,10 +24,12 @@ import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
+import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.UriParam;
@@ -104,7 +106,9 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
private boolean useByteBuf;
@UriParam(label = "advanced")
private boolean udpByteArrayCodec;
-
+ @UriParam(label = "producer")
+ private boolean reuseChannel;
+
/**
* Returns a copy of this configuration
@@ -200,7 +204,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
// additional netty options, we don't want to store an empty map, so set it as null if empty
options = IntrospectionSupport.extractProperties(parameters, "option.");
- if (options != null && options.isEmpty()) {
+ if (options != null && options.isEmpty()) {
options = null;
}
@@ -584,7 +588,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
public void setUdpConnectionlessSending(boolean udpConnectionlessSending) {
this.udpConnectionlessSending = udpConnectionlessSending;
}
-
+
public boolean isClientMode() {
return clientMode;
}
@@ -618,6 +622,23 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
this.udpByteArrayCodec = udpByteArrayCodec;
}
+ public boolean isReuseChannel() {
+ return reuseChannel;
+ }
+
+ /**
+ * This option allows producers to reuse the same Netty {@link Channel} for the lifecycle of processing the {@link Exchange}.
+ * This is useful if you need to call a server multiple times in a Camel route and want to use the same network connection.
+ * When this option is enabled, the channel is not returned to the connection pool until the {@link Exchange} is done; or it is disconnected
+ * if the disconnect option is set to true.
+ * <p/>
+ * The reused {@link Channel} is stored on the {@link Exchange} as an exchange property with the key {@link NettyConstants#NETTY_CHANNEL}
+ * which allows you to obtain the channel during routing and use it as well.
+ */
+ public void setReuseChannel(boolean reuseChannel) {
+ this.reuseChannel = reuseChannel;
+ }
+
private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) {
if (handlers != null) {
for (T handler : handlers) {
http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
index 5466c2a..fa2a6da 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
@@ -35,6 +35,7 @@ public final class NettyConstants {
public static final String NETTY_SSL_CLIENT_CERT_NOT_BEFORE = "CamelNettySSLClientCertNotBefore";
public static final String NETTY_SSL_CLIENT_CERT_NOT_AFTER = "CamelNettySSLClientCertNotAfter";
public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout";
+ public static final String NETTY_CHANNEL = "CamelNettyChannel";
private NettyConstants() {
// Utility class
http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index d0d8eb7..c181ebf 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -42,6 +42,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
@@ -196,11 +197,16 @@ public class NettyProducer extends DefaultAsyncProducer {
}
// get a channel from the pool
- Channel existing;
+ Channel existing = null;
try {
- existing = pool.borrowObject();
- if (existing != null) {
- LOG.trace("Got channel from pool {}", existing);
+ if (getConfiguration().isReuseChannel()) {
+ existing = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+ }
+ if (existing == null) {
+ existing = pool.borrowObject();
+ if (existing != null) {
+ LOG.trace("Got channel from pool {}", existing);
+ }
}
} catch (Exception e) {
exchange.setException(e);
@@ -215,6 +221,50 @@ public class NettyProducer extends DefaultAsyncProducer {
return true;
}
+ // remember channel so we can reuse it
+ if (getConfiguration().isReuseChannel() && exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) {
+ final Channel channel = existing;
+ exchange.setProperty(NettyConstants.NETTY_CHANNEL, existing);
+ // and defer closing the channel until we are done routing the exchange
+ exchange.addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onComplete(Exchange exchange) {
+ // should channel be closed after complete?
+ Boolean close;
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ } else {
+ close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ }
+
+ // should we disconnect, the header can override the configuration
+ boolean disconnect = getConfiguration().isDisconnect();
+ if (close != null) {
+ disconnect = close;
+ }
+
+ if (disconnect) {
+ LOG.trace("Closing channel {} as routing the Exchange is done", channel);
+ NettyHelper.close(channel);
+ }
+
+ try {
+ // Only put the connected channel back to the pool
+ if (channel.isActive()) {
+ LOG.trace("Putting channel back to pool {}", channel);
+ pool.returnObject(channel);
+ } else {
+ // and if it's not active then invalidate it
+ LOG.trace("Invalidating channel from pool {}", channel);
+ pool.invalidateObject(channel);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error returning channel to pool " + channel + ". This exception will be ignored.", e);
+ }
+ }
+ });
+ }
+
if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) {
long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
ChannelHandler oldHandler = existing.pipeline().get("timeout");
@@ -263,7 +313,9 @@ public class NettyProducer extends DefaultAsyncProducer {
if (close != null) {
disconnect = close;
}
- if (disconnect) {
+
+ // we should not close if we are reusing the channel
+ if (!configuration.isReuseChannel() && disconnect) {
if (LOG.isTraceEnabled()) {
LOG.trace("Closing channel when complete at address: {}", getEndpoint().getConfiguration().getAddress());
}
@@ -466,9 +518,13 @@ public class NettyProducer extends DefaultAsyncProducer {
if (channel.isActive()) {
LOG.trace("Putting channel back to pool {}", channel);
pool.returnObject(channel);
+ } else {
+ // and if it's not active then invalidate it
+ LOG.trace("Invalidating channel from pool {}", channel);
+ pool.invalidateObject(channel);
}
} catch (Exception e) {
- LOG.warn("Error returning channel to pool {}. This exception will be ignored.", channel);
+ LOG.warn("Error returning channel to pool " + channel + ". This exception will be ignored.", e);
} finally {
// ensure we call the delegated callback
callback.done(doneSync);
http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index ed4d695..b9a2a17 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -193,7 +193,8 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
if (close != null) {
disconnect = close;
}
- if (disconnect) {
+ // we should not close if we are reusing the channel
+ if (!producer.getConfiguration().isReuseChannel() && disconnect) {
if (LOG.isTraceEnabled()) {
LOG.trace("Closing channel when complete at address: {}", producer.getConfiguration().getAddress());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java
new file mode 100644
index 0000000..7a4cbdd
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+/**
+ * @version
+ */
+public class NettyReuseChannelTest extends BaseNettyTest {
+
+ private final List<Channel> channels = new ArrayList<>();
+
+ @Test
+ public void testReuse() throws Exception {
+ NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:b").expectedBodiesReceived("Hello Hello World");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Hello Hello World");
+
+ template.sendBody("direct:start", "World\n");
+
+ assertMockEndpointsSatisfied();
+
+ assertTrue(notify.matchesMockWaitTime());
+
+ assertEquals(2, channels.size());
+ assertSame("Should reuse channel", channels.get(0), channels.get(1));
+ assertFalse("And closed when routing done", channels.get(0).isOpen());
+ assertFalse("And closed when routing done", channels.get(1).isOpen());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+ channels.add(channel);
+ assertTrue("Should be active", channel.isActive());
+ }
+ })
+ .to("mock:a")
+ .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+ channels.add(channel);
+ assertTrue("Should be active", channel.isActive());
+ }
+ })
+ .to("mock:b");
+
+ from("netty4:tcp://localhost:{{port}}?textline=true&sync=true")
+ .transform(body().prepend("Hello "))
+ .to("mock:result");
+ }
+ };
+ }
+}