You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/06/26 09:40:32 UTC
[1/3] git commit: CAMEL-6488: camel-netty-http allow to share port in
OSGi environment. Work in progress.
Updated Branches:
refs/heads/master 55c66af89 -> c90f92441
CAMEL-6488: camel-netty-http allow to share port in OSGi environment. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7c947b46
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7c947b46
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7c947b46
Branch: refs/heads/master
Commit: 7c947b46ca516a441d13a82813810feebbde0fd3
Parents: 55c66af
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jun 25 16:02:15 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jun 26 08:32:24 2013 +0200
----------------------------------------------------------------------
.../http/DefaultSharedNettyHttpServer.java | 78 +++++++++++++++++++
.../netty/http/HttpServerBootstrapFactory.java | 37 +++++----
.../http/HttpServerConsumerChannelFactory.java | 63 +++++++++++++++
.../netty/http/HttpServerPipelineFactory.java | 35 +++++++--
.../netty/http/NettyHttpComponent.java | 16 ++--
.../component/netty/http/NettyHttpConsumer.java | 10 +++
.../component/netty/http/NettyHttpEndpoint.java | 28 ++++++-
.../netty/http/SharedNettyHttpServer.java | 53 +++++++++++++
.../HttpServerMultiplexChannelHandler.java | 27 +++++--
.../netty/http/NettySharedHttpServerTest.java | 82 ++++++++++++++++++++
.../netty/DefaultServerPipelineFactory.java | 29 +++++--
.../component/netty/NettyConfiguration.java | 9 ---
.../NettyServerBootstrapConfiguration.java | 14 ++++
.../netty/NettyServerBootstrapFactory.java | 6 +-
.../SingleTCPNettyServerBootstrapFactory.java | 44 +++++++----
.../SingleUDPNettyServerBootstrapFactory.java | 27 ++++---
16 files changed, 478 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java
new file mode 100644
index 0000000..ff24819
--- /dev/null
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.component.netty.DefaultServerPipelineFactory;
+import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
+import org.apache.camel.component.netty.NettyServerBootstrapFactory;
+import org.apache.camel.component.netty.http.handlers.HttpServerMultiplexChannelHandler;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * A default {@link SharedNettyHttpServer} to make sharing Netty server in Camel applications easier.
+ */
+public class DefaultSharedNettyHttpServer extends ServiceSupport implements SharedNettyHttpServer {
+
+ private NettyServerBootstrapConfiguration configuration;
+ private HttpServerConsumerChannelFactory channelFactory;
+ private HttpServerBootstrapFactory bootstrapFactory;
+
+ public void setNettyServerBootstrapConfiguration(NettyServerBootstrapConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ public int getPort() {
+ return configuration != null ? configuration.getPort() : -1;
+ }
+
+ public HttpServerConsumerChannelFactory getConsumerChannelFactory() {
+ return channelFactory;
+ }
+
+ public NettyServerBootstrapFactory getServerBootstrapFactory() {
+ return bootstrapFactory;
+ }
+
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(configuration, "setNettyServerBootstrapConfiguration() must be called with a NettyServerBootstrapConfiguration instance", this);
+
+ // port must be set
+ if (configuration.getPort() <= 0) {
+ throw new IllegalArgumentException("Port must be configured on NettyServerBootstrapConfiguration " + configuration);
+ }
+
+ // force using tcp as the underlying transport
+ configuration.setProtocol("tcp");
+ // TODO: ChannelPipelineFactory should be shared to handle adding consumers
+ ChannelPipelineFactory pipelineFactory = new HttpServerPipelineFactory(configuration);
+
+ channelFactory = new HttpServerMultiplexChannelHandler();
+ channelFactory.init(configuration.getPort());
+
+ // create bootstrap factory and disable the compatibility check as it is shared among the consumers
+ bootstrapFactory = new HttpServerBootstrapFactory(channelFactory, false);
+ bootstrapFactory.init(null, configuration, pipelineFactory);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(bootstrapFactory, channelFactory);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java
index e1168d7..cf8ae4e 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerBootstrapFactory.java
@@ -28,16 +28,22 @@ import org.slf4j.LoggerFactory;
public class HttpServerBootstrapFactory extends SingleTCPNettyServerBootstrapFactory {
private static final Logger LOG = LoggerFactory.getLogger(HttpServerBootstrapFactory.class);
- private final NettyHttpComponent component;
+ private final HttpServerConsumerChannelFactory channelFactory;
private int port;
private NettyServerBootstrapConfiguration bootstrapConfiguration;
+ private boolean compatibleCheck;
- public HttpServerBootstrapFactory(NettyHttpComponent component) {
- this.component = component;
+ public HttpServerBootstrapFactory(HttpServerConsumerChannelFactory channelFactory) {
+ this(channelFactory, true);
+ }
+
+ public HttpServerBootstrapFactory(HttpServerConsumerChannelFactory channelFactory, boolean compatibleCheck) {
+ this.channelFactory = channelFactory;
+ this.compatibleCheck = compatibleCheck;
}
@Override
- public void init(CamelContext camelContext, NettyConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+ public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
super.init(camelContext, configuration, pipelineFactory);
this.port = configuration.getPort();
this.bootstrapConfiguration = configuration;
@@ -46,21 +52,24 @@ public class HttpServerBootstrapFactory extends SingleTCPNettyServerBootstrapFac
}
public void addConsumer(NettyConsumer consumer) {
- // when adding additional consumers on the same port (eg to reuse port for multiple routes etc) then the Netty server bootstrap
- // configuration must match, as its the 1st consumer that calls the init method, which configuration is used for the Netty server bootstrap
- // we do this to avoid mis configuration, so people configure SSL and plain configuration on the same port etc.
+ if (compatibleCheck) {
+ // when adding additional consumers on the same port (eg to reuse port for multiple routes etc) then the Netty server bootstrap
+ // configuration must match, as it is the 1st consumer that calls the init method, and its configuration is used for the Netty server bootstrap
+ // we do this to avoid misconfiguration, such as people configuring both SSL and plain configuration on the same port etc.
- // first it may be the same instance, so only check for compatibility of different instance
- if (bootstrapConfiguration != consumer.getConfiguration() && !bootstrapConfiguration.compatible(consumer.getConfiguration())) {
- throw new IllegalArgumentException("Bootstrap configuration must be identical when adding additional consumer: " + consumer.getEndpoint() + " on same port: " + port
- + ".\n Existing " + bootstrapConfiguration.toStringBootstrapConfiguration() + "\n New " + consumer.getConfiguration().toStringBootstrapConfiguration());
+ // first it may be the same instance, so only check for compatibility of different instance
+ if (bootstrapConfiguration != consumer.getConfiguration() && !bootstrapConfiguration.compatible(consumer.getConfiguration())) {
+ throw new IllegalArgumentException("Bootstrap configuration must be identical when adding additional consumer: " + consumer.getEndpoint() + " on same port: " + port
+ + ".\n Existing " + bootstrapConfiguration.toStringBootstrapConfiguration() + "\n New " + consumer.getConfiguration().toStringBootstrapConfiguration());
+ }
}
if (LOG.isDebugEnabled()) {
NettyHttpConsumer httpConsumer = (NettyHttpConsumer) consumer;
LOG.debug("BootstrapFactory on port {} is adding consumer with context-path {}", port, httpConsumer.getConfiguration().getPath());
}
- component.getMultiplexChannelHandler(port).addConsumer((NettyHttpConsumer) consumer);
+
+ channelFactory.addConsumer((NettyHttpConsumer) consumer);
}
@Override
@@ -69,7 +78,7 @@ public class HttpServerBootstrapFactory extends SingleTCPNettyServerBootstrapFac
NettyHttpConsumer httpConsumer = (NettyHttpConsumer) consumer;
LOG.debug("BootstrapFactory on port {} is removing consumer with context-path {}", port, httpConsumer.getConfiguration().getPath());
}
- component.getMultiplexChannelHandler(port).removeConsumer((NettyHttpConsumer) consumer);
+ channelFactory.removeConsumer((NettyHttpConsumer) consumer);
}
@Override
@@ -81,7 +90,7 @@ public class HttpServerBootstrapFactory extends SingleTCPNettyServerBootstrapFac
@Override
public void stop() throws Exception {
// only stop if no more active consumers
- int consumers = component.getMultiplexChannelHandler(port).consumers();
+ int consumers = channelFactory.consumers();
if (consumers == 0) {
LOG.debug("BootstrapFactory on port {} is stopping", port);
super.stop();
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerConsumerChannelFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerConsumerChannelFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerConsumerChannelFactory.java
new file mode 100644
index 0000000..386cf09
--- /dev/null
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerConsumerChannelFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.jboss.netty.channel.ChannelHandler;
+
+/**
+ * Factory for setting up Netty {@link ChannelHandler} bound to a given Netty port.
+ * <p/>
+ * This factory allows consumers to reuse an existing {@link org.jboss.netty.bootstrap.ServerBootstrap}, which
+ * allows multiple consumers to share the same port.
+ *
+ * This factory is needed to ensure we can handle the situations where consumers are added and removed in
+ * a dynamic environment such as OSGi, where Camel applications can be hot-deployed, and we want these
+ * Camel applications to be able to share the same Netty port in an easy way.
+ */
+public interface HttpServerConsumerChannelFactory {
+
+ /**
+ * Initializes this consumer channel factory with the given port.
+ */
+ void init(int port);
+
+ /**
+ * The port number this consumer channel factory is using.
+ */
+ int getPort();
+
+ /**
+ * Adds the given consumer.
+ */
+ void addConsumer(NettyHttpConsumer consumer);
+
+ /**
+ * Removes the given consumer
+ */
+ void removeConsumer(NettyHttpConsumer consumer);
+
+ /**
+ * Number of active consumers
+ */
+ int consumers();
+
+ /**
+ * Gets the {@link ChannelHandler}
+ */
+ ChannelHandler getChannelHandler();
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java
index 434fa36..9772460 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java
@@ -20,6 +20,7 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.camel.component.netty.NettyConsumer;
+import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
import org.apache.camel.component.netty.ServerPipelineFactory;
import org.apache.camel.component.netty.ssl.SSLEngineFactory;
import org.apache.camel.util.ObjectHelper;
@@ -47,10 +48,23 @@ public class HttpServerPipelineFactory extends ServerPipelineFactory {
// default constructor needed
}
+ public HttpServerPipelineFactory(NettyServerBootstrapConfiguration configuration) {
+ this.consumer = null;
+ try {
+ this.sslContext = createSSLContext(configuration);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+
+ if (sslContext != null) {
+ LOG.info("Created SslContext {}", sslContext);
+ }
+ }
+
public HttpServerPipelineFactory(NettyHttpConsumer nettyConsumer) {
this.consumer = nettyConsumer;
try {
- this.sslContext = createSSLContext(consumer);
+ this.sslContext = createSSLContext(consumer.getConfiguration());
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
@@ -70,6 +84,7 @@ public class HttpServerPipelineFactory extends ServerPipelineFactory {
// Create a default pipeline implementation.
ChannelPipeline pipeline = Channels.pipeline();
+ // TODO: on demand use configuration
SslHandler sslHandler = configureServerSSLOnDemand();
if (sslHandler != null) {
LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline: {}", sslHandler);
@@ -86,22 +101,28 @@ public class HttpServerPipelineFactory extends ServerPipelineFactory {
pipeline.addLast("deflater", new HttpContentCompressor());
}
+ // TODO: shared netty http server in ctr
// handler to route Camel messages
- int port = consumer.getConfiguration().getPort();
- ChannelHandler handler = consumer.getEndpoint().getComponent().getMultiplexChannelHandler(port);
+ ChannelHandler handler;
+ if (consumer.getSharedNettyHttpServer() != null) {
+ handler = consumer.getSharedNettyHttpServer().getConsumerChannelFactory().getChannelHandler();
+ } else {
+ int port = consumer.getConfiguration().getPort();
+ handler = consumer.getEndpoint().getComponent().getMultiplexChannelHandler(port).getChannelHandler();
+ }
pipeline.addLast("handler", handler);
return pipeline;
}
- private SSLContext createSSLContext(NettyConsumer consumer) throws Exception {
- if (!consumer.getConfiguration().isSsl()) {
+ private SSLContext createSSLContext(NettyServerBootstrapConfiguration configuration) throws Exception {
+ if (!configuration.isSsl()) {
return null;
}
// create ssl context once
- if (consumer.getConfiguration().getSslContextParameters() != null) {
- SSLContext context = consumer.getConfiguration().getSslContextParameters().createSSLContext();
+ if (configuration.getSslContextParameters() != null) {
+ SSLContext context = configuration.getSslContextParameters().createSSLContext();
return context;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
index 11a728d..1542c9e 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
@@ -37,10 +37,8 @@ import org.apache.camel.util.UnsafeUriCharactersEncoder;
*/
public class NettyHttpComponent extends NettyComponent implements HeaderFilterStrategyAware {
- // TODO: support on consumer
- // - urlrewrite
-
- private final Map<Integer, HttpServerMultiplexChannelHandler> multiplexChannelHandlers = new HashMap<Integer, HttpServerMultiplexChannelHandler>();
+ // factories which are created by this component, which therefore manages their lifecycles
+ private final Map<Integer, HttpServerConsumerChannelFactory> multiplexChannelHandlers = new HashMap<Integer, HttpServerConsumerChannelFactory>();
private final Map<String, HttpServerBootstrapFactory> bootstrapFactories = new HashMap<String, HttpServerBootstrapFactory>();
private NettyHttpBinding nettyHttpBinding;
private HeaderFilterStrategy headerFilterStrategy;
@@ -129,10 +127,11 @@ public class NettyHttpComponent extends NettyComponent implements HeaderFilterSt
this.headerFilterStrategy = headerFilterStrategy;
}
- public synchronized HttpServerMultiplexChannelHandler getMultiplexChannelHandler(int port) {
- HttpServerMultiplexChannelHandler answer = multiplexChannelHandlers.get(port);
+ public synchronized HttpServerConsumerChannelFactory getMultiplexChannelHandler(int port) {
+ HttpServerConsumerChannelFactory answer = multiplexChannelHandlers.get(port);
if (answer == null) {
- answer = new HttpServerMultiplexChannelHandler(port);
+ answer = new HttpServerMultiplexChannelHandler();
+ answer.init(port);
multiplexChannelHandlers.put(port, answer);
}
return answer;
@@ -142,7 +141,8 @@ public class NettyHttpComponent extends NettyComponent implements HeaderFilterSt
String key = consumer.getConfiguration().getAddress();
HttpServerBootstrapFactory answer = bootstrapFactories.get(key);
if (answer == null) {
- answer = new HttpServerBootstrapFactory(this);
+ HttpServerConsumerChannelFactory channelFactory = getMultiplexChannelHandler(consumer.getConfiguration().getPort());
+ answer = new HttpServerBootstrapFactory(channelFactory);
answer.init(getCamelContext(), consumer.getConfiguration(), new HttpServerPipelineFactory(consumer));
bootstrapFactories.put(key, answer);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
index 2c27204..e4f9b40 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
@@ -26,6 +26,8 @@ import org.apache.camel.util.ObjectHelper;
*/
public class NettyHttpConsumer extends NettyConsumer {
+ private SharedNettyHttpServer sharedNettyHttpServer;
+
public NettyHttpConsumer(NettyHttpEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) {
super(nettyEndpoint, processor, configuration);
}
@@ -40,6 +42,14 @@ public class NettyHttpConsumer extends NettyConsumer {
return (NettyHttpConfiguration) super.getConfiguration();
}
+ public SharedNettyHttpServer getSharedNettyHttpServer() {
+ return sharedNettyHttpServer;
+ }
+
+ public void setSharedNettyHttpServer(SharedNettyHttpServer sharedNettyHttpServer) {
+ this.sharedNettyHttpServer = sharedNettyHttpServer;
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
index 011ce65..aac08bd 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
@@ -31,17 +31,21 @@ import org.apache.camel.util.ObjectHelper;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* HTTP based {@link NettyEndpoint}
*/
public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStrategyAware {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyHttpEndpoint.class);
private String uriParameters;
private NettyHttpBinding nettyHttpBinding;
private HeaderFilterStrategy headerFilterStrategy;
private boolean traceEnabled;
private String httpMethodRestrict;
+ private SharedNettyHttpServer sharedNettyHttpServer;
public NettyHttpEndpoint(String endpointUri, NettyHttpComponent component, NettyConfiguration configuration) {
super(endpointUri, component, configuration);
@@ -56,10 +60,18 @@ public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStra
public Consumer createConsumer(Processor processor) throws Exception {
NettyHttpConsumer answer = new NettyHttpConsumer(this, processor, getConfiguration());
configureConsumer(answer);
- // reuse pipeline factory for the same address
- HttpServerBootstrapFactory factory = getComponent().getOrCreateHttpNettyServerBootstrapFactory(answer);
- // force using our server bootstrap factory
- answer.setNettyServerBootstrapFactory(factory);
+
+ if (sharedNettyHttpServer != null) {
+ answer.setSharedNettyHttpServer(sharedNettyHttpServer);
+ answer.setNettyServerBootstrapFactory(sharedNettyHttpServer.getServerBootstrapFactory());
+ LOG.debug("Created NettyHttpConsumer: {} using SharedNettyHttpServer: {}", answer, sharedNettyHttpServer);
+ } else {
+ // reuse pipeline factory for the same address
+ HttpServerBootstrapFactory factory = getComponent().getOrCreateHttpNettyServerBootstrapFactory(answer);
+ // force using our server bootstrap factory
+ answer.setNettyServerBootstrapFactory(factory);
+ LOG.debug("Created NettyHttpConsumer: {} using HttpServerBootstrapFactory: {}", answer, factory);
+ }
return answer;
}
@@ -148,6 +160,14 @@ public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStra
this.uriParameters = uriParameters;
}
+ public SharedNettyHttpServer getSharedNettyHttpServer() {
+ return sharedNettyHttpServer;
+ }
+
+ public void setSharedNettyHttpServer(SharedNettyHttpServer sharedNettyHttpServer) {
+ this.sharedNettyHttpServer = sharedNettyHttpServer;
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java
new file mode 100644
index 0000000..66244bb
--- /dev/null
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.Service;
+import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
+import org.apache.camel.component.netty.NettyServerBootstrapFactory;
+
+/**
+ * A single interface to easily configure and set up a shared Netty HTTP server
+ * to be re-used among other Camel applications.
+ * <p/>
+ * To use this, just define a {@link NettyServerBootstrapConfiguration} configuration, and
+ * set this using {@link #setNettyServerBootstrapConfiguration(org.apache.camel.component.netty.NettyServerBootstrapConfiguration)}.
+ * Then call {@link #start()} to initialize this shared server.
+ */
+public interface SharedNettyHttpServer extends Service {
+
+ /**
+ * Sets the bootstrap configuration to be used by this shared Netty HTTP server.
+ */
+ void setNettyServerBootstrapConfiguration(NettyServerBootstrapConfiguration configuration);
+
+ /**
+ * Gets the port number this Netty HTTP server uses.
+ */
+ int getPort();
+
+ /**
+ * Gets the {@link HttpServerConsumerChannelFactory} to use.
+ */
+ HttpServerConsumerChannelFactory getConsumerChannelFactory();
+
+ /**
+ * Gets the {@link NettyServerBootstrapFactory} to use.
+ */
+ NettyServerBootstrapFactory getServerBootstrapFactory();
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java
index 71d9218..1674235 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerMultiplexChannelHandler.java
@@ -23,9 +23,11 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.camel.Exchange;
import org.apache.camel.component.netty.http.ContextPathMatcher;
import org.apache.camel.component.netty.http.DefaultContextPathMatcher;
+import org.apache.camel.component.netty.http.HttpServerConsumerChannelFactory;
import org.apache.camel.component.netty.http.NettyHttpConsumer;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
@@ -44,15 +46,21 @@ import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
* target handler based on the http context path in the incoming request. This is used to allow to reuse
* the same Netty consumer, allowing to have multiple routes on the same netty {@link org.jboss.netty.bootstrap.ServerBootstrap}
*/
-public class HttpServerMultiplexChannelHandler extends SimpleChannelUpstreamHandler {
+public class HttpServerMultiplexChannelHandler extends SimpleChannelUpstreamHandler implements HttpServerConsumerChannelFactory {
// use NettyHttpConsumer as logger to make it easier to read the logs as this is part of the consumer
private static final transient Logger LOG = LoggerFactory.getLogger(NettyHttpConsumer.class);
private final ConcurrentMap<ContextPathMatcher, HttpServerChannelHandler> consumers = new ConcurrentHashMap<ContextPathMatcher, HttpServerChannelHandler>();
- private final String token;
- private final int len;
+ private int port;
+ private String token;
+ private int len;
- public HttpServerMultiplexChannelHandler(int port) {
+ public HttpServerMultiplexChannelHandler() {
+ // must have default no-arg constructor to allow IoC containers to manage it
+ }
+
+ public void init(int port) {
+ this.port = port;
this.token = ":" + port;
this.len = token.length();
}
@@ -69,13 +77,18 @@ public class HttpServerMultiplexChannelHandler extends SimpleChannelUpstreamHand
consumers.remove(matcher);
}
- /**
- * Number of active consumers.
- */
public int consumers() {
return consumers.size();
}
+ public int getPort() {
+ return port;
+ }
+
+ public ChannelHandler getChannelHandler() {
+ return this;
+ }
+
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
// store request, as this channel handler is created per pipeline
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java
new file mode 100644
index 0000000..67bbff4
--- /dev/null
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+public class NettySharedHttpServerTest extends BaseNettyTest {
+
+ private SharedNettyHttpServer sharedNettyHttpServer;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ sharedNettyHttpServer = new DefaultSharedNettyHttpServer();
+
+ NettyServerBootstrapConfiguration configuration = new NettyServerBootstrapConfiguration();
+ configuration.setPort(getPort());
+ configuration.setHost("localhost");
+ configuration.setBacklog(20);
+ configuration.setKeepAlive(true);
+ sharedNettyHttpServer.setNettyServerBootstrapConfiguration(configuration);
+
+ sharedNettyHttpServer.start();
+
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myNettyServer", sharedNettyHttpServer);
+ return jndi;
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ sharedNettyHttpServer.stop();
+ super.tearDown();
+ }
+
+ @Test
+ public void testTwoRoutes() throws Exception {
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:bar").expectedBodiesReceived("Hello Camel");
+
+ String out = template.requestBody("netty-http:http://localhost:{{port}}/foo", "Hello World", String.class);
+ assertEquals("Bye World", out);
+
+ out = template.requestBody("netty-http:http://localhost:{{port}}/bar", "Hello Camel", String.class);
+ assertEquals("Bye Camel", out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("netty-http:http://0.0.0.0:{{port}}/foo?sharedNettyHttpServer=#myNettyServer")
+ .to("mock:foo")
+ .transform().constant("Bye World");
+
+ from("netty-http:http://0.0.0.0:{{port}}/bar?sharedNettyHttpServer=#myNettyServer")
+ .to("mock:bar")
+ .transform().constant("Bye Camel");
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
index 247067d..6ae04d0 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultServerPipelineFactory.java
@@ -34,16 +34,33 @@ import org.slf4j.LoggerFactory;
public class DefaultServerPipelineFactory extends ServerPipelineFactory {
private static final Logger LOG = LoggerFactory.getLogger(DefaultServerPipelineFactory.class);
- private final NettyConsumer consumer;
+ private NettyConsumer consumer;
private SSLContext sslContext;
+ public DefaultServerPipelineFactory(NettyServerBootstrapConfiguration configuration) {
+ this.consumer = null;
+ try {
+ this.sslContext = createSSLContext(configuration);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+
+ if (sslContext != null) {
+ LOG.info("Created SslContext {}", sslContext);
+ }
+ }
+
public DefaultServerPipelineFactory(NettyConsumer consumer) {
this.consumer = consumer;
try {
- this.sslContext = createSSLContext(consumer);
+ this.sslContext = createSSLContext(consumer.getConfiguration());
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
+
+ if (sslContext != null) {
+ LOG.info("Created SslContext {}", sslContext);
+ }
}
@Override
@@ -97,14 +114,14 @@ public class DefaultServerPipelineFactory extends ServerPipelineFactory {
pipeline.addLast(name, handler);
}
- private SSLContext createSSLContext(NettyConsumer consumer) throws Exception {
- if (!consumer.getConfiguration().isSsl()) {
+ private SSLContext createSSLContext(NettyServerBootstrapConfiguration configuration) throws Exception {
+ if (!configuration.isSsl()) {
return null;
}
// create ssl context once
- if (consumer.getConfiguration().getSslContextParameters() != null) {
- SSLContext context = consumer.getConfiguration().getSslContextParameters().createSSLContext();
+ if (configuration.getSslContextParameters() != null) {
+ SSLContext context = configuration.getSslContextParameters().createSSLContext();
return context;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
index 0c1fa0e..53bd02f 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
@@ -58,7 +58,6 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
private LoggingLevel serverClosedChannelExceptionCaughtLogLevel = LoggingLevel.DEBUG;
private boolean allowDefaultCodec = true;
private ClientPipelineFactory clientPipelineFactory;
- private SSLContextParameters sslContextParameters;
private int maximumPoolSize = 16;
private boolean orderedThreadPoolExecutor = true;
private int producerPoolMaxActive = -1;
@@ -366,14 +365,6 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem
return clientPipelineFactory;
}
- public SSLContextParameters getSslContextParameters() {
- return sslContextParameters;
- }
-
- public void setSslContextParameters(SSLContextParameters sslContextParameters) {
- this.sslContextParameters = sslContextParameters;
- }
-
public int getMaximumPoolSize() {
return maximumPoolSize;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
index 6992874..bc71182 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.netty;
import java.io.File;
import java.util.Map;
+import org.apache.camel.util.jsse.SSLContextParameters;
import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
// SSL options is also part of the server bootstrap as the server listener on port X is either plain or SSL
protected boolean ssl;
protected SslHandler sslHandler;
+ protected SSLContextParameters sslContextParameters;
protected boolean needClientAuth;
protected File keyStoreFile;
protected File trustStoreFile;
@@ -182,6 +184,14 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
this.sslHandler = sslHandler;
}
+ public SSLContextParameters getSslContextParameters() {
+ return sslContextParameters;
+ }
+
+ public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+ this.sslContextParameters = sslContextParameters;
+ }
+
public boolean isNeedClientAuth() {
return needClientAuth;
}
@@ -349,6 +359,9 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
if (sslHandler != other.sslHandler) {
return false;
}
+ if (sslContextParameters != other.sslContextParameters) {
+ return false;
+ }
if (needClientAuth != other.needClientAuth) {
return false;
}
@@ -396,6 +409,7 @@ public class NettyServerBootstrapConfiguration implements Cloneable {
", options=" + options +
", ssl=" + ssl +
", sslHandler=" + sslHandler +
+ ", sslContextParameters='" + sslContextParameters + '\'' +
", needClientAuth=" + needClientAuth +
", keyStoreFile=" + keyStoreFile +
", trustStoreFile=" + trustStoreFile +
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
index 6dbb817..a27ccfb 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyServerBootstrapFactory.java
@@ -32,8 +32,12 @@ public interface NettyServerBootstrapFactory extends Service {
/**
* Initializes this {@link NettyServerBootstrapFactory}.
+ *
+ * @param camelContext Use <tt>null</tt> if this factory is to be shared among other Camel applications.
+ * @param configuration the bootstrap configuration
+ * @param pipelineFactory the pipeline factory
*/
- void init(CamelContext camelContext, NettyConfiguration configuration, ChannelPipelineFactory pipelineFactory);
+ void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory);
/**
* When a new {@link Channel} is opened.
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
index c6dba03..45442ce 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleTCPNettyServerBootstrapFactory.java
@@ -19,10 +19,10 @@ package org.apache.camel.component.netty;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.camel.CamelContext;
import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
@@ -42,7 +42,7 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
protected static final Logger LOG = LoggerFactory.getLogger(SingleTCPNettyServerBootstrapFactory.class);
private final ChannelGroup allChannels;
private CamelContext camelContext;
- private NettyConfiguration configuration;
+ private NettyServerBootstrapConfiguration configuration;
private ChannelPipelineFactory pipelineFactory;
private ChannelFactory channelFactory;
private ServerBootstrap serverBootstrap;
@@ -54,7 +54,8 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
this.allChannels = new DefaultChannelGroup(SingleTCPNettyServerBootstrapFactory.class.getName());
}
- public void init(CamelContext camelContext, NettyConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+ public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+ // note: CamelContext is optional and may be null
this.camelContext = camelContext;
this.configuration = configuration;
this.pipelineFactory = pipelineFactory;
@@ -78,7 +79,6 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
@Override
protected void doStart() throws Exception {
- ObjectHelper.notNull(camelContext, "CamelContext");
startServerBootstrap();
}
@@ -88,15 +88,25 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
}
protected void startServerBootstrap() {
- bossExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
- workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
-
- if (configuration.getWorkerCount() <= 0) {
- channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
+ if (camelContext != null) {
+ bossExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
+ workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
+
+ if (configuration.getWorkerCount() <= 0) {
+ channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
+ } else {
+ channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor,
+ configuration.getWorkerCount());
+ }
} else {
- channelFactory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor,
- configuration.getWorkerCount());
+ if (configuration.getWorkerCount() <= 0) {
+ channelFactory = new NioServerSocketChannelFactory();
+ } else {
+ channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool(), configuration.getWorkerCount());
+ }
}
+
serverBootstrap = new ServerBootstrap(channelFactory);
serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
@@ -138,11 +148,19 @@ public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport impleme
// and then shutdown the thread pools
if (bossExecutor != null) {
- camelContext.getExecutorServiceManager().shutdown(bossExecutor);
+ if (camelContext != null) {
+ camelContext.getExecutorServiceManager().shutdown(bossExecutor);
+ } else {
+ bossExecutor.shutdownNow();
+ }
bossExecutor = null;
}
if (workerExecutor != null) {
- camelContext.getExecutorServiceManager().shutdown(workerExecutor);
+ if (camelContext != null) {
+ camelContext.getExecutorServiceManager().shutdown(workerExecutor);
+ } else {
+ workerExecutor.shutdownNow();
+ }
workerExecutor = null;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/7c947b46/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
index ea6e5a0..138c98f 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/SingleUDPNettyServerBootstrapFactory.java
@@ -19,10 +19,10 @@ package org.apache.camel.component.netty;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.camel.CamelContext;
import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -43,19 +43,19 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class);
private final ChannelGroup allChannels;
private CamelContext camelContext;
- private NettyConfiguration configuration;
+ private NettyServerBootstrapConfiguration configuration;
private ChannelPipelineFactory pipelineFactory;
private DatagramChannelFactory datagramChannelFactory;
private ConnectionlessBootstrap connectionlessServerBootstrap;
private Channel channel;
- private ExecutorService bossExecutor;
private ExecutorService workerExecutor;
public SingleUDPNettyServerBootstrapFactory() {
this.allChannels = new DefaultChannelGroup(SingleUDPNettyServerBootstrapFactory.class.getName());
}
- public void init(CamelContext camelContext, NettyConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+ public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+ // note: CamelContext is optional and may be null
this.camelContext = camelContext;
this.configuration = configuration;
this.pipelineFactory = pipelineFactory;
@@ -79,7 +79,6 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
@Override
protected void doStart() throws Exception {
- ObjectHelper.notNull(camelContext, "CamelContext");
startServerBootstrap();
}
@@ -89,12 +88,18 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
}
protected void startServerBootstrap() {
- workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
+ if (camelContext != null) {
+ workerExecutor = camelContext.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
+ } else {
+ workerExecutor = Executors.newCachedThreadPool();
+ }
+
if (configuration.getWorkerCount() <= 0) {
datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
} else {
datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount());
}
+
connectionlessServerBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
connectionlessServerBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
connectionlessServerBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
@@ -143,12 +148,12 @@ public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport impleme
}
// and then shutdown the thread pools
- if (bossExecutor != null) {
- camelContext.getExecutorServiceManager().shutdown(bossExecutor);
- bossExecutor = null;
- }
if (workerExecutor != null) {
- camelContext.getExecutorServiceManager().shutdown(workerExecutor);
+ if (camelContext != null) {
+ camelContext.getExecutorServiceManager().shutdown(workerExecutor);
+ } else {
+ workerExecutor.shutdownNow();
+ }
workerExecutor = null;
}
}
[3/3] git commit: CAMEL-6488: camel-netty-http allow to share port in
OSGi environment. Work in progress.
Posted by da...@apache.org.
CAMEL-6488: camel-netty-http allow to share port in OSGi environment. Work in progress.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c90f9244
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c90f9244
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c90f9244
Branch: refs/heads/master
Commit: c90f924412bbcba14678d3eed8b7c32a57e95c05
Parents: 1f8d47a
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jun 26 09:39:24 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jun 26 09:39:24 2013 +0200
----------------------------------------------------------------------
.../http/DefaultNettySharedHttpServer.java | 83 ++++++++++
.../http/DefaultSharedNettyHttpServer.java | 78 ---------
.../netty/http/HttpServerPipelineFactory.java | 77 ++++-----
.../http/HttpServerSharedPipelineFactory.java | 166 +++++++++++++++++++
.../netty/http/NettyHttpComponent.java | 14 ++
.../component/netty/http/NettyHttpConsumer.java | 10 --
.../component/netty/http/NettyHttpEndpoint.java | 17 +-
.../netty/http/NettySharedHttpServer.java | 59 +++++++
.../netty/http/SharedNettyHttpServer.java | 53 ------
.../netty/http/NettySharedHttpServerTest.java | 18 +-
10 files changed, 370 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettySharedHttpServer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettySharedHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettySharedHttpServer.java
new file mode 100644
index 0000000..0917d51
--- /dev/null
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettySharedHttpServer.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
+import org.apache.camel.component.netty.NettyServerBootstrapFactory;
+import org.apache.camel.component.netty.http.handlers.HttpServerMultiplexChannelHandler;
+import org.apache.camel.spi.ClassResolver;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * A default {@link NettySharedHttpServer} to make sharing Netty server in Camel applications easier.
+ */
+public class DefaultNettySharedHttpServer extends ServiceSupport implements NettySharedHttpServer {
+
+ private NettyServerBootstrapConfiguration configuration;
+ private HttpServerConsumerChannelFactory channelFactory;
+ private HttpServerBootstrapFactory bootstrapFactory;
+ private ClassResolver classResolver;
+
+ public void setNettyServerBootstrapConfiguration(NettyServerBootstrapConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ public void setClassResolver(ClassResolver classResolver) {
+ this.classResolver = classResolver;
+ }
+
+ public int getPort() {
+ return configuration != null ? configuration.getPort() : -1;
+ }
+
+ public HttpServerConsumerChannelFactory getConsumerChannelFactory() {
+ return channelFactory;
+ }
+
+ public NettyServerBootstrapFactory getServerBootstrapFactory() {
+ return bootstrapFactory;
+ }
+
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(configuration, "setNettyServerBootstrapConfiguration() must be called with a NettyServerBootstrapConfiguration instance", this);
+
+ // port must be set
+ if (configuration.getPort() <= 0) {
+ throw new IllegalArgumentException("Port must be configured on NettyServerBootstrapConfiguration " + configuration);
+ }
+
+ // force using tcp as the underlying transport
+ configuration.setProtocol("tcp");
+
+ channelFactory = new HttpServerMultiplexChannelHandler();
+ channelFactory.init(configuration.getPort());
+
+ ChannelPipelineFactory pipelineFactory = new HttpServerSharedPipelineFactory(configuration, channelFactory, classResolver);
+
+ // create bootstrap factory and disable the compatibility check as it's shared among the consumers
+ bootstrapFactory = new HttpServerBootstrapFactory(channelFactory, false);
+ bootstrapFactory.init(null, configuration, pipelineFactory);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopServices(bootstrapFactory, channelFactory);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java
deleted file mode 100644
index ff24819..0000000
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultSharedNettyHttpServer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty.http;
-
-import org.apache.camel.component.netty.DefaultServerPipelineFactory;
-import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
-import org.apache.camel.component.netty.NettyServerBootstrapFactory;
-import org.apache.camel.component.netty.http.handlers.HttpServerMultiplexChannelHandler;
-import org.apache.camel.support.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-
-/**
- * A default {@link SharedNettyHttpServer} to make sharing Netty server in Camel applications easier.
- */
-public class DefaultSharedNettyHttpServer extends ServiceSupport implements SharedNettyHttpServer {
-
- private NettyServerBootstrapConfiguration configuration;
- private HttpServerConsumerChannelFactory channelFactory;
- private HttpServerBootstrapFactory bootstrapFactory;
-
- public void setNettyServerBootstrapConfiguration(NettyServerBootstrapConfiguration configuration) {
- this.configuration = configuration;
- }
-
- public int getPort() {
- return configuration != null ? configuration.getPort() : -1;
- }
-
- public HttpServerConsumerChannelFactory getConsumerChannelFactory() {
- return channelFactory;
- }
-
- public NettyServerBootstrapFactory getServerBootstrapFactory() {
- return bootstrapFactory;
- }
-
- protected void doStart() throws Exception {
- ObjectHelper.notNull(configuration, "setNettyServerBootstrapConfiguration() must be called with a NettyServerBootstrapConfiguration instance", this);
-
- // port must be set
- if (configuration.getPort() <= 0) {
- throw new IllegalArgumentException("Port must be configured on NettyServerBootstrapConfiguration " + configuration);
- }
-
- // force using tcp as the underlying transport
- configuration.setProtocol("tcp");
- // TODO: ChannelPipelineFactory should be a shared to handle adding consumers
- ChannelPipelineFactory pipelineFactory = new HttpServerPipelineFactory(configuration);
-
- channelFactory = new HttpServerMultiplexChannelHandler();
- channelFactory.init(configuration.getPort());
-
- // create bootstrap factory and disable compatible check as its shared among the consumers
- bootstrapFactory = new HttpServerBootstrapFactory(channelFactory, false);
- bootstrapFactory.init(null, configuration, pipelineFactory);
- }
-
- @Override
- protected void doStop() throws Exception {
- ServiceHelper.stopServices(bootstrapFactory, channelFactory);
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java
index 9772460..6eae81c 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerPipelineFactory.java
@@ -23,6 +23,7 @@ import org.apache.camel.component.netty.NettyConsumer;
import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
import org.apache.camel.component.netty.ServerPipelineFactory;
import org.apache.camel.component.netty.ssl.SSLEngineFactory;
+import org.apache.camel.spi.ClassResolver;
import org.apache.camel.util.ObjectHelper;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
@@ -41,28 +42,17 @@ import org.slf4j.LoggerFactory;
public class HttpServerPipelineFactory extends ServerPipelineFactory {
private static final Logger LOG = LoggerFactory.getLogger(HttpServerPipelineFactory.class);
- private NettyHttpConsumer consumer;
- private SSLContext sslContext;
+ protected NettyHttpConsumer consumer;
+ protected SSLContext sslContext;
+ protected NettyServerBootstrapConfiguration configuration;
public HttpServerPipelineFactory() {
// default constructor needed
}
- public HttpServerPipelineFactory(NettyServerBootstrapConfiguration configuration) {
- this.consumer = null;
- try {
- this.sslContext = createSSLContext(configuration);
- } catch (Exception e) {
- throw ObjectHelper.wrapRuntimeCamelException(e);
- }
-
- if (sslContext != null) {
- LOG.info("Created SslContext {}", sslContext);
- }
- }
-
public HttpServerPipelineFactory(NettyHttpConsumer nettyConsumer) {
this.consumer = nettyConsumer;
+ this.configuration = nettyConsumer.getConfiguration();
try {
this.sslContext = createSSLContext(consumer.getConfiguration());
} catch (Exception e) {
@@ -84,8 +74,7 @@ public class HttpServerPipelineFactory extends ServerPipelineFactory {
// Create a default pipeline implementation.
ChannelPipeline pipeline = Channels.pipeline();
- // TODO: on demand use configuration
- SslHandler sslHandler = configureServerSSLOnDemand();
+ SslHandler sslHandler = configureServerSSLOnDemand(configuration);
if (sslHandler != null) {
LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline: {}", sslHandler);
pipeline.addLast("ssl", sslHandler);
@@ -101,15 +90,8 @@ public class HttpServerPipelineFactory extends ServerPipelineFactory {
pipeline.addLast("deflater", new HttpContentCompressor());
}
- // TODO: shared netty http server in ctr
- // handler to route Camel messages
- ChannelHandler handler;
- if (consumer.getSharedNettyHttpServer() != null) {
- handler = consumer.getSharedNettyHttpServer().getConsumerChannelFactory().getChannelHandler();
- } else {
- int port = consumer.getConfiguration().getPort();
- handler = consumer.getEndpoint().getComponent().getMultiplexChannelHandler(port).getChannelHandler();
- }
+ int port = consumer.getConfiguration().getPort();
+ ChannelHandler handler = consumer.getEndpoint().getComponent().getMultiplexChannelHandler(port).getChannelHandler();
pipeline.addLast("handler", handler);
return pipeline;
@@ -129,47 +111,48 @@ public class HttpServerPipelineFactory extends ServerPipelineFactory {
return null;
}
- private SslHandler configureServerSSLOnDemand() throws Exception {
- if (!consumer.getConfiguration().isSsl()) {
+ private SslHandler configureServerSSLOnDemand(NettyServerBootstrapConfiguration configuration) throws Exception {
+ if (!configuration.isSsl()) {
return null;
}
- if (consumer.getConfiguration().getSslHandler() != null) {
- return consumer.getConfiguration().getSslHandler();
+ if (configuration.getSslHandler() != null) {
+ return configuration.getSslHandler();
} else if (sslContext != null) {
SSLEngine engine = sslContext.createSSLEngine();
engine.setUseClientMode(false);
- engine.setNeedClientAuth(consumer.getConfiguration().isNeedClientAuth());
+ engine.setNeedClientAuth(configuration.isNeedClientAuth());
return new SslHandler(engine);
} else {
- if (consumer.getConfiguration().getKeyStoreFile() == null && consumer.getConfiguration().getKeyStoreResource() == null) {
+ if (configuration.getKeyStoreFile() == null && configuration.getKeyStoreResource() == null) {
LOG.debug("keystorefile is null");
}
- if (consumer.getConfiguration().getTrustStoreFile() == null && consumer.getConfiguration().getTrustStoreResource() == null) {
+ if (configuration.getTrustStoreFile() == null && configuration.getTrustStoreResource() == null) {
LOG.debug("truststorefile is null");
}
- if (consumer.getConfiguration().getPassphrase().toCharArray() == null) {
+ if (configuration.getPassphrase().toCharArray() == null) {
LOG.debug("passphrase is null");
}
SSLEngineFactory sslEngineFactory;
- if (consumer.getConfiguration().getKeyStoreFile() != null || consumer.getConfiguration().getTrustStoreFile() != null) {
+ if (configuration.getKeyStoreFile() != null || configuration.getTrustStoreFile() != null) {
sslEngineFactory = new SSLEngineFactory(
- consumer.getConfiguration().getKeyStoreFormat(),
- consumer.getConfiguration().getSecurityProvider(),
- consumer.getConfiguration().getKeyStoreFile(),
- consumer.getConfiguration().getTrustStoreFile(),
- consumer.getConfiguration().getPassphrase().toCharArray());
+ configuration.getKeyStoreFormat(),
+ configuration.getSecurityProvider(),
+ configuration.getKeyStoreFile(),
+ configuration.getTrustStoreFile(),
+ configuration.getPassphrase().toCharArray());
} else {
- sslEngineFactory = new SSLEngineFactory(consumer.getContext().getClassResolver(),
- consumer.getConfiguration().getKeyStoreFormat(),
- consumer.getConfiguration().getSecurityProvider(),
- consumer.getConfiguration().getKeyStoreResource(),
- consumer.getConfiguration().getTrustStoreResource(),
- consumer.getConfiguration().getPassphrase().toCharArray());
+ ClassResolver resolver = consumer != null ? consumer.getContext().getClassResolver() : null;
+ sslEngineFactory = new SSLEngineFactory(resolver,
+ configuration.getKeyStoreFormat(),
+ configuration.getSecurityProvider(),
+ configuration.getKeyStoreResource(),
+ configuration.getTrustStoreResource(),
+ configuration.getPassphrase().toCharArray());
}
SSLEngine sslEngine = sslEngineFactory.createServerSSLEngine();
sslEngine.setUseClientMode(false);
- sslEngine.setNeedClientAuth(consumer.getConfiguration().isNeedClientAuth());
+ sslEngine.setNeedClientAuth(configuration.isNeedClientAuth());
return new SslHandler(sslEngine);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedPipelineFactory.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedPipelineFactory.java
new file mode 100644
index 0000000..d82143d
--- /dev/null
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/HttpServerSharedPipelineFactory.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.camel.component.netty.NettyConsumer;
+import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
+import org.apache.camel.component.netty.ServerPipelineFactory;
+import org.apache.camel.component.netty.ssl.SSLEngineFactory;
+import org.apache.camel.spi.ClassResolver;
+import org.apache.camel.util.ObjectHelper;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpContentCompressor;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A shared {@link org.apache.camel.component.netty.ServerPipelineFactory} for a shared Netty HTTP server.
+ *
+ * @see NettySharedHttpServer
+ */
+public class HttpServerSharedPipelineFactory extends HttpServerPipelineFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HttpServerSharedPipelineFactory.class);
+ private final NettyServerBootstrapConfiguration configuration;
+ private final HttpServerConsumerChannelFactory channelFactory;
+ private final ClassResolver classResolver;
+ private SSLContext sslContext;
+
+ public HttpServerSharedPipelineFactory(NettyServerBootstrapConfiguration configuration, HttpServerConsumerChannelFactory channelFactory,
+ ClassResolver classResolver) {
+ this.configuration = configuration;
+ this.channelFactory = channelFactory;
+ this.classResolver = classResolver;
+ try {
+ this.sslContext = createSSLContext(configuration);
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+
+ if (sslContext != null) {
+ LOG.info("Created SslContext {}", sslContext);
+ }
+ }
+
+ @Override
+ public ServerPipelineFactory createPipelineFactory(NettyConsumer nettyConsumer) {
+ throw new UnsupportedOperationException("Should not call this operation");
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ SslHandler sslHandler = configureServerSSLOnDemand(configuration);
+ if (sslHandler != null) {
+ LOG.debug("Server SSL handler configured and added as an interceptor against the ChannelPipeline: {}", sslHandler);
+ pipeline.addLast("ssl", sslHandler);
+ }
+
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ // Aggregate HTTP chunks into a single message when chunked support is enabled.
+ if (supportChunked()) {
+ pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
+ }
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ if (supportCompressed()) {
+ pipeline.addLast("deflater", new HttpContentCompressor());
+ }
+
+ pipeline.addLast("handler", channelFactory.getChannelHandler());
+
+ return pipeline;
+ }
+
+ private SSLContext createSSLContext(NettyServerBootstrapConfiguration configuration) throws Exception {
+ if (!configuration.isSsl()) {
+ return null;
+ }
+
+ // create ssl context once
+ if (configuration.getSslContextParameters() != null) {
+ return configuration.getSslContextParameters().createSSLContext();
+ }
+
+ return null;
+ }
+
+ private SslHandler configureServerSSLOnDemand(NettyServerBootstrapConfiguration configuration) throws Exception {
+ if (!configuration.isSsl()) {
+ return null;
+ }
+
+ if (configuration.getSslHandler() != null) {
+ return configuration.getSslHandler();
+ } else if (sslContext != null) {
+ SSLEngine engine = sslContext.createSSLEngine();
+ engine.setUseClientMode(false);
+ engine.setNeedClientAuth(configuration.isNeedClientAuth());
+ return new SslHandler(engine);
+ } else {
+ if (configuration.getKeyStoreFile() == null && configuration.getKeyStoreResource() == null) {
+ LOG.debug("keystorefile is null");
+ }
+ if (configuration.getTrustStoreFile() == null && configuration.getTrustStoreResource() == null) {
+ LOG.debug("truststorefile is null");
+ }
+ if (configuration.getPassphrase().toCharArray() == null) {
+ LOG.debug("passphrase is null");
+ }
+ SSLEngineFactory sslEngineFactory;
+ if (configuration.getKeyStoreFile() != null || configuration.getTrustStoreFile() != null) {
+ sslEngineFactory = new SSLEngineFactory(
+ configuration.getKeyStoreFormat(),
+ configuration.getSecurityProvider(),
+ configuration.getKeyStoreFile(),
+ configuration.getTrustStoreFile(),
+ configuration.getPassphrase().toCharArray());
+ } else {
+ sslEngineFactory = new SSLEngineFactory(classResolver,
+ configuration.getKeyStoreFormat(),
+ configuration.getSecurityProvider(),
+ configuration.getKeyStoreResource(),
+ configuration.getTrustStoreResource(),
+ configuration.getPassphrase().toCharArray());
+ }
+ SSLEngine sslEngine = sslEngineFactory.createServerSSLEngine();
+ sslEngine.setUseClientMode(false);
+ sslEngine.setNeedClientAuth(configuration.isNeedClientAuth());
+ return new SslHandler(sslEngine);
+ }
+ }
+
+ private boolean supportChunked() {
+ // TODO: options on bootstrap
+ return true;
+ }
+
+ private boolean supportCompressed() {
+ // TODO: options on bootstrap
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
index 1542c9e..6d5c4c3 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpComponent.java
@@ -31,12 +31,16 @@ import org.apache.camel.util.IntrospectionSupport;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Netty HTTP based component.
*/
public class NettyHttpComponent extends NettyComponent implements HeaderFilterStrategyAware {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyHttpComponent.class);
+
// factories which is created by this component and therefore manage their lifecycles
private final Map<Integer, HttpServerConsumerChannelFactory> multiplexChannelHandlers = new HashMap<Integer, HttpServerConsumerChannelFactory>();
private final Map<String, HttpServerBootstrapFactory> bootstrapFactories = new HashMap<String, HttpServerBootstrapFactory>();
@@ -73,6 +77,14 @@ public class NettyHttpComponent extends NettyComponent implements HeaderFilterSt
// validate config
config.validateConfiguration();
+ // are we using a shared http server?
+ NettySharedHttpServer shared = resolveAndRemoveReferenceParameter(parameters, "nettySharedHttpServer", NettySharedHttpServer.class);
+ if (shared != null) {
+ // use port number from the shared http server
+ LOG.debug("Using NettySharedHttpServer: {} with port: {}", shared, shared.getPort());
+ config.setPort(shared.getPort());
+ }
+
NettyHttpEndpoint answer = new NettyHttpEndpoint(remaining, this, config);
answer.setTimer(getTimer());
setProperties(answer.getConfiguration(), parameters);
@@ -90,6 +102,8 @@ public class NettyHttpComponent extends NettyComponent implements HeaderFilterSt
if (answer.getHeaderFilterStrategy() == null) {
answer.setHeaderFilterStrategy(getHeaderFilterStrategy());
}
+
+ answer.setNettySharedHttpServer(shared);
return answer;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
index e4f9b40..2c27204 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpConsumer.java
@@ -26,8 +26,6 @@ import org.apache.camel.util.ObjectHelper;
*/
public class NettyHttpConsumer extends NettyConsumer {
- private SharedNettyHttpServer sharedNettyHttpServer;
-
public NettyHttpConsumer(NettyHttpEndpoint nettyEndpoint, Processor processor, NettyConfiguration configuration) {
super(nettyEndpoint, processor, configuration);
}
@@ -42,14 +40,6 @@ public class NettyHttpConsumer extends NettyConsumer {
return (NettyHttpConfiguration) super.getConfiguration();
}
- public SharedNettyHttpServer getSharedNettyHttpServer() {
- return sharedNettyHttpServer;
- }
-
- public void setSharedNettyHttpServer(SharedNettyHttpServer sharedNettyHttpServer) {
- this.sharedNettyHttpServer = sharedNettyHttpServer;
- }
-
@Override
protected void doStart() throws Exception {
super.doStart();
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
index aac08bd..18565b4 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettyHttpEndpoint.java
@@ -45,7 +45,7 @@ public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStra
private HeaderFilterStrategy headerFilterStrategy;
private boolean traceEnabled;
private String httpMethodRestrict;
- private SharedNettyHttpServer sharedNettyHttpServer;
+ private NettySharedHttpServer nettySharedHttpServer;
public NettyHttpEndpoint(String endpointUri, NettyHttpComponent component, NettyConfiguration configuration) {
super(endpointUri, component, configuration);
@@ -61,10 +61,9 @@ public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStra
NettyHttpConsumer answer = new NettyHttpConsumer(this, processor, getConfiguration());
configureConsumer(answer);
- if (sharedNettyHttpServer != null) {
- answer.setSharedNettyHttpServer(sharedNettyHttpServer);
- answer.setNettyServerBootstrapFactory(sharedNettyHttpServer.getServerBootstrapFactory());
- LOG.debug("Created NettyHttpConsumer: {} using SharedNettyHttpServer: {}", answer, sharedNettyHttpServer);
+ if (nettySharedHttpServer != null) {
+ answer.setNettyServerBootstrapFactory(nettySharedHttpServer.getServerBootstrapFactory());
+ LOG.debug("Created NettyHttpConsumer: {} using NettySharedHttpServer: {}", answer, nettySharedHttpServer);
} else {
// reuse pipeline factory for the same address
HttpServerBootstrapFactory factory = getComponent().getOrCreateHttpNettyServerBootstrapFactory(answer);
@@ -160,12 +159,12 @@ public class NettyHttpEndpoint extends NettyEndpoint implements HeaderFilterStra
this.uriParameters = uriParameters;
}
- public SharedNettyHttpServer getSharedNettyHttpServer() {
- return sharedNettyHttpServer;
+ public NettySharedHttpServer getNettySharedHttpServer() {
+ return nettySharedHttpServer;
}
- public void setSharedNettyHttpServer(SharedNettyHttpServer sharedNettyHttpServer) {
- this.sharedNettyHttpServer = sharedNettyHttpServer;
+ public void setNettySharedHttpServer(NettySharedHttpServer nettySharedHttpServer) {
+ this.nettySharedHttpServer = nettySharedHttpServer;
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettySharedHttpServer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettySharedHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettySharedHttpServer.java
new file mode 100644
index 0000000..8e41d40
--- /dev/null
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/NettySharedHttpServer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty.http;
+
+import org.apache.camel.Service;
+import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
+import org.apache.camel.component.netty.NettyServerBootstrapFactory;
+import org.apache.camel.spi.ClassResolver;
+
+/**
+ * A single interface to easily configure and set up a shared Netty HTTP server
+ * to be re-used among other Camel applications.
+ * <p/>
+ * To use this, just define a {@link NettyServerBootstrapConfiguration} configuration, and
+ * set this using {@link #setNettyServerBootstrapConfiguration(org.apache.camel.component.netty.NettyServerBootstrapConfiguration)}.
+ * Then call {@link #start()} to initialize this shared server.
+ */
+public interface NettySharedHttpServer extends Service {
+
+ /**
+ * Sets the bootstrap configuration to be used by this shared Netty HTTP server.
+ */
+ void setNettyServerBootstrapConfiguration(NettyServerBootstrapConfiguration configuration);
+
+ /**
+ * To use a custom {@link ClassResolver} for loading resources on the classpath.
+ */
+ void setClassResolver(ClassResolver classResolver);
+
+ /**
+ * Gets the port number this Netty HTTP server uses.
+ */
+ int getPort();
+
+ /**
+ * Gets the {@link HttpServerConsumerChannelFactory} to use.
+ */
+ HttpServerConsumerChannelFactory getConsumerChannelFactory();
+
+ /**
+ * Gets the {@link NettyServerBootstrapFactory} to use.
+ */
+ NettyServerBootstrapFactory getServerBootstrapFactory();
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java
deleted file mode 100644
index 66244bb..0000000
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/SharedNettyHttpServer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty.http;
-
-import org.apache.camel.Service;
-import org.apache.camel.component.netty.NettyServerBootstrapConfiguration;
-import org.apache.camel.component.netty.NettyServerBootstrapFactory;
-
-/**
- * A single interface to easily configure and setup a shared Netty HTTP server
- * to be re-used among other Camel applications.
- * <p/>
- * To use this, just define a {@link NettyServerBootstrapConfiguration} configuration, and
- * set this using {@link #setNettyServerBootstrapConfiguration(org.apache.camel.component.netty.NettyServerBootstrapConfiguration)}.
- * Then call the {@link #start()} to initialize this shared server.
- */
-public interface SharedNettyHttpServer extends Service {
-
- /**
- * Sets the bootstrap configuration to use by this shared Netty HTTP server.
- */
- void setNettyServerBootstrapConfiguration(NettyServerBootstrapConfiguration configuration);
-
- /**
- * Gets the port number this Netty HTTP server uses.
- */
- int getPort();
-
- /**
- * Gets the {@link HttpServerConsumerChannelFactory} to use.
- */
- HttpServerConsumerChannelFactory getConsumerChannelFactory();
-
- /**
- * Gets the {@link NettyServerBootstrapFactory} to use.
- */
- NettyServerBootstrapFactory getServerBootstrapFactory();
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/c90f9244/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java
index 67bbff4..ca887ca 100644
--- a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java
+++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/NettySharedHttpServerTest.java
@@ -23,29 +23,29 @@ import org.junit.Test;
public class NettySharedHttpServerTest extends BaseNettyTest {
- private SharedNettyHttpServer sharedNettyHttpServer;
+ private NettySharedHttpServer nettySharedHttpServer;
@Override
protected JndiRegistry createRegistry() throws Exception {
- sharedNettyHttpServer = new DefaultSharedNettyHttpServer();
+ nettySharedHttpServer = new DefaultNettySharedHttpServer();
NettyServerBootstrapConfiguration configuration = new NettyServerBootstrapConfiguration();
configuration.setPort(getPort());
configuration.setHost("localhost");
configuration.setBacklog(20);
configuration.setKeepAlive(true);
- sharedNettyHttpServer.setNettyServerBootstrapConfiguration(configuration);
+ nettySharedHttpServer.setNettyServerBootstrapConfiguration(configuration);
- sharedNettyHttpServer.start();
+ nettySharedHttpServer.start();
JndiRegistry jndi = super.createRegistry();
- jndi.bind("myNettyServer", sharedNettyHttpServer);
+ jndi.bind("myNettyServer", nettySharedHttpServer);
return jndi;
}
@Override
public void tearDown() throws Exception {
- sharedNettyHttpServer.stop();
+ nettySharedHttpServer.stop();
super.tearDown();
}
@@ -68,11 +68,13 @@ public class NettySharedHttpServerTest extends BaseNettyTest {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("netty-http:http://0.0.0.0:{{port}}/foo?sharedNettyHttpServer=#myNettyServer")
+ // we are using a shared netty http server, so the port number does not need to be defined in the uri
+ from("netty-http:http://localhost/foo?nettySharedHttpServer=#myNettyServer")
.to("mock:foo")
.transform().constant("Bye World");
- from("netty-http:http://0.0.0.0:{{port}}/bar?sharedNettyHttpServer=#myNettyServer")
+ // we are using a shared netty http server, so the port number does not need to be defined in the uri
+ from("netty-http:http://localhost/bar?nettySharedHttpServer=#myNettyServer")
.to("mock:bar")
.transform().constant("Bye Camel");
}
[2/3] git commit: Upgraded to jruby 1.7.4
Posted by da...@apache.org.
Upgraded to jruby 1.7.4
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1f8d47a4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1f8d47a4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1f8d47a4
Branch: refs/heads/master
Commit: 1f8d47a4eb78c3ac7dfd838d898ffe6e72e199d2
Parents: 7c947b4
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jun 26 08:58:12 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jun 26 08:58:12 2013 +0200
----------------------------------------------------------------------
parent/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1f8d47a4/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 000662d..9fc13aa 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -220,7 +220,7 @@
<jodatime2-bundle-version>2.1</jodatime2-bundle-version>
<josql-bundle-version>1.5_5</josql-bundle-version>
<josql-version>1.5</josql-version>
- <jruby-version>1.7.3</jruby-version>
+ <jruby-version>1.7.4</jruby-version>
<jsendnsca-version>1.3.1</jsendnsca-version>
<jsmpp-version>2.1.0_4</jsmpp-version>
<jsch-bundle-version>0.1.49_1</jsch-bundle-version>