You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/24 11:28:26 UTC

camel git commit: CAMEL-9637: camel-netty - Allow to reuse previous Channel in next call

Repository: camel
Updated Branches:
  refs/heads/master 25fb00796 -> dc9144ecb


CAMEL-9637: camel-netty - Allow to reuse previous Channel in next call


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc9144ec
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc9144ec
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc9144ec

Branch: refs/heads/master
Commit: dc9144ecb92da6bcf9e14828b12c646a5a30aabd
Parents: 25fb007
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Feb 24 11:24:32 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Feb 24 11:28:11 2016 +0100

----------------------------------------------------------------------
 .../component/netty4/NettyConfiguration.java    | 27 +++++-
 .../camel/component/netty4/NettyConstants.java  |  1 +
 .../camel/component/netty4/NettyProducer.java   | 68 +++++++++++++--
 .../netty4/handlers/ClientChannelHandler.java   |  3 +-
 .../component/netty4/NettyReuseChannelTest.java | 90 ++++++++++++++++++++
 5 files changed, 179 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
index cf8dac1..69f9cf9 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
@@ -24,10 +24,12 @@ import java.util.List;
 import java.util.Map;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.handler.codec.Delimiters;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.CharsetUtil;
+import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.UriParam;
@@ -104,7 +106,9 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
     private boolean useByteBuf;
     @UriParam(label = "advanced")
     private boolean udpByteArrayCodec;
