You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/10/07 09:43:46 UTC
camel git commit: CAMEL-9195: Fixed memory leak in undertow producer.
Also allow to configure channel options from the endpoint uri.
Repository: camel
Updated Branches:
refs/heads/master d406c87f1 -> c7a4146d0
CAMEL-9195: Fixed memory leak in undertow producer. Also allow to configure channel options from the endpoint uri.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c7a4146d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c7a4146d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c7a4146d
Branch: refs/heads/master
Commit: c7a4146d0ace0cd50a306a2501ce859dfaf3e35a
Parents: d406c87
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Oct 7 09:17:22 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Oct 7 09:45:50 2015 +0200
----------------------------------------------------------------------
.../component/undertow/UndertowComponent.java | 7 ++
.../component/undertow/UndertowEndpoint.java | 117 ++++++++++++++++++-
.../component/undertow/UndertowProducer.java | 54 +++++++--
.../undertow/UndertowProducerLeakTest.java | 45 +++++++
.../src/test/resources/log4j.properties | 2 +-
5 files changed, 213 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
index 3584819..13ad686 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowComponent.java
@@ -35,6 +35,7 @@ import org.apache.camel.spi.RestApiConsumerFactory;
import org.apache.camel.spi.RestConfiguration;
import org.apache.camel.spi.RestConsumerFactory;
import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IntrospectionSupport;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
import org.slf4j.Logger;
@@ -58,10 +59,16 @@ public class UndertowComponent extends UriEndpointComponent implements RestConsu
URI uriHttpUriAddress = new URI(UnsafeUriCharactersEncoder.encodeHttpURI(remaining));
URI endpointUri = URISupport.createRemainingURI(uriHttpUriAddress, parameters);
+ // any additional channel options
+ Map<String, Object> options = IntrospectionSupport.extractProperties(parameters, "option.");
+
// create the endpoint first
UndertowEndpoint endpoint = createEndpointInstance(endpointUri, this);
endpoint.setUndertowHttpBinding(undertowHttpBinding);
setProperties(endpoint, parameters);
+ if (options != null) {
+ endpoint.setOptions(options);
+ }
// then re-create the http uri with the remaining parameters which the endpoint did not use
URI httpUri = URISupport.createRemainingURI(
http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
index 746b609..4bb0f01 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowEndpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.undertow;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Locale;
+import java.util.Map;
import javax.net.ssl.SSLContext;
import io.undertow.server.HttpServerExchange;
@@ -34,16 +36,23 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.jsse.SSLContextParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xnio.Option;
+import org.xnio.OptionMap;
+import org.xnio.Options;
/**
* Represents an Undertow endpoint.
*/
@UriEndpoint(scheme = "undertow", title = "Undertow", syntax = "undertow:httpURI",
- consumerClass = UndertowConsumer.class, label = "http")
+ consumerClass = UndertowConsumer.class, label = "http")
public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
+ private static final Logger LOG = LoggerFactory.getLogger(UndertowEndpoint.class);
private UndertowComponent component;
private SSLContext sslContext;
+ private OptionMap optionMap;
@UriPath
private URI httpURI;
@@ -61,6 +70,14 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr
private Boolean throwExceptionOnFailure;
@UriParam
private Boolean transferException;
+ @UriPath(label = "producer", defaultValue = "true")
+ private Boolean keepAlive = Boolean.TRUE;
+ @UriPath(label = "producer", defaultValue = "true")
+ private Boolean tcpNoDelay = Boolean.TRUE;
+ @UriPath(label = "producer", defaultValue = "true")
+ private Boolean reuseAddresses = Boolean.TRUE;
+ @UriParam(label = "producer")
+ private Map<String, Object> options;
public UndertowEndpoint(String uri, UndertowComponent component) throws URISyntaxException {
super(uri, component);
@@ -74,7 +91,7 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr
@Override
public Producer createProducer() throws Exception {
- return new UndertowProducer(this);
+ return new UndertowProducer(this, optionMap);
}
@Override
@@ -206,6 +223,51 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr
this.undertowHttpBinding = undertowHttpBinding;
}
+ public Boolean getKeepAlive() {
+ return keepAlive;
+ }
+
+ /**
+ * Setting to ensure socket is not closed due to inactivity
+ */
+ public void setKeepAlive(Boolean keepAlive) {
+ this.keepAlive = keepAlive;
+ }
+
+ public Boolean getTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ /**
+ * Setting to improve TCP protocol performance
+ */
+ public void setTcpNoDelay(Boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ public Boolean getReuseAddresses() {
+ return reuseAddresses;
+ }
+
+ /**
+ * Setting to facilitate socket multiplexing
+ */
+ public void setReuseAddresses(Boolean reuseAddresses) {
+ this.reuseAddresses = reuseAddresses;
+ }
+
+ public Map<String, Object> getOptions() {
+ return options;
+ }
+
+ /**
+ * Sets additional channel options. The options that can be used are defined in {@link org.xnio.Options}.
+ * To configure from endpoint uri, then prefix each option with <tt>option.</tt>, such as <tt>option.close-abort=true&option.send-buffer=8192</tt>
+ */
+ public void setOptions(Map<String, Object> options) {
+ this.options = options;
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
@@ -213,5 +275,56 @@ public class UndertowEndpoint extends DefaultEndpoint implements HeaderFilterStr
if (sslContextParameters != null) {
sslContext = sslContextParameters.createSSLContext();
}
+
+ // create options map
+ if (options != null && !options.isEmpty()) {
+
+ // favor to use the classloader that loaded the user application
+ ClassLoader cl = getComponent().getCamelContext().getApplicationContextClassLoader();
+ if (cl == null) {
+ cl = Options.class.getClassLoader();
+ }
+
+ OptionMap.Builder builder = OptionMap.builder();
+ for (Map.Entry<String, Object> entry : options.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ if (key != null && value != null) {
+ // upper case and dash as underscore
+ key = key.toUpperCase(Locale.ENGLISH).replace('-', '_');
+ // must be field name
+ key = Options.class.getName() + "." + key;
+ Option option = Option.fromString(key, cl);
+ value = option.parseValue(value.toString(), cl);
+ LOG.trace("Parsed option {}={}", option.getName(), value);
+ builder.set(option, value);
+ }
+ }
+ optionMap = builder.getMap();
+ } else {
+ // use an empty map
+ optionMap = OptionMap.EMPTY;
+ }
+
+ // and then configure these default options if they have not been explicit configured
+ if (keepAlive != null && !optionMap.contains(Options.KEEP_ALIVE)) {
+ // rebuild map
+ OptionMap.Builder builder = OptionMap.builder();
+ builder.addAll(optionMap).set(Options.KEEP_ALIVE, keepAlive);
+ optionMap = builder.getMap();
+ }
+ if (tcpNoDelay != null && !optionMap.contains(Options.TCP_NODELAY)) {
+ // rebuild map
+ OptionMap.Builder builder = OptionMap.builder();
+ builder.addAll(optionMap).set(Options.TCP_NODELAY, tcpNoDelay);
+ optionMap = builder.getMap();
+ }
+ if (reuseAddresses != null && !optionMap.contains(Options.REUSE_ADDRESSES)) {
+ // rebuild map
+ OptionMap.Builder builder = OptionMap.builder();
+ builder.addAll(optionMap).set(Options.REUSE_ADDRESSES, tcpNoDelay);
+ optionMap = builder.getMap();
+ }
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
index 84a8b26..0d1ba75 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
@@ -34,6 +34,7 @@ import org.apache.camel.Message;
import org.apache.camel.TypeConverter;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.BufferAllocator;
@@ -53,10 +54,14 @@ import org.xnio.XnioWorker;
public class UndertowProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(UndertowProducer.class);
private UndertowEndpoint endpoint;
+ private XnioWorker worker;
+ private ByteBufferSlicePool pool;
+ private OptionMap options;
- public UndertowProducer(UndertowEndpoint endpoint) {
+ public UndertowProducer(UndertowEndpoint endpoint, OptionMap options) {
super(endpoint);
this.endpoint = endpoint;
+ this.options = options;
}
@Override
@@ -66,12 +71,12 @@ public class UndertowProducer extends DefaultAsyncProducer {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
+ ClientConnection connection = null;
+
try {
final UndertowClient client = UndertowClient.getInstance();
- XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.EMPTY);
- IoFuture<ClientConnection> connect = client.connect(endpoint.getHttpURI(), worker,
- new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 8192), OptionMap.EMPTY);
+ IoFuture<ClientConnection> connect = client.connect(endpoint.getHttpURI(), worker, pool, options);
// creating the url to use takes 2-steps
String url = UndertowHelper.createURL(exchange, getEndpoint());
@@ -99,9 +104,11 @@ public class UndertowProducer extends DefaultAsyncProducer {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing http {} method: {}", method, url);
}
- connect.get().sendRequest(request, new UndertowProducerCallback(bodyAsByte, exchange, callback));
+ connection = connect.get();
+ connection.sendRequest(request, new UndertowProducerCallback(connection, bodyAsByte, exchange, callback));
} catch (Exception e) {
+ IOHelper.close(connection);
exchange.setException(e);
callback.done(true);
return true;
@@ -115,23 +122,45 @@ public class UndertowProducer extends DefaultAsyncProducer {
return endpoint.getUndertowHttpBinding().toHttpRequest(request, camelExchange.getIn());
}
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ pool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8192, 8192 * 8192);
+ worker = Xnio.getInstance().createWorker(options);
+
+ LOG.debug("Created worker: {} with options: {}", worker, options);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+
+ if (worker != null && !worker.isShutdown()) {
+ LOG.debug("Shutting down worker: {}", worker);
+ worker.shutdown();
+ }
+ }
+
/**
* Everything important happens in callback
*/
private class UndertowProducerCallback implements ClientCallback<ClientExchange> {
+ private final ClientConnection connection;
private final ByteBuffer body;
private final Exchange camelExchange;
private final AsyncCallback callback;
- public UndertowProducerCallback(ByteBuffer body, Exchange camelExchange, AsyncCallback callback) {
+ public UndertowProducerCallback(ClientConnection connection, ByteBuffer body, Exchange camelExchange, AsyncCallback callback) {
+ this.connection = connection;
this.body = body;
this.camelExchange = camelExchange;
this.callback = callback;
}
@Override
- public void completed(ClientExchange clientExchange) {
+ public void completed(final ClientExchange clientExchange) {
clientExchange.setResponseListener(new ClientCallback<ClientExchange>() {
@Override
public void completed(ClientExchange clientExchange) {
@@ -146,6 +175,7 @@ public class UndertowProducer extends DefaultAsyncProducer {
} catch (Exception e) {
camelExchange.setException(e);
} finally {
+ IOHelper.close(connection);
// make sure to call callback
callback.done(false);
}
@@ -155,8 +185,12 @@ public class UndertowProducer extends DefaultAsyncProducer {
public void failed(IOException e) {
LOG.trace("failed: {}", e);
camelExchange.setException(e);
- // make sure to call callback
- callback.done(false);
+ try {
+ IOHelper.close(connection);
+ } finally {
+ // make sure to call callback
+ callback.done(false);
+ }
}
});
@@ -167,6 +201,7 @@ public class UndertowProducer extends DefaultAsyncProducer {
}
} catch (IOException e) {
camelExchange.setException(e);
+ IOHelper.close(connection);
// make sure to call callback
callback.done(false);
}
@@ -176,6 +211,7 @@ public class UndertowProducer extends DefaultAsyncProducer {
public void failed(IOException e) {
LOG.trace("failed: {}", e);
camelExchange.setException(e);
+ IOHelper.close(connection);
// make sure to call callback
callback.done(false);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java
new file mode 100644
index 0000000..cb11463
--- /dev/null
+++ b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowProducerLeakTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.undertow;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+public class UndertowProducerLeakTest extends BaseUndertowTest {
+
+ @Test
+ public void testLeak() throws Exception {
+ getMockEndpoint("mock:result").expectedMinimumMessageCount(50);
+
+ assertMockEndpointsSatisfied(2, TimeUnit.MINUTES);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("undertow:http://localhost:{{port}}/test").to("log:undertow?showAll=true").to("mock:result");
+
+ from("timer:foo?period=100").transform(constant("hello world"))
+ .to("undertow:http://localhost:{{port}}/test");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/c7a4146d/components/camel-undertow/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-undertow/src/test/resources/log4j.properties b/components/camel-undertow/src/test/resources/log4j.properties
index b8d5c8b..8b9fe88 100644
--- a/components/camel-undertow/src/test/resources/log4j.properties
+++ b/components/camel-undertow/src/test/resources/log4j.properties
@@ -21,7 +21,7 @@
log4j.rootLogger=INFO, file
# uncomment the following to enable camel debugging
-#log4j.logger.org.apache.camel.component.undertow=TRACE
+#log4j.logger.org.apache.camel.component.undertow=DEBUG
#log4j.logger.org.apache.camel.util.jsse=DEBUG
# CONSOLE appender not used by default