You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/07 09:43:46 UTC

camel git commit: CAMEL-9195: Fixed memory leak in undertow producer. Also allow to configure channel options from the endpoint uri.

Repository: camel
Updated Branches:
  refs/heads/master d406c87f1 -> c7a4146d0


CAMEL-9195: Fixed memory leak in undertow producer. Also allow to configure channel options from the endpoint uri.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c7a4146d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c7a4146d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c7a4146d

Branch: refs/heads/master
Commit: c7a4146d0ace0cd50a306a2501ce859dfaf3e35a
Parents: d406c87
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Oct 7 09:17:22 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Oct 7 09:45:50 2015 +0200

----------------------------------------------------------------------
 .../component/undertow/UndertowComponent.java   |   7 ++
 .../component/undertow/UndertowEndpoint.java    | 117 ++++++++++++++++++-
 .../component/undertow/UndertowProducer.java    |  54 +++++++--
 .../undertow/UndertowProducerLeakTest.java      |  45 +++++++
 .../src/test/resources/log4j.properties         |   2 +-
 5 files changed, 213 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
index 3584819..13ad686 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
@@ -35,6 +35,7 @@ import org.apache.camel.spi.RestApiConsumerFactory;
 import org.apache.camel.spi.RestConfiguration;
 import org.apache.camel.spi.RestConsumerFactory;
 import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.util.URISupport;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
 import org.slf4j.Logger;
@@ -58,10 +59,16 @@ public class UndertowComponent extends UriEndpointComponent implements RestConsu
         URI uriHttpUriAddress = new URI(UnsafeUriCharactersEncoder.encodeHttpURI(remaining));
         URI endpointUri = URISupport.createRemainingURI(uriHttpUriAddress, parameters);
 
+        // any additional channel options
+        Map<String, Object> options = IntrospectionSupport.extractProperties(parameters, "option.");
+
         // create the endpoint first
         UndertowEndpoint endpoint = createEndpointInstance(endpointUri, this);
         endpoint.setUndertowHttpBinding(undertowHttpBinding);
         setProperties(endpoint, parameters);
+        if (options != null) {
+            endpoint.setOptions(options);
+        }
 
         // then re-create the http uri with the remaining parameters which the endpoint did not use
         URI httpUri = URISupport.createRemainingURI(

http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
index 746b609..4bb0f01 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.undertow;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Locale;
+import java.util.Map;
 import javax.net.ssl.SSLContext;
 
 import io.undertow.server.HttpServerExchange;
@@ -34,16 +36,23 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.jsse.SSLContextParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xnio.Option;
+import org.xnio.OptionMap;
+import org.xnio.Options;
 
 /**
  * Represents an Undertow endpoint.
  */
 @UriEndpoint(scheme = "undertow", title = "Undertow", syntax = "undertow:httpURI",
-    consumerClass = UndertowConsumer.class, label = "http")
+        consumerClass = UndertowConsumer.class, label = "http")
 public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
 
+    private static final Logger LOG = LoggerFactory.getLogger(UndertowEndpoint.class);
     private UndertowComponent component;
     private SSLContext sslContext;
+    private OptionMap optionMap;
 
     @UriPath
     private URI httpURI;
@@ -61,6 +70,14 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr
     private Boolean throwExceptionOnFailure;
     @UriParam
     private Boolean transferException;
+    @UriParam(label = "producer", defaultValue = "true")
+    private Boolean keepAlive = Boolean.TRUE;
+    @UriParam(label = "producer", defaultValue = "true")
+    private Boolean tcpNoDelay = Boolean.TRUE;
+    @UriParam(label = "producer", defaultValue = "true")
+    private Boolean reuseAddresses = Boolean.TRUE;
+    @UriParam(label = "producer")
+    private Map<String, Object> options;
 
     public UndertowEndpoint(String uri, UndertowComponent component) throws URISyntaxException {
         super(uri, component);
@@ -74,7 +91,7 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr
 
     @Override
     public Producer createProducer() throws Exception {
-        return new UndertowProducer(this);
+        return new UndertowProducer(this, optionMap);
     }
 
     @Override
@@ -206,6 +223,51 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr
         this.undertowHttpBinding = undertowHttpBinding;
     }
 