-    
+    @UriParam(label = "producer")
+    private boolean reuseChannel;
+
 
     /**
      * Returns a copy of this configuration
@@ -200,7 +204,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
 
         // additional netty options, we don't want to store an empty map, so set it as null if empty
         options = IntrospectionSupport.extractProperties(parameters, "option.");
-        if (options !=  null && options.isEmpty()) {
+        if (options != null && options.isEmpty()) {
             options = null;
         }
 
@@ -584,7 +588,7 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
     public void setUdpConnectionlessSending(boolean udpConnectionlessSending) {
         this.udpConnectionlessSending = udpConnectionlessSending;
     }
-    
+
     public boolean isClientMode() {
         return clientMode;
     }
@@ -618,6 +622,23 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
         this.udpByteArrayCodec = udpByteArrayCodec;
     }
 
+    public boolean isReuseChannel() {
+        return reuseChannel;
+    }
+
+    /**
+     * This option allows producers to reuse the same Netty {@link Channel} for the lifecycle of processing the {@link Exchange}.
+     * This is useable if you need to call a server multiple times in a Camel route and want to use the same network connection.
+     * When using this the channel is not returned to the connection pool until the {@link Exchange} is done; or disconnected
+     * if the disconnect option is set to true.
+     * <p/>
+     * The reused {@link Channel} is stored on the {@link Exchange} as an exchange property with the key {@link NettyConstants#NETTY_CHANNEL}
+     * which allows you to obtain the channel during routing and use it as well.
+     */
+    public void setReuseChannel(boolean reuseChannel) {
+        this.reuseChannel = reuseChannel;
+    }
+
     private static <T> void addToHandlersList(List<T> configured, List<T> handlers, Class<T> handlerType) {
         if (handlers != null) {
             for (T handler : handlers) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
index 5466c2a..fa2a6da 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConstants.java
@@ -35,6 +35,7 @@ public final class NettyConstants {
     public static final String NETTY_SSL_CLIENT_CERT_NOT_BEFORE = "CamelNettySSLClientCertNotBefore";
     public static final String NETTY_SSL_CLIENT_CERT_NOT_AFTER = "CamelNettySSLClientCertNotAfter";
     public static final String NETTY_REQUEST_TIMEOUT = "CamelNettyRequestTimeout";
+    public static final String NETTY_CHANNEL = "CamelNettyChannel";
 
     private NettyConstants() {
         // Utility class

http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index d0d8eb7..c181ebf 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -42,6 +42,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CamelLogger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
@@ -196,11 +197,16 @@ public class NettyProducer extends DefaultAsyncProducer {
         }
 
         // get a channel from the pool
-        Channel existing;
+        Channel existing = null;
         try {
-            existing = pool.borrowObject();
-            if (existing != null) {
-                LOG.trace("Got channel from pool {}", existing);
+            if (getConfiguration().isReuseChannel()) {
+                existing = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+            }
+            if (existing == null) {
+                existing = pool.borrowObject();
+                if (existing != null) {
+                    LOG.trace("Got channel from pool {}", existing);
+                }
             }
         } catch (Exception e) {
             exchange.setException(e);
@@ -215,6 +221,50 @@ public class NettyProducer extends DefaultAsyncProducer {
             return true;
         }
 
+        // remember channel so we can reuse it
+        if (getConfiguration().isReuseChannel() && exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) {
+            final Channel channel = existing;
+            exchange.setProperty(NettyConstants.NETTY_CHANNEL, existing);
+            // and defer closing the channel until we are done routing the exchange
+            exchange.addOnCompletion(new SynchronizationAdapter() {
+                @Override
+                public void onComplete(Exchange exchange) {
+                    // should channel be closed after complete?
+                    Boolean close;
+                    if (ExchangeHelper.isOutCapable(exchange)) {
+                        close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+                    } else {
+                        close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+                    }
+
+                    // should we disconnect, the header can override the configuration
+                    boolean disconnect = getConfiguration().isDisconnect();
+                    if (close != null) {
+                        disconnect = close;
+                    }
+
+                    if (disconnect) {
+                        LOG.trace("Closing channel {} as routing the Exchange is done", channel);
+                        NettyHelper.close(channel);
+                    }
+
+                    try {
+                        // Only put the connected channel back to the pool
+                        if (channel.isActive()) {
+                            LOG.trace("Putting channel back to pool {}", channel);
+                            pool.returnObject(channel);
+                        } else {
+                            // and if its not active then invalidate it
+                            LOG.trace("Invalidating channel from pool {}", channel);
+                            pool.invalidateObject(channel);
+                        }
+                    } catch (Exception e) {
+                        LOG.warn("Error returning channel to pool " + channel + ". This exception will be ignored.", e);
+                    }
+                }
+            });
+        }
+
         if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) {
             long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
             ChannelHandler oldHandler = existing.pipeline().get("timeout");
@@ -263,7 +313,9 @@ public class NettyProducer extends DefaultAsyncProducer {
                         if (close != null) {
                             disconnect = close;
                         }
-                        if (disconnect) {
+
+                        // we should not close if we are reusing the channel
+                        if (!configuration.isReuseChannel() && disconnect) {
                             if (LOG.isTraceEnabled()) {
                                 LOG.trace("Closing channel when complete at address: {}", getEndpoint().getConfiguration().getAddress());
                             }
@@ -466,9 +518,13 @@ public class NettyProducer extends DefaultAsyncProducer {
                 if (channel.isActive()) {
                     LOG.trace("Putting channel back to pool {}", channel);
                     pool.returnObject(channel);
+                } else {
+                    // and if its not active then invalidate it
+                    LOG.trace("Invalidating channel from pool {}", channel);
+                    pool.invalidateObject(channel);
                 }
             } catch (Exception e) {
-                LOG.warn("Error returning channel to pool {}. This exception will be ignored.", channel);
+                LOG.warn("Error returning channel to pool " + channel + ". This exception will be ignored.", e);
             } finally {
                 // ensure we call the delegated callback
                 callback.done(doneSync);

http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
index ed4d695..b9a2a17 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ClientChannelHandler.java
@@ -193,7 +193,8 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             if (close != null) {
                 disconnect = close;
             }
-            if (disconnect) {
+            // we should not close if we are reusing the channel
+            if (!producer.getConfiguration().isReuseChannel() && disconnect) {
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Closing channel when complete at address: {}", producer.getConfiguration().getAddress());
                 }

http://git-wip-us.apache.org/repos/asf/camel/blob/dc9144ec/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java
new file mode 100644
index 0000000..7a4cbdd
--- /dev/null
+++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyReuseChannelTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+/**
+ * @version 
+ */
+public class NettyReuseChannelTest extends BaseNettyTest {
+
+    private final List<Channel> channels = new ArrayList<>();
+
+    @Test
+    public void testReuse() throws Exception {
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Hello Hello World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Hello Hello World");
+
+        template.sendBody("direct:start", "World\n");
+
+        assertMockEndpointsSatisfied();
+
+        assertTrue(notify.matchesMockWaitTime());
+
+        assertEquals(2, channels.size());
+        assertSame("Should reuse channel", channels.get(0), channels.get(1));
+        assertFalse("And closed when routing done", channels.get(0).isOpen());
+        assertFalse("And closed when routing done", channels.get(1).isOpen());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+                            channels.add(channel);
+                            assertTrue("Should be active", channel.isActive());
+                        }
+                    })
+                    .to("mock:a")
+                    .to("netty4:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+                            channels.add(channel);
+                            assertTrue("Should be active", channel.isActive());
+                        }
+                    })
+                    .to("mock:b");
+
+                from("netty4:tcp://localhost:{{port}}?textline=true&sync=true")
+                    .transform(body().prepend("Hello "))
+                    .to("mock:result");
+            }
+        };
+    }
+}