+    public Boolean getKeepAlive() {
+        return keepAlive;
+    }
+
+    /**
+     * Setting to ensure socket is not closed due to inactivity
+     */
+    public void setKeepAlive(Boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public Boolean getTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    /**
+     * Setting to improve TCP protocol performance
+     */
+    public void setTcpNoDelay(Boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public Boolean getReuseAddresses() {
+        return reuseAddresses;
+    }
+
+    /**
+     * Setting to facilitate socket multiplexing
+     */
+    public void setReuseAddresses(Boolean reuseAddresses) {
+        this.reuseAddresses = reuseAddresses;
+    }
+
+    public Map<String, Object> getOptions() {
+        return options;
+    }
+
+    /**
+     * Sets additional channel options. The options that can be used are defined in {@link org.xnio.Options}.
+     * To configure from the endpoint uri, prefix each option with <tt>option.</tt>, such as <tt>option.close-abort=true&amp;option.send-buffer=8192</tt>
+     */
+    public void setOptions(Map<String, Object> options) {
+        this.options = options;
+    }
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -213,5 +275,56 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr
         if (sslContextParameters != null) {
             sslContext = sslContextParameters.createSSLContext();
         }
+
+        // create options map
+        if (options != null && !options.isEmpty()) {
+
+            // favor to use the classloader that loaded the user application
+            ClassLoader cl = getComponent().getCamelContext().getApplicationContextClassLoader();
+            if (cl == null) {
+                cl = Options.class.getClassLoader();
+            }
+
+            OptionMap.Builder builder = OptionMap.builder();
+            for (Map.Entry<String, Object> entry : options.entrySet()) {
+                String key = entry.getKey();
+                Object value = entry.getValue();
+                if (key != null && value != null) {
+                    // upper case and dash as underscore
+                    key = key.toUpperCase(Locale.ENGLISH).replace('-', '_');
+                    // must be field name
+                    key = Options.class.getName() + "." + key;
+                    Option option = Option.fromString(key, cl);
+                    value = option.parseValue(value.toString(), cl);
+                    LOG.trace("Parsed option {}={}", option.getName(), value);
+                    builder.set(option, value);
+                }
+            }
+            optionMap = builder.getMap();
+        } else {
+            // use an empty map
+            optionMap = OptionMap.EMPTY;
+        }
+
+        // and then configure these default options if they have not been explicitly configured
+        if (keepAlive != null && !optionMap.contains(Options.KEEP_ALIVE)) {
+            // rebuild map with the keepAlive setting added
+            OptionMap.Builder builder = OptionMap.builder();
+            builder.addAll(optionMap).set(Options.KEEP_ALIVE, keepAlive);
+            optionMap = builder.getMap();
+        }
+        if (tcpNoDelay != null && !optionMap.contains(Options.TCP_NODELAY)) {
+            // rebuild map with the tcpNoDelay setting added
+            OptionMap.Builder builder = OptionMap.builder();
+            builder.addAll(optionMap).set(Options.TCP_NODELAY, tcpNoDelay);
+            optionMap = builder.getMap();
+        }
+        if (reuseAddresses != null && !optionMap.contains(Options.REUSE_ADDRESSES)) {
+            // rebuild map with the reuseAddresses setting added
+            OptionMap.Builder builder = OptionMap.builder();
+            builder.addAll(optionMap).set(Options.REUSE_ADDRESSES, reuseAddresses);
+            optionMap = builder.getMap();
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
index 84a8b26..0d1ba75 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
@@ -34,6 +34,7 @@ import org.apache.camel.Message;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xnio.BufferAllocator;
@@ -53,10 +54,14 @@ import org.xnio.XnioWorker;
 public class UndertowProducer extends DefaultAsyncProducer {
     private static final Logger LOG = LoggerFactory.getLogger(UndertowProducer.class);
     private UndertowEndpoint endpoint;
+    private XnioWorker worker;
+    private ByteBufferSlicePool pool;
+    private OptionMap options;
 
-    public UndertowProducer(UndertowEndpoint endpoint) {
+    public UndertowProducer(UndertowEndpoint endpoint, OptionMap options) {
         super(endpoint);
         this.endpoint = endpoint;
+        this.options = options;
     }
 
     @Override
@@ -66,12 +71,12 @@ public class UndertowProducer extends DefaultAsyncProducer {
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        ClientConnection connection = null;
+
         try {
             final UndertowClient client = UndertowClient.getInstance();
-            XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.EMPTY);
 
-            IoFuture<ClientConnection> connect = client.connect(endpoint.getHttpURI(), worker,
-                    new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 8192), OptionMap.EMPTY);
+            IoFuture<ClientConnection> connect = client.connect(endpoint.getHttpURI(), worker, pool, options);
 
             // creating the url to use takes 2-steps
             String url = UndertowHelper.createURL(exchange, getEndpoint());
@@ -99,9 +104,11 @@ public class UndertowProducer extends DefaultAsyncProducer {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Executing http {} method: {}", method, url);
             }
-            connect.get().sendRequest(request, new UndertowProducerCallback(bodyAsByte, exchange, callback));
+            connection = connect.get();
+            connection.sendRequest(request, new UndertowProducerCallback(connection, bodyAsByte, exchange, callback));
 
         } catch (Exception e) {
+            IOHelper.close(connection);
             exchange.setException(e);
             callback.done(true);
             return true;
@@ -115,23 +122,45 @@ public class UndertowProducer extends DefaultAsyncProducer {
         return endpoint.getUndertowHttpBinding().toHttpRequest(request, camelExchange.getIn());
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        pool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 8192);
+        worker = Xnio.getInstance().createWorker(options);
+
+        LOG.debug("Created worker: {} with options: {}", worker, options);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (worker != null && !worker.isShutdown()) {
+            LOG.debug("Shutting down worker: {}", worker);
+            worker.shutdown();
+        }
+    }
+
     /**
      * Everything important happens in callback
      */
     private class UndertowProducerCallback implements ClientCallback<ClientExchange> {
 
+        private final ClientConnection connection;
         private final ByteBuffer body;
         private final Exchange camelExchange;
         private final AsyncCallback callback;
 
-        public UndertowProducerCallback(ByteBuffer body, Exchange camelExchange, AsyncCallback callback) {
+        public UndertowProducerCallback(ClientConnection connection, ByteBuffer body, Exchange camelExchange, AsyncCallback callback) {
+            this.connection = connection;
             this.body = body;
             this.camelExchange = camelExchange;
             this.callback = callback;
         }
 
         @Override
-        public void completed(ClientExchange clientExchange) {
+        public void completed(final ClientExchange clientExchange) {
             clientExchange.setResponseListener(new ClientCallback<ClientExchange>() {
                 @Override
                 public void completed(ClientExchange clientExchange) {
@@ -146,6 +175,7 @@ public class UndertowProducer extends DefaultAsyncProducer {
                     } catch (Exception e) {
                         camelExchange.setException(e);
                     } finally {
+                        IOHelper.close(connection);
                         // make sure to call callback
                         callback.done(false);
                     }
@@ -155,8 +185,12 @@ public class UndertowProducer extends DefaultAsyncProducer {
                 public void failed(IOException e) {
                     LOG.trace("failed: {}", e);
                     camelExchange.setException(e);
-                    // make sure to call callback
-                    callback.done(false);
+                    try {
+                        IOHelper.close(connection);
+                    } finally {
+                        // make sure to call callback
+                        callback.done(false);
+                    }
                 }
             });
 
@@ -167,6 +201,7 @@ public class UndertowProducer extends DefaultAsyncProducer {
                 }
             } catch (IOException e) {
                 camelExchange.setException(e);
+                IOHelper.close(connection);
                 // make sure to call callback
                 callback.done(false);
             }
@@ -176,6 +211,7 @@ public class UndertowProducer extends DefaultAsyncProducer {
         public void failed(IOException e) {
             LOG.trace("failed: {}", e);
             camelExchange.setException(e);
+            IOHelper.close(connection);
             // make sure to call callback
             callback.done(false);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java
new file mode 100644
index 0000000..cb11463
--- /dev/null
+++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.undertow;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class UndertowProducerLeakTest extends BaseUndertowTest {
+
+    @Test
+    public void testLeak() throws Exception {
+        getMockEndpoint("mock:result").expectedMinimumMessageCount(50);
+
+        assertMockEndpointsSatisfied(2, TimeUnit.MINUTES);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("undertow:http://localhost:{{port}}/test").to("log:undertow?showAll=true").to("mock:result");
+
+                from("timer:foo?period=100").transform(constant("hello world"))
+                        .to("undertow:http://localhost:{{port}}/test");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/test/resources/log4j.properties b/components/camel-undertow/src/test/resources/log4j.properties
index b8d5c8b..8b9fe88 100644
--- a/components/camel-undertow/src/test/resources/log4j.properties
+++ b/components/camel-undertow/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@
 log4j.rootLogger=INFO, file
 
 # uncomment the following to enable camel debugging
-#log4j.logger.org.apache.camel.component.undertow=TRACE
+#log4j.logger.org.apache.camel.component.undertow=DEBUG
 #log4j.logger.org.apache.camel.util.jsse=DEBUG
 
 # CONSOLE appender not used by default