You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/07 15:45:54 UTC
[5/6] CAMEL-6555: camel-netty4 for Netty 4.x based component. Work in
progress.
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
new file mode 100644
index 0000000..37e28f9
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.SocketAddress;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.NoTypeConversionAvailableException;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers used internally by the camel-netty component: preparing textline
+ * bodies, writing to a channel without blocking, and closing a channel.
+ */
+public final class NettyHelper {
+
+    public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
+    private static final Logger LOG = LoggerFactory.getLogger(NettyHelper.class);
+
+    private NettyHelper() {
+        // static helpers only, never instantiated
+    }
+
+    /**
+     * Converts the body to the String that is sent when the textline codec is used.
+     *
+     * @param body                the current body
+     * @param exchange            the exchange, whose type converter performs the conversion
+     * @param delimiter           the textline delimiter
+     * @param autoAppendDelimiter whether a missing trailing delimiter should be appended
+     * @return the string body to send
+     * @throws NoTypeConversionAvailableException if the body cannot be converted to a String
+     */
+    public static String getTextlineBody(Object body, Exchange exchange, TextLineDelimiter delimiter, boolean autoAppendDelimiter) throws NoTypeConversionAvailableException {
+        final String text = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+        if (!autoAppendDelimiter) {
+            return text;
+        }
+
+        // LINE means a newline terminator; any other delimiter value is treated as the NUL character
+        final boolean lineDelimited = TextLineDelimiter.LINE.equals(delimiter);
+        final String terminator = lineDelimited ? "\n" : "\u0000";
+        if (text.endsWith(terminator)) {
+            return text;
+        }
+
+        LOG.trace(lineDelimited
+                ? "Auto appending missing newline delimiter to body"
+                : "Auto appending missing null delimiter to body");
+        return text + terminator;
+    }
+
+    /**
+     * Writes the given body to the Netty channel. Does <b>not</b> wait until the body has been written.
+     *
+     * @param log           logger to use
+     * @param channel       the Netty channel
+     * @param remoteAddress the remote address when using UDP, or <tt>null</tt> to write to the connected peer
+     * @param body          the body to write (send)
+     * @param exchange      the exchange (not used by this method)
+     * @param listener      optional listener invoked when the write operation is complete
+     */
+    public static void writeBodyAsync(Logger log, Channel channel, SocketAddress remoteAddress, Object body,
+                                      Exchange exchange, ChannelFutureListener listener) {
+        final ChannelFuture writeFuture;
+        if (remoteAddress == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Channel: {} writing body: {}", new Object[]{channel, body});
+            }
+            writeFuture = channel.write(body);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Channel: {} remote address: {} writing body: {}", new Object[]{channel, remoteAddress, body});
+            }
+            writeFuture = channel.write(body, remoteAddress);
+        }
+
+        if (listener == null) {
+            return;
+        }
+        writeFuture.addListener(listener);
+    }
+
+    /**
+     * Closes the given channel asynchronously; a <tt>null</tt> channel is ignored.
+     *
+     * @param channel the channel to close
+     */
+    public static void close(Channel channel) {
+        if (channel == null) {
+            return;
+        }
+        LOG.trace("Closing channel: {}", channel);
+        channel.close();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
new file mode 100644
index 0000000..dca12b2
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+
+/**
+ * Reads and writes the payload that camel-netty transfers over the wire.
+ * Always go through this helper rather than touching the exchange directly.
+ * <p/>
+ * With the <tt>transferExchange=true</tt> option the whole exchange is transferred
+ * as a {@link DefaultExchangeHolder}; otherwise only the message body is used.
+ */
+public final class NettyPayloadHelper {
+
+    private NettyPayloadHelper() {
+        // static helpers only
+    }
+
+    /**
+     * Gets the payload to send, taken from the IN message.
+     *
+     * @param endpoint the endpoint whose configuration decides between exchange and body transfer
+     * @param exchange the exchange
+     * @return the marshalled exchange holder, or the IN body
+     */
+    public static Object getIn(NettyEndpoint endpoint, Exchange exchange) {
+        final boolean wholeExchange = endpoint.getConfiguration().isTransferExchange();
+        // either the entire exchange (includes in/out) or just the body
+        return wholeExchange ? DefaultExchangeHolder.marshal(exchange) : exchange.getIn().getBody();
+    }
+
+    /**
+     * Gets the payload to send, taken from the OUT message.
+     *
+     * @param endpoint the endpoint whose configuration decides between exchange and body transfer
+     * @param exchange the exchange
+     * @return the marshalled exchange holder, or the OUT body
+     */
+    public static Object getOut(NettyEndpoint endpoint, Exchange exchange) {
+        final boolean wholeExchange = endpoint.getConfiguration().isTransferExchange();
+        // either the entire exchange (includes in/out) or just the body
+        return wholeExchange ? DefaultExchangeHolder.marshal(exchange) : exchange.getOut().getBody();
+    }
+
+    /**
+     * Applies a received payload to the IN side of the exchange.
+     *
+     * @param exchange the exchange to update
+     * @param payload  a {@link DefaultExchangeHolder} to unmarshal, or a plain body
+     */
+    public static void setIn(Exchange exchange, Object payload) {
+        if (!(payload instanceof DefaultExchangeHolder)) {
+            // plain body transfer
+            exchange.getIn().setBody(payload);
+            return;
+        }
+        DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+    }
+
+    /**
+     * Applies a received payload to the OUT side of the exchange.
+     *
+     * @param exchange the exchange to update
+     * @param payload  a {@link DefaultExchangeHolder} to unmarshal, or a plain body
+     */
+    public static void setOut(Exchange exchange, Object payload) {
+        if (!(payload instanceof DefaultExchangeHolder)) {
+            // plain body transfer: carry the IN headers over to OUT before setting the body
+            exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+            exchange.getOut().setBody(payload);
+            return;
+        }
+        DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
new file mode 100644
index 0000000..e584b58
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -0,0 +1,535 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.CamelLogger;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.IOHelper;
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.PoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyProducer extends DefaultAsyncProducer {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyProducer.class);
+ private static final ChannelGroup ALL_CHANNELS = new DefaultChannelGroup("NettyProducer"); // static: one group shared by all NettyProducer instances; doStop() closes every channel in it
+ private CamelContext context; // taken from the endpoint in the constructor
+ private NettyConfiguration configuration; // set in the constructor, replaceable via setConfiguration()
+ private ChannelFactory channelFactory; // TCP: created in setupTCPCommunication() unless already set
+ private DatagramChannelFactory datagramChannelFactory; // UDP: created in setupUDPCommunication()
+ private ClientPipelineFactory pipelineFactory; // resolved in doStart()
+ private CamelLogger noReplyLogger; // logs at the configured noReplyLogLevel; used when there is no payload to send
+ private BossPool bossPool; // only set when this producer created the pool itself; shut down in doStop()
+ private WorkerPool workerPool; // only set when this producer created the pool itself; shut down in doStop()
+ private ObjectPool<Channel> pool; // channel pool created in doStart(), closed and cleared in doStop()
+
+    /**
+     * Creates a producer for the given endpoint.
+     *
+     * @param nettyEndpoint the endpoint this producer sends to
+     * @param configuration the configuration to use; also supplies the level of the no-reply logger
+     */
+    public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
+        super(nettyEndpoint);
+        this.configuration = configuration;
+        this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel());
+        this.context = getEndpoint().getCamelContext();
+    }
+
+    /**
+     * Returns the endpoint of this producer, narrowed to {@link NettyEndpoint}.
+     */
+    @Override
+    public NettyEndpoint getEndpoint() {
+        final Object endpoint = super.getEndpoint();
+        return (NettyEndpoint) endpoint;
+    }
+
+    /**
+     * This producer always reports itself as a singleton.
+     *
+     * @return always <tt>true</tt>
+     */
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    /**
+     * Returns the Camel context that was obtained from the endpoint at construction time.
+     */
+    public CamelContext getContext() {
+        return this.context;
+    }
+
+ protected boolean isTcp() {
+ return configuration.getProtocol().equalsIgnoreCase("tcp");
+ }
+
+    /**
+     * Starts the producer: creates the channel pool, resolves the client pipeline factory,
+     * prepares the TCP or UDP channel factory and, unless lazy channel creation is
+     * enabled, borrows one channel up front to verify that a connection can be established.
+     *
+     * @throws Exception if the parent fails to start or the eager connection attempt fails
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (configuration.isProducerPoolEnabled()) {
+            // pooled mode: the limits come from the configuration, and idle channels are evicted so the pool can shrink on no demand
+            GenericObjectPool.Config config = new GenericObjectPool.Config();
+            config.maxActive = configuration.getProducerPoolMaxActive();
+            config.minIdle = configuration.getProducerPoolMinIdle();
+            config.maxIdle = configuration.getProducerPoolMaxIdle();
+            // we should test on borrow to ensure the channel is still valid
+            config.testOnBorrow = true;
+            // also validate idle channels during eviction runs
+            config.testWhileIdle = true;
+            // run eviction every 30th second
+            config.timeBetweenEvictionRunsMillis = 30 * 1000L;
+            config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle();
+            // fail fast instead of blocking the caller when the pool is exhausted
+            config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
+            pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created NettyProducer pool[maxActive={}, minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}",
+                        new Object[]{config.maxActive, config.minIdle, config.maxIdle, config.minEvictableIdleTimeMillis, pool});
+            }
+        } else {
+            pool = new SharedSingletonObjectPool<Channel>(new NettyProducerPoolableObjectFactory());
+            // log at DEBUG to match the guard and the pooled branch above
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created NettyProducer shared singleton pool -> {}", pool);
+            }
+        }
+
+        // setup pipeline factory: a configured factory is asked to create one bound to this producer
+        ClientPipelineFactory factory = configuration.getClientPipelineFactory();
+        if (factory != null) {
+            pipelineFactory = factory.createPipelineFactory(this);
+        } else {
+            pipelineFactory = new DefaultClientPipelineFactory(this);
+        }
+
+        if (isTcp()) {
+            setupTCPCommunication();
+        } else {
+            setupUDPCommunication();
+        }
+
+        if (!configuration.isLazyChannelCreation()) {
+            // ensure the connection can be established when we start up
+            Channel channel = pool.borrowObject();
+            pool.returnObject(channel);
+        }
+    }
+
+    /**
+     * Stops the producer: closes the tracked channels, releases the TCP and UDP channel
+     * factories, shuts down the thread pools created by this producer and closes the channel pool.
+     * <p/>
+     * The factories are cleared after being released so that a later start creates fresh
+     * ones instead of reusing factories whose resources are gone.
+     *
+     * @throws Exception if closing the channel pool or stopping the parent fails
+     */
+    @Override
+    protected void doStop() throws Exception {
+        LOG.debug("Stopping producer at address: {}", configuration.getAddress());
+        // close all channels; note ALL_CHANNELS is static, so this covers channels of every NettyProducer
+        LOG.trace("Closing {} channels", ALL_CHANNELS.size());
+        ChannelGroupFuture future = ALL_CHANNELS.close();
+        future.awaitUninterruptibly();
+
+        // and then release other resources
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+            channelFactory = null;
+        }
+        // the UDP factory holds resources as well and must be released the same way
+        if (datagramChannelFactory != null) {
+            datagramChannelFactory.releaseExternalResources();
+            datagramChannelFactory = null;
+        }
+
+        // and then shutdown the thread pools
+        if (bossPool != null) {
+            bossPool.shutdown();
+            bossPool = null;
+        }
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
+        }
+
+        if (pool != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+            }
+            pool.close();
+            pool = null;
+        }
+
+        super.doStop();
+    }
+
+ public boolean process(final Exchange exchange, AsyncCallback callback) { // writes the request body to a pooled channel; returns true when completed synchronously, false when the callback is signalled later
+ if (!isRunAllowed()) { // not allowed to run: fail the exchange (unless it already failed) and complete synchronously
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ callback.done(true);
+ return true;
+ }
+
+ Object body;
+ try {
+ body = getRequestBody(exchange);
+ if (body == null) { // nothing to send: log via the no-reply logger and complete synchronously
+ noReplyLogger.log("No payload to send for exchange: " + exchange);
+ callback.done(true);
+ return true;
+ }
+ } catch (Exception e) { // building the request body failed: fail the exchange and complete synchronously
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+
+ // set the exchange encoding property when a charset name is configured
+ if (getConfiguration().getCharsetName() != null) {
+ exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(getConfiguration().getCharsetName()));
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+ }
+
+ // get a channel from the pool
+ Channel existing;
+ try {
+ existing = pool.borrowObject();
+ if (existing != null) {
+ LOG.trace("Got channel from pool {}", existing);
+ }
+ } catch (Exception e) { // borrowing failed: fail the exchange and complete synchronously
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+
+ // we must have a channel
+ if (existing == null) {
+ exchange.setException(new CamelExchangeException("Cannot get channel from pool", exchange));
+ callback.done(true);
+ return true;
+ }
+
+ // need to declare as final so the anonymous listener below can use it
+ final Channel channel = existing;
+ final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback); // returns the channel to the pool before delegating to the caller's callback
+
+ // setup state as attachment on the channel, so we can access the state later when needed
+ channel.setAttachment(new NettyCamelState(producerCallback, exchange));
+
+ // write body without blocking; the listener runs when the write operation is complete
+ NettyHelper.writeBodyAsync(LOG, channel, null, body, exchange, new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ LOG.trace("Operation complete {}", channelFuture);
+ if (!channelFuture.isSuccess()) {
+ // write failed: set the cause as the exchange exception, signal the callback and stop
+ exchange.setException(channelFuture.getCause());
+ producerCallback.done(false);
+ return;
+ }
+
+ // if we do not expect any reply then signal callback to continue routing
+ if (!configuration.isSync()) { // when sync, the callback is not signalled here; presumably a response handler does it via the NettyCamelState attachment (not visible in this file)
+ try {
+ // should channel be closed after complete?
+ Boolean close;
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ } else {
+ close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
+ }
+
+ // should we disconnect, the header can override the configuration
+ boolean disconnect = getConfiguration().isDisconnect();
+ if (close != null) {
+ disconnect = close;
+ }
+ if (disconnect) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closing channel when complete at address: {}", getEndpoint().getConfiguration().getAddress());
+ }
+ NettyHelper.close(channel);
+ }
+ } finally {
+ // signal callback to continue routing, even if the close handling above failed
+ producerCallback.done(false);
+ }
+ }
+ }
+ });
+
+ // continue routing asynchronously
+ return false;
+ }
+
+    /**
+     * Resolves the object that is sent to netty as the request.
+     * <p/>
+     * The payload comes from {@link NettyPayloadHelper}; when textline is enabled it is
+     * converted to a String, which the textline codec requires.
+     *
+     * @param exchange the exchange
+     * @return the object to use as request, or <tt>null</tt> if there is no payload
+     * @throws Exception is thrown if error getting the request body
+     */
+    protected Object getRequestBody(Exchange exchange) throws Exception {
+        Object payload = NettyPayloadHelper.getIn(getEndpoint(), exchange);
+
+        // only a present payload is converted for textline; null is returned untouched
+        if (payload != null && getConfiguration().isTextline()) {
+            payload = NettyHelper.getTextlineBody(payload, exchange, getConfiguration().getDelimiter(), getConfiguration().isAutoAppendDelimiter());
+        }
+
+        return payload;
+    }
+
+    /**
+     * Reads the {@link NettyCamelState} stored as the attachment of the given channel.
+     *
+     * @param channel the channel
+     * @return the attached state, or <tt>null</tt> if the channel has no attachment
+     */
+    public NettyCamelState getState(Channel channel) {
+        final Object attachment = channel.getAttachment();
+        return (NettyCamelState) attachment;
+    }
+
+    /**
+     * Clears the {@link NettyCamelState} stored on the channel once it is no longer
+     * needed, by resetting the channel attachment to <tt>null</tt>.
+     *
+     * @param channel the channel
+     */
+    public void removeState(Channel channel) {
+        final Object none = null;
+        channel.setAttachment(none);
+    }
+
+    /**
+     * Creates the TCP channel factory unless one is already present.
+     * <p/>
+     * Boss and worker pools from the configuration are preferred. A pool that is missing
+     * is created here and remembered in a field, so it can be shut down when stopping.
+     *
+     * @throws Exception declared for overriding implementations
+     */
+    protected void setupTCPCommunication() throws Exception {
+        if (channelFactory != null) {
+            return;
+        }
+
+        // prefer using explicit configured thread pools
+        BossPool boss = configuration.getBossPool();
+        WorkerPool worker = configuration.getWorkerPool();
+
+        if (boss == null) {
+            // not shared, so this producer owns it and must shut it down when stopping
+            bossPool = new NettyClientBossPoolBuilder()
+                    .withBossCount(configuration.getBossCount())
+                    .withName("NettyClientTCPBoss")
+                    .build();
+            boss = bossPool;
+        }
+        if (worker == null) {
+            // not shared, so this producer owns it and must shut it down when stopping
+            workerPool = new NettyWorkerPoolBuilder()
+                    .withWorkerCount(configuration.getWorkerCount())
+                    .withName("NettyClientTCPWorker")
+                    .build();
+            worker = workerPool;
+        }
+
+        channelFactory = new NioClientSocketChannelFactory(boss, worker);
+    }
+
+    /**
+     * Creates the UDP (datagram) channel factory and its worker pool unless a factory is
+     * already present. The worker count comes from the configuration when positive,
+     * otherwise {@link NettyHelper#DEFAULT_IO_THREADS} is used.
+     *
+     * @throws Exception declared for overriding implementations
+     */
+    protected void setupUDPCommunication() throws Exception {
+        if (datagramChannelFactory != null) {
+            return;
+        }
+
+        int workers = configuration.getWorkerCount();
+        if (workers <= 0) {
+            workers = NettyHelper.DEFAULT_IO_THREADS;
+        }
+        workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), workers);
+        datagramChannelFactory = new NioDatagramChannelFactory(workerPool);
+    }
+
+ protected ChannelFuture openConnection() throws Exception {
+ ChannelFuture answer;
+
+ if (isTcp()) {
+ // its okay to create a new bootstrap for each new channel
+ ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
+ clientBootstrap.setOption("keepAlive", configuration.isKeepAlive());
+ clientBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay());
+ clientBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+ clientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeout());
+
+ // set any additional netty options
+ if (configuration.getOptions() != null) {
+ for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+ clientBootstrap.setOption(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // set the pipeline factory, which creates the pipeline for each newly created channels
+ clientBootstrap.setPipelineFactory(pipelineFactory);
+ answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created new TCP client bootstrap connecting to {}:{} with options: {}",
+ new Object[]{configuration.getHost(), configuration.getPort(), clientBootstrap.getOptions()});
+ }
+ return answer;
+ } else {
+ // its okay to create a new bootstrap for each new channel
+ ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+ connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+ connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+ connectionlessClientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+ connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
+ connectionlessClientBootstrap.setOption("child.broadcast", configuration.isBroadcast());
+ connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
+ connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
+
+ // set any additional netty options
+ if (configuration.getOptions() != null) {
+ for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+ connectionlessClientBootstrap.setOption(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // set the pipeline factory, which creates the pipeline for each newly created channels
+ connectionlessClientBootstrap.setPipelineFactory(pipelineFactory);
+ // bind and store channel so we can close it when stopping
+ Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0));
+ ALL_CHANNELS.add(channel);
+ answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created new UDP client bootstrap connecting to {}:{} with options: {}",
+ new Object[]{configuration.getHost(), configuration.getPort(), connectionlessClientBootstrap.getOptions()});
+ }
+ return answer;
+ }
+ }
+
+    /**
+     * Waits up to the configured connect timeout for the connect operation to finish
+     * and returns the connected channel, which is also added to the tracked channels.
+     *
+     * @param channelFuture the future returned by {@link #openConnection()}
+     * @return the connected channel
+     * @throws Exception a {@link CamelException} if the wait is interrupted (the interrupt
+     *                   flag of the thread is restored), or if the connection did not
+     *                   complete successfully within the timeout
+     */
+    protected Channel openChannel(ChannelFuture channelFuture) throws Exception {
+        // blocking for channel to be done
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout());
+        }
+        // the latch is released by the listener when the future completes, so this thread can wait with a timeout
+        final CountDownLatch channelLatch = new CountDownLatch(1);
+        channelFuture.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture cf) throws Exception {
+                channelLatch.countDown();
+            }
+        });
+
+        try {
+            channelLatch.await(configuration.getConnectTimeout(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+            // restore the interrupt flag so callers further up can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new CamelException("Interrupted while waiting for connection to " + configuration.getAddress(), ex);
+        }
+
+        // a timeout leaves the future not done; a failed connect leaves it done but not successful
+        if (!channelFuture.isDone() || !channelFuture.isSuccess()) {
+            throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
+        }
+        Channel answer = channelFuture.getChannel();
+        // to keep track of all channels in use
+        ALL_CHANNELS.add(answer);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating connector to address: {}", configuration.getAddress());
+        }
+        return answer;
+    }
+
+    /**
+     * Returns the configuration currently used by this producer.
+     */
+    public NettyConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * Replaces the configuration used by this producer.
+     *
+     * @param configuration the new configuration
+     */
+    public void setConfiguration(NettyConfiguration configuration) {
+        final NettyConfiguration replacement = configuration;
+        this.configuration = replacement;
+    }
+
+    /**
+     * Returns the TCP channel factory, or <tt>null</tt> if none has been set or created yet.
+     */
+    public ChannelFactory getChannelFactory() {
+        return this.channelFactory;
+    }
+
+    /**
+     * Sets the TCP channel factory. When one is set before the producer starts,
+     * {@link #setupTCPCommunication()} does not create another.
+     *
+     * @param channelFactory the channel factory to use
+     */
+    public void setChannelFactory(ChannelFactory channelFactory) {
+        final ChannelFactory replacement = channelFactory;
+        this.channelFactory = replacement;
+    }
+
+    /**
+     * Returns the group tracking the channels in use. The group is a static field,
+     * so the same instance is returned by every producer.
+     */
+    public ChannelGroup getAllChannels() {
+        final ChannelGroup tracked = ALL_CHANNELS;
+        return tracked;
+    }
+
+    /**
+     * Callback that returns the channel to the pool before delegating to the wrapped
+     * callback. A failure to return the channel is logged (including the exception)
+     * and otherwise ignored, so the wrapped callback is always invoked.
+     */
+    private final class NettyProducerCallback implements AsyncCallback {
+
+        private final Channel channel;
+        private final AsyncCallback callback;
+
+        private NettyProducerCallback(Channel channel, AsyncCallback callback) {
+            this.channel = channel;
+            this.callback = callback;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            // put back in pool
+            try {
+                LOG.trace("Putting channel back to pool {}", channel);
+                pool.returnObject(channel);
+            } catch (Exception e) {
+                // best effort: keep routing, but pass the exception to the logger so the failure can be diagnosed
+                LOG.warn("Error returning channel to pool {}. This exception will be ignored.", channel, e);
+            } finally {
+                // ensure we call the delegated callback
+                callback.done(doneSync);
+            }
+        }
+    }
+
+    /**
+     * Pool object factory that creates, validates and destroys the {@link Channel}s
+     * handed out by the channel pool.
+     */
+    private final class NettyProducerPoolableObjectFactory implements PoolableObjectFactory<Channel> {
+
+        /**
+         * Opens a new connection and waits for the channel to be connected.
+         */
+        @Override
+        public Channel makeObject() throws Exception {
+            final Channel created = openChannel(openConnection());
+            LOG.trace("Created channel: {}", created);
+            return created;
+        }
+
+        /**
+         * Closes the channel and stops tracking it.
+         */
+        @Override
+        public void destroyObject(Channel channel) throws Exception {
+            LOG.trace("Destroying channel: {}", channel);
+            NettyHelper.close(channel);
+            ALL_CHANNELS.remove(channel);
+        }
+
+        /**
+         * A channel is valid only while it is connected.
+         */
+        @Override
+        public boolean validateObject(Channel channel) {
+            final boolean connected = channel.isConnected();
+            LOG.trace("Validating channel: {} -> {}", channel, connected);
+            return connected;
+        }
+
+        @Override
+        public void activateObject(Channel channel) throws Exception {
+            // nothing to do when a channel is borrowed
+        }
+
+        @Override
+        public void passivateObject(Channel channel) throws Exception {
+            // nothing to do when a channel is returned
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
new file mode 100644
index 0000000..22effc8
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.camel.util.jsse.SSLContextParameters;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+/**
+ * Holds the settings of a Netty server bootstrap: listener address, socket options,
+ * SSL settings and the boss/worker pools.
+ * <p/>
+ * Use {@link #compatible(NettyServerBootstrapConfiguration)} to check whether two
+ * configurations are identical enough to share the same listener.
+ */
+public class NettyServerBootstrapConfiguration implements Cloneable {
+
+    protected String protocol;
+    protected String host;
+    protected int port;
+    protected boolean broadcast;
+    protected long sendBufferSize = 65536;
+    protected long receiveBufferSize = 65536;
+    protected int receiveBufferSizePredictor;
+    protected int bossCount = 1;
+    protected int workerCount;
+    protected boolean keepAlive = true;
+    protected boolean tcpNoDelay = true;
+    protected boolean reuseAddress = true;
+    protected long connectTimeout = 10000;
+    protected int backlog;
+    protected ServerPipelineFactory serverPipelineFactory;
+    protected NettyServerBootstrapFactory nettyServerBootstrapFactory;
+    protected Map<String, Object> options;
+    // SSL options is also part of the server bootstrap as the server listener on port X is either plain or SSL
+    protected boolean ssl;
+    protected boolean sslClientCertHeaders;
+    protected SslHandler sslHandler;
+    protected SSLContextParameters sslContextParameters;
+    protected boolean needClientAuth;
+    protected File keyStoreFile;
+    protected File trustStoreFile;
+    protected String keyStoreResource;
+    protected String trustStoreResource;
+    protected String keyStoreFormat;
+    protected String securityProvider;
+    protected String passphrase;
+    protected BossPool bossPool;
+    protected WorkerPool workerPool;
+    protected String networkInterface;
+
+    /**
+     * Returns the listener address in the form <tt>host:port</tt>.
+     */
+    public String getAddress() {
+        return host + ":" + port;
+    }
+
+    /**
+     * Whether the configured protocol is <tt>tcp</tt> (ignoring case).
+     *
+     * @return <tt>true</tt> for tcp, <tt>false</tt> otherwise, also when no protocol has been set
+     */
+    public boolean isTcp() {
+        // constant first, so a protocol that has not been configured yet does not cause a NPE
+        return "tcp".equalsIgnoreCase(protocol);
+    }
+
+    public String getProtocol() {
+        return protocol;
+    }
+
+    public void setProtocol(String protocol) {
+        this.protocol = protocol;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public boolean isBroadcast() {
+        return broadcast;
+    }
+
+    public void setBroadcast(boolean broadcast) {
+        this.broadcast = broadcast;
+    }
+
+    public long getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(long sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public long getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(long receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public int getReceiveBufferSizePredictor() {
+        return receiveBufferSizePredictor;
+    }
+
+    public void setReceiveBufferSizePredictor(int receiveBufferSizePredictor) {
+        this.receiveBufferSizePredictor = receiveBufferSizePredictor;
+    }
+
+    public int getWorkerCount() {
+        return workerCount;
+    }
+
+    public void setWorkerCount(int workerCount) {
+        this.workerCount = workerCount;
+    }
+
+    public int getBossCount() {
+        return bossCount;
+    }
+
+    public void setBossCount(int bossCount) {
+        this.bossCount = bossCount;
+    }
+
+    public boolean isKeepAlive() {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public boolean isReuseAddress() {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress(boolean reuseAddress) {
+        this.reuseAddress = reuseAddress;
+    }
+
+    public long getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public void setConnectTimeout(long connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+
+    public int getBacklog() {
+        return backlog;
+    }
+
+    public void setBacklog(int backlog) {
+        this.backlog = backlog;
+    }
+
+    public boolean isSsl() {
+        return ssl;
+    }
+
+    public void setSsl(boolean ssl) {
+        this.ssl = ssl;
+    }
+
+    public boolean isSslClientCertHeaders() {
+        return sslClientCertHeaders;
+    }
+
+    public void setSslClientCertHeaders(boolean sslClientCertHeaders) {
+        this.sslClientCertHeaders = sslClientCertHeaders;
+    }
+
+    public SslHandler getSslHandler() {
+        return sslHandler;
+    }
+
+    public void setSslHandler(SslHandler sslHandler) {
+        this.sslHandler = sslHandler;
+    }
+
+    public SSLContextParameters getSslContextParameters() {
+        return sslContextParameters;
+    }
+
+    public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+        this.sslContextParameters = sslContextParameters;
+    }
+
+    public boolean isNeedClientAuth() {
+        return needClientAuth;
+    }
+
+    public void setNeedClientAuth(boolean needClientAuth) {
+        this.needClientAuth = needClientAuth;
+    }
+
+    @Deprecated
+    public File getKeyStoreFile() {
+        return keyStoreFile;
+    }
+
+    @Deprecated
+    public void setKeyStoreFile(File keyStoreFile) {
+        this.keyStoreFile = keyStoreFile;
+    }
+
+    @Deprecated
+    public File getTrustStoreFile() {
+        return trustStoreFile;
+    }
+
+    @Deprecated
+    public void setTrustStoreFile(File trustStoreFile) {
+        this.trustStoreFile = trustStoreFile;
+    }
+
+    public String getKeyStoreResource() {
+        return keyStoreResource;
+    }
+
+    public void setKeyStoreResource(String keyStoreResource) {
+        this.keyStoreResource = keyStoreResource;
+    }
+
+    public String getTrustStoreResource() {
+        return trustStoreResource;
+    }
+
+    public void setTrustStoreResource(String trustStoreResource) {
+        this.trustStoreResource = trustStoreResource;
+    }
+
+    public String getKeyStoreFormat() {
+        return keyStoreFormat;
+    }
+
+    public void setKeyStoreFormat(String keyStoreFormat) {
+        this.keyStoreFormat = keyStoreFormat;
+    }
+
+    public String getSecurityProvider() {
+        return securityProvider;
+    }
+
+    public void setSecurityProvider(String securityProvider) {
+        this.securityProvider = securityProvider;
+    }
+
+    public String getPassphrase() {
+        return passphrase;
+    }
+
+    public void setPassphrase(String passphrase) {
+        this.passphrase = passphrase;
+    }
+
+    public ServerPipelineFactory getServerPipelineFactory() {
+        return serverPipelineFactory;
+    }
+
+    public void setServerPipelineFactory(ServerPipelineFactory serverPipelineFactory) {
+        this.serverPipelineFactory = serverPipelineFactory;
+    }
+
+    public NettyServerBootstrapFactory getNettyServerBootstrapFactory() {
+        return nettyServerBootstrapFactory;
+    }
+
+    public void setNettyServerBootstrapFactory(NettyServerBootstrapFactory nettyServerBootstrapFactory) {
+        this.nettyServerBootstrapFactory = nettyServerBootstrapFactory;
+    }
+
+    public Map<String, Object> getOptions() {
+        return options;
+    }
+
+    public void setOptions(Map<String, Object> options) {
+        this.options = options;
+    }
+
+    public BossPool getBossPool() {
+        return bossPool;
+    }
+
+    public void setBossPool(BossPool bossPool) {
+        this.bossPool = bossPool;
+    }
+
+    public WorkerPool getWorkerPool() {
+        return workerPool;
+    }
+
+    public void setWorkerPool(WorkerPool workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    public String getNetworkInterface() {
+        return networkInterface;
+    }
+
+    public void setNetworkInterface(String networkInterface) {
+        this.networkInterface = networkInterface;
+    }
+
+    /**
+     * Checks if the other {@link NettyServerBootstrapConfiguration} is compatible
+     * with this, as a Netty listener bound on port X shares the same common
+     * {@link NettyServerBootstrapConfiguration}, which must be identical.
+     * <p/>
+     * Value-like settings (strings, files and the options map) are compared with
+     * <tt>equals</tt> in a null-safe and symmetric way; the pipeline factory, bootstrap
+     * factory, SSL handler, SSL context parameters and the boss/worker pools are
+     * compared by reference.
+     *
+     * @param other the configuration to compare with, may be <tt>null</tt>
+     * @return <tt>true</tt> if all compared settings are identical, <tt>false</tt> otherwise
+     *         (including when <tt>other</tt> is <tt>null</tt>)
+     */
+    public boolean compatible(NettyServerBootstrapConfiguration other) {
+        if (other == null) {
+            return false;
+        }
+
+        // NOTE(review): sslClientCertHeaders is not part of this comparison (nor of
+        // toStringBootstrapConfiguration()), same as before - confirm that this is intentional
+        return isEqual(protocol, other.protocol)
+            && isEqual(host, other.host)
+            && port == other.port
+            && broadcast == other.broadcast
+            && sendBufferSize == other.sendBufferSize
+            && receiveBufferSize == other.receiveBufferSize
+            && receiveBufferSizePredictor == other.receiveBufferSizePredictor
+            && workerCount == other.workerCount
+            && bossCount == other.bossCount
+            && keepAlive == other.keepAlive
+            && tcpNoDelay == other.tcpNoDelay
+            && reuseAddress == other.reuseAddress
+            && connectTimeout == other.connectTimeout
+            && backlog == other.backlog
+            && serverPipelineFactory == other.serverPipelineFactory
+            && nettyServerBootstrapFactory == other.nettyServerBootstrapFactory
+            // validate all the options is identical, eg every key must map to an equal value
+            && isEqual(options, other.options)
+            && ssl == other.ssl
+            && sslHandler == other.sslHandler
+            && sslContextParameters == other.sslContextParameters
+            && needClientAuth == other.needClientAuth
+            && isEqual(keyStoreFile, other.keyStoreFile)
+            && isEqual(trustStoreFile, other.trustStoreFile)
+            && isEqual(keyStoreResource, other.keyStoreResource)
+            && isEqual(trustStoreResource, other.trustStoreResource)
+            && isEqual(keyStoreFormat, other.keyStoreFormat)
+            && isEqual(securityProvider, other.securityProvider)
+            && isEqual(passphrase, other.passphrase)
+            && bossPool == other.bossPool
+            && workerPool == other.workerPool
+            && isEqual(networkInterface, other.networkInterface);
+    }
+
+    /**
+     * Null-safe <tt>equals</tt>: two <tt>null</tt> values are equal, a <tt>null</tt> and a
+     * non-null value are not.
+     */
+    private static boolean isEqual(Object a, Object b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    /**
+     * Builds a textual dump of the bootstrap settings of this configuration.
+     */
+    public String toStringBootstrapConfiguration() {
+        // NOTE(review): the passphrase is included in clear text; consider masking it
+        // if this text can end up in logs or exception messages
+        return "NettyServerBootstrapConfiguration{"
+                + "protocol='" + protocol + '\''
+                + ", host='" + host + '\''
+                + ", port=" + port
+                + ", broadcast=" + broadcast
+                + ", sendBufferSize=" + sendBufferSize
+                + ", receiveBufferSize=" + receiveBufferSize
+                + ", receiveBufferSizePredictor=" + receiveBufferSizePredictor
+                + ", workerCount=" + workerCount
+                + ", bossCount=" + bossCount
+                + ", keepAlive=" + keepAlive
+                + ", tcpNoDelay=" + tcpNoDelay
+                + ", reuseAddress=" + reuseAddress
+                + ", connectTimeout=" + connectTimeout
+                + ", backlog=" + backlog
+                + ", serverPipelineFactory=" + serverPipelineFactory
+                + ", nettyServerBootstrapFactory=" + nettyServerBootstrapFactory
+                + ", options=" + options
+                + ", ssl=" + ssl
+                + ", sslHandler=" + sslHandler
+                + ", sslContextParameters='" + sslContextParameters + '\''
+                + ", needClientAuth=" + needClientAuth
+                + ", keyStoreFile=" + keyStoreFile
+                + ", trustStoreFile=" + trustStoreFile
+                + ", keyStoreResource='" + keyStoreResource + '\''
+                + ", trustStoreResource='" + trustStoreResource + '\''
+                + ", keyStoreFormat='" + keyStoreFormat + '\''
+                + ", securityProvider='" + securityProvider + '\''
+                + ", passphrase='" + passphrase + '\''
+                + ", bossPool=" + bossPool
+                + ", workerPool=" + workerPool
+                + ", networkInterface='" + networkInterface + '\''
+                + '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java
new file mode 100644
index 0000000..250366f
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Service;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * Factory for setting up Netty {@link org.jboss.netty.bootstrap.ServerBootstrap} and all
+ * the needed logic for doing that.
+ * <p/>
+ * This factory allows consumers to reuse an existing {@link org.jboss.netty.bootstrap.ServerBootstrap},
+ * which makes it possible for multiple consumers to share the same port.
+ */
+public interface NettyServerBootstrapFactory extends Service {
+
+ /**
+ * Initializes this <b>non-shared</b> {@link NettyServerBootstrapFactory}.
+ *
+ * @param camelContext the {@link CamelContext} for non-shared bootstrap factory
+ * @param configuration the bootstrap configuration
+ * @param pipelineFactory the pipeline factory
+ */
+ void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory);
+
+ /**
+ * Initializes this <b>shared</b> {@link NettyServerBootstrapFactory}.
+ *
+ * @param threadFactory the thread factory to use for shared bootstrap factory
+ * @param configuration the bootstrap configuration
+ * @param pipelineFactory the pipeline factory
+ */
+ void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory);
+
+ /**
+ * Callback for when a new {@link Channel} is opened.
+ *
+ * @param channel the channel that was opened
+ */
+ void addChannel(Channel channel);
+
+ /**
+ * Callback for when a {@link Channel} is closed.
+ *
+ * @param channel the channel that was closed
+ */
+ void removeChannel(Channel channel);
+
+ /**
+ * Callback for when a {@link NettyConsumer} is added and uses this bootstrap factory.
+ *
+ * @param consumer the consumer that was added
+ */
+ void addConsumer(NettyConsumer consumer);
+
+ /**
+ * Callback for when a {@link NettyConsumer} is removed and is no longer using this bootstrap factory.
+ *
+ * @param consumer the consumer that was removed
+ */
+ void removeConsumer(NettyConsumer consumer);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
new file mode 100644
index 0000000..c90565f
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioServerBossPool;
+
+/**
+ * A builder to create Netty {@link BossPool} which can be used for sharing boss pools
+ * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations.
+ * <p/>
+ * Defaults: name <tt>NettyServerBoss</tt>, no thread name pattern and a boss count of <tt>1</tt>.
+ */
+public final class NettyServerBossPoolBuilder {
+
+    private String name = "NettyServerBoss";
+    private String pattern;
+    private int bossCount = 1;
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    public void setBossCount(int bossCount) {
+        this.bossCount = bossCount;
+    }
+
+    /**
+     * Fluent variant of {@link #setName(String)}.
+     */
+    public NettyServerBossPoolBuilder withName(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * Fluent variant of {@link #setPattern(String)}.
+     */
+    public NettyServerBossPoolBuilder withPattern(String pattern) {
+        this.pattern = pattern;
+        return this;
+    }
+
+    /**
+     * Fluent variant of {@link #setBossCount(int)}.
+     */
+    public NettyServerBossPoolBuilder withBossCount(int bossCount) {
+        this.bossCount = bossCount;
+        return this;
+    }
+
+    /**
+     * Creates a new boss pool, backed by a new cached thread pool, using the configured
+     * boss count and a thread name determiner built from the pattern and name.
+     */
+    BossPool build() {
+        return new NioServerBossPool(
+                Executors.newCachedThreadPool(),
+                bossCount,
+                new CamelNettyThreadNameDeterminer(pattern, name));
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
new file mode 100644
index 0000000..0179d2b
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.channel.socket.nio.NioWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+
+/**
+ * Builder for a Netty {@link WorkerPool}, so one worker pool can be shared between
+ * several {@link NettyServerBootstrapFactory} server bootstrap configurations.
+ * <p/>
+ * The builder remembers the pool created most recently by {@link #build()} so that
+ * {@link #destroy()} can shut it down.
+ */
+public final class NettyWorkerPoolBuilder {
+
+    private String name = "NettyWorker";
+    private String pattern;
+    private int workerCount;
+    private volatile WorkerPool workerPool;
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+    }
+
+    public void setWorkerCount(int workerCount) {
+        this.workerCount = workerCount;
+    }
+
+    /**
+     * Fluent variant of {@link #setName(String)}.
+     */
+    public NettyWorkerPoolBuilder withName(String name) {
+        this.name = name;
+        return this;
+    }
+
+    /**
+     * Fluent variant of {@link #setPattern(String)}.
+     */
+    public NettyWorkerPoolBuilder withPattern(String pattern) {
+        this.pattern = pattern;
+        return this;
+    }
+
+    /**
+     * Fluent variant of {@link #setWorkerCount(int)}.
+     */
+    public NettyWorkerPoolBuilder withWorkerCount(int workerCount) {
+        this.workerCount = workerCount;
+        return this;
+    }
+
+    /**
+     * Creates a new worker pool.
+     * <p/>
+     * A worker count that is zero or negative falls back to {@link NettyHelper#DEFAULT_IO_THREADS}.
+     */
+    WorkerPool build() {
+        int count;
+        if (workerCount > 0) {
+            count = workerCount;
+        } else {
+            count = NettyHelper.DEFAULT_IO_THREADS;
+        }
+        workerPool = new NioWorkerPool(
+                Executors.newCachedThreadPool(),
+                count,
+                new CamelNettyThreadNameDeterminer(pattern, name));
+        return workerPool;
+    }
+
+    /**
+     * Shuts down the worker pool created by {@link #build()}, if any, and forgets it.
+     */
+    public void destroy() {
+        if (workerPool == null) {
+            // nothing was built, or it has already been destroyed
+            return;
+        }
+        workerPool.shutdown();
+        workerPool = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
new file mode 100644
index 0000000..e8ffe11
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+/**
+ * Factory to create {@link ChannelPipeline} for servers, eg {@link NettyConsumer}.
+ * <p/>
+ * Implementors must support creating a new instance of this factory which is associated
+ * to the given {@link NettyConsumer} using the {@link #createPipelineFactory(NettyConsumer)}
+ * method.
+ *
+ * @see ChannelPipelineFactory
+ */
+public abstract class ServerPipelineFactory implements ChannelPipelineFactory {
+
+ /**
+ * Creates a new {@link ServerPipelineFactory} using the given {@link NettyConsumer}
+ *
+ * @param consumer the associated consumer
+ * @return the {@link ServerPipelineFactory} associated to the given consumer.
+ */
+ public abstract ServerPipelineFactory createPipelineFactory(NettyConsumer consumer);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
new file mode 100644
index 0000000..68f3e40
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import org.jboss.netty.channel.ChannelHandler;
+
+/**
+ * A {@link ChannelHandlerFactory} that hands out one and the same {@link ChannelHandler}
+ * instance on every call, instead of creating a new handler each time.
+ */
+public class ShareableChannelHandlerFactory implements ChannelHandlerFactory {
+
+    private final ChannelHandler sharedHandler;
+
+    /**
+     * @param channelHandler the handler instance to return from {@link #newChannelHandler()}
+     */
+    public ShareableChannelHandlerFactory(ChannelHandler channelHandler) {
+        sharedHandler = channelHandler;
+    }
+
+    /**
+     * Returns the handler given at construction time.
+     */
+    @Override
+    public ChannelHandler newChannelHandler() {
+        return sharedHandler;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java
new file mode 100644
index 0000000..9121838
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.util.NoSuchElementException;
+
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.PoolableObjectFactory;
+
+/**
+ * An {@link org.apache.commons.pool.ObjectPool} backed by one single instance that is
+ * shared by all borrowers.
+ * <p/>
+ * The instance is created on the first {@link #borrowObject()} call and kept until it is
+ * dropped by {@link #invalidateObject(Object)}, {@link #clear()} or {@link #close()}; those
+ * methods only forget the instance, they do not call the factory to destroy it.
+ * <p/>
+ * This implementation will always return <tt>1</tt> in {@link #getNumActive()} and
+ * return <tt>0</tt> in {@link #getNumIdle()}.
+ */
+public class SharedSingletonObjectPool<T> implements ObjectPool<T> {
+
+    private final PoolableObjectFactory<T> factory;
+    private volatile T shared;
+
+    /**
+     * @param factory the factory used to create the shared instance
+     */
+    public SharedSingletonObjectPool(PoolableObjectFactory<T> factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * Returns the shared instance, creating it with the factory if there is none.
+     */
+    @Override
+    public synchronized T borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
+        if (shared != null) {
+            return shared;
+        }
+        shared = factory.makeObject();
+        return shared;
+    }
+
+    @Override
+    public void returnObject(T obj) throws Exception {
+        // noop
+    }
+
+    /**
+     * Forgets the shared instance so the next borrow creates a new one; the given object is not inspected.
+     */
+    @Override
+    public void invalidateObject(T obj) throws Exception {
+        shared = null;
+    }
+
+    @Override
+    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
+        // noop
+    }
+
+    @Override
+    public int getNumIdle() throws UnsupportedOperationException {
+        return 0;
+    }
+
+    @Override
+    public int getNumActive() throws UnsupportedOperationException {
+        return 1;
+    }
+
+    /**
+     * Forgets the shared instance.
+     */
+    @Override
+    public void clear() throws Exception, UnsupportedOperationException {
+        shared = null;
+    }
+
+    /**
+     * Forgets the shared instance.
+     */
+    @Override
+    public void close() throws Exception {
+        shared = null;
+    }
+
+    /**
+     * Not supported as a change: the factory given at construction time is kept.
+     */
+    @Override
+    public void setFactory(PoolableObjectFactory<T> factory) throws IllegalStateException, UnsupportedOperationException {
+        // noop
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
new file mode 100644
index 0000000..9806a38
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.support.ServiceSupport;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.BossPool;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TCP flavour of {@link NettyServerBootstrapFactory} that serves exactly one consumer (not shared).
+ */
+public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(SingleTCPNettyServerBootstrapFactory.class);
+    // every channel registered here is closed when the factory stops
+    private final ChannelGroup allChannels = new DefaultChannelGroup(SingleTCPNettyServerBootstrapFactory.class.getName());
+    private CamelContext camelContext;
+    private ThreadFactory threadFactory;
+    private NettyServerBootstrapConfiguration configuration;
+    private ChannelPipelineFactory pipelineFactory;
+    private ChannelFactory channelFactory;
+    private ServerBootstrap serverBootstrap;
+    private Channel channel;
+    // only assigned when this factory created the pool itself, i.e. when it must shut it down again
+    private BossPool bossPool;
+    private WorkerPool workerPool;
+
+    public SingleTCPNettyServerBootstrapFactory() {
+    }
+
+    /**
+     * Initializes the factory with a {@link CamelContext}.
+     */
+    public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+        this.pipelineFactory = pipelineFactory;
+        this.configuration = configuration;
+        this.camelContext = camelContext;
+    }
+
+    /**
+     * Initializes the factory with a {@link ThreadFactory}.
+     */
+    public void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+        this.pipelineFactory = pipelineFactory;
+        this.configuration = configuration;
+        this.threadFactory = threadFactory;
+    }
+
+    /**
+     * Registers a channel so it gets closed when this factory stops.
+     */
+    public void addChannel(Channel channel) {
+        allChannels.add(channel);
+    }
+
+    /**
+     * Unregisters a previously added channel.
+     */
+    public void removeChannel(Channel channel) {
+        allChannels.remove(channel);
+    }
+
+    public void addConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    public void removeConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // one of the two init(...) variants must have been called
+        boolean initialized = camelContext != null || threadFactory != null;
+        if (!initialized) {
+            throw new IllegalArgumentException("Either CamelContext or ThreadFactory must be set on " + this);
+        }
+        startServerBootstrap();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        stopServerBootstrap();
+    }
+
+    /**
+     * Creates the server bootstrap, applies the configured options and binds to the configured host and port.
+     */
+    protected void startServerBootstrap() {
+        // explicitly configured thread pools win over self-created ones
+        BossPool bossToUse = configuration.getBossPool();
+        WorkerPool workerToUse = configuration.getWorkerPool();
+
+        if (bossToUse == null) {
+            // none configured: build our own and remember it, as it is not shared and must be shut down on stop
+            bossToUse = bossPool = new NettyServerBossPoolBuilder()
+                    .withBossCount(configuration.getBossCount())
+                    .withName("NettyServerTCPBoss")
+                    .build();
+        }
+        if (workerToUse == null) {
+            // none configured: build our own and remember it, as it is not shared and must be shut down on stop
+            workerToUse = workerPool = new NettyWorkerPoolBuilder()
+                    .withWorkerCount(configuration.getWorkerCount())
+                    .withName("NettyServerTCPWorker")
+                    .build();
+        }
+
+        channelFactory = new NioServerSocketChannelFactory(bossToUse, workerToUse);
+        serverBootstrap = new ServerBootstrap(channelFactory);
+        applyBootstrapOptions();
+
+        LOG.debug("Created ServerBootstrap {} with options: {}", serverBootstrap, serverBootstrap.getOptions());
+
+        // the pipeline factory builds the pipeline of every newly created channel
+        serverBootstrap.setPipelineFactory(pipelineFactory);
+
+        LOG.info("ServerBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
+        InetSocketAddress bindAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+        channel = serverBootstrap.bind(bindAddress);
+        // keep track of the server channel as well
+        allChannels.add(channel);
+    }
+
+    /**
+     * Copies the socket options from the configuration onto the server bootstrap.
+     */
+    private void applyBootstrapOptions() {
+        serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+        serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        serverBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+        serverBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+        serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
+        if (configuration.getBacklog() > 0) {
+            serverBootstrap.setOption("backlog", configuration.getBacklog());
+        }
+
+        // additional netty options are applied last
+        if (configuration.getOptions() == null) {
+            return;
+        }
+        for (Map.Entry<String, Object> option : configuration.getOptions().entrySet()) {
+            serverBootstrap.setOption(option.getKey(), option.getValue());
+        }
+    }
+
+    /**
+     * Closes all tracked channels, releases the channel factory and shuts down the self-created pools.
+     */
+    protected void stopServerBootstrap() {
+        LOG.info("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort());
+
+        // close every channel and wait until that is done
+        LOG.trace("Closing {} channels", allChannels.size());
+        allChannels.close().awaitUninterruptibly();
+
+        // release the external resources of the server
+        if (channelFactory != null) {
+            channelFactory.releaseExternalResources();
+            channelFactory = null;
+        }
+
+        // finally the thread pools this factory created itself
+        if (bossPool != null) {
+            bossPool.shutdown();
+            bossPool = null;
+        }
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
new file mode 100644
index 0000000..1a8d984
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.support.ServiceSupport;
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.DatagramChannel;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
+import org.jboss.netty.channel.socket.nio.WorkerPool;
+import org.jboss.netty.handler.ipfilter.IpV4Subnet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link NettyServerBootstrapFactory} which is used by a single consumer (not shared).
+ * <p/>
+ * Binds a connectionless (datagram) bootstrap to the configured host and port. When the configured
+ * host lies in the multicast subnet, the bound channel additionally joins that group on a network interface.
+ */
+public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class);
+    // interface name used for multicast when the configuration does not name one
+    private static final String LOOPBACK_INTERFACE = "lo";
+    private static final String MULTICAST_SUBNET = "224.0.0.0/4";
+    // every channel registered here is closed when the factory stops
+    private final ChannelGroup allChannels;
+    private CamelContext camelContext;
+    private ThreadFactory threadFactory;
+    private NettyServerBootstrapConfiguration configuration;
+    private ChannelPipelineFactory pipelineFactory;
+    private DatagramChannelFactory datagramChannelFactory;
+    private ConnectionlessBootstrap connectionlessBootstrap;
+    private WorkerPool workerPool;
+
+    public SingleUDPNettyServerBootstrapFactory() {
+        this.allChannels = new DefaultChannelGroup(SingleUDPNettyServerBootstrapFactory.class.getName());
+    }
+
+    /**
+     * Initializes the factory with a {@link CamelContext}.
+     */
+    public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+        this.camelContext = camelContext;
+        this.configuration = configuration;
+        this.pipelineFactory = pipelineFactory;
+    }
+
+    /**
+     * Initializes the factory with a {@link ThreadFactory}.
+     */
+    public void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
+        this.threadFactory = threadFactory;
+        this.configuration = configuration;
+        this.pipelineFactory = pipelineFactory;
+    }
+
+    /**
+     * Registers a channel so it gets closed when this factory stops.
+     */
+    public void addChannel(Channel channel) {
+        allChannels.add(channel);
+    }
+
+    /**
+     * Unregisters a previously added channel.
+     */
+    public void removeChannel(Channel channel) {
+        allChannels.remove(channel);
+    }
+
+    public void addConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    public void removeConsumer(NettyConsumer consumer) {
+        // does not allow sharing
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (camelContext == null && threadFactory == null) {
+            throw new IllegalArgumentException("Either CamelContext or ThreadFactory must be set on " + this);
+        }
+        startServerBootstrap();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        stopServerBootstrap();
+    }
+
+    /**
+     * Creates the connectionless bootstrap, applies the configured options and binds to the
+     * configured host and port, joining the multicast group when the host is a multicast address.
+     *
+     * @throws UnknownHostException if the configured host cannot be checked against the multicast subnet
+     * @throws SocketException if the network interfaces cannot be queried
+     * @throws IllegalArgumentException if the host is a multicast address and the network interface
+     *                                  to join on does not exist
+     */
+    protected void startServerBootstrap() throws UnknownHostException, SocketException {
+        // Resolve the multicast interface (if any) first, so that a bad host or interface name
+        // fails before any thread pool or socket has been created.
+        NetworkInterface multicastNetworkInterface = null;
+        boolean multicast = new IpV4Subnet(MULTICAST_SUBNET).contains(configuration.getHost());
+        if (multicast) {
+            multicastNetworkInterface = resolveMulticastInterface();
+        }
+
+        // create non-shared worker pool
+        int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
+        workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count);
+
+        datagramChannelFactory = new NioDatagramChannelFactory(workerPool);
+
+        connectionlessBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
+        connectionlessBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
+        connectionlessBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
+        connectionlessBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
+        connectionlessBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
+        connectionlessBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
+        connectionlessBootstrap.setOption("child.broadcast", configuration.isBroadcast());
+        connectionlessBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
+        connectionlessBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
+        // only set this if user has specified
+        if (configuration.getReceiveBufferSizePredictor() > 0) {
+            connectionlessBootstrap.setOption("receiveBufferSizePredictorFactory",
+                    new FixedReceiveBufferSizePredictorFactory(configuration.getReceiveBufferSizePredictor()));
+        }
+        if (configuration.getBacklog() > 0) {
+            connectionlessBootstrap.setOption("backlog", configuration.getBacklog());
+        }
+
+        // set any additional netty options
+        if (configuration.getOptions() != null) {
+            for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
+                connectionlessBootstrap.setOption(entry.getKey(), entry.getValue());
+            }
+        }
+
+        LOG.debug("Created ConnectionlessBootstrap {} with options: {}", connectionlessBootstrap, connectionlessBootstrap.getOptions());
+
+        // set the pipeline factory, which creates the pipeline for each newly created channels
+        connectionlessBootstrap.setPipelineFactory(pipelineFactory);
+
+        InetSocketAddress hostAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort());
+
+        if (multicast) {
+            DatagramChannel channel = (DatagramChannel) connectionlessBootstrap.bind(hostAddress);
+            // track the channel as soon as it is bound so that it is closed on stop
+            allChannels.add(channel);
+            LOG.info("ConnectionlessBootstrap joining {}:{} using network interface: {}", new Object[]{configuration.getHost(), configuration.getPort(), multicastNetworkInterface.getName()});
+            channel.joinGroup(hostAddress, multicastNetworkInterface);
+        } else {
+            LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
+            Channel channel = connectionlessBootstrap.bind(hostAddress);
+            allChannels.add(channel);
+        }
+    }
+
+    /**
+     * Looks up the network interface to join the multicast group on: the configured one, or
+     * {@link #LOOPBACK_INTERFACE} when none is configured.
+     *
+     * @return the network interface, never <tt>null</tt>
+     * @throws SocketException if the network interfaces cannot be queried
+     * @throws IllegalArgumentException if no interface with that name exists
+     */
+    private NetworkInterface resolveMulticastInterface() throws SocketException {
+        String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface();
+        // getByName returns null (it does not throw) when there is no interface with the given name
+        NetworkInterface answer = NetworkInterface.getByName(networkInterface);
+        if (answer == null) {
+            throw new IllegalArgumentException("No network interface found for '" + networkInterface + "'.");
+        }
+        return answer;
+    }
+
+    /**
+     * Closes all tracked channels, releases the channel factory and shuts down the worker pool.
+     */
+    protected void stopServerBootstrap() {
+        // close all channels
+        LOG.info("ConnectionlessBootstrap disconnecting from {}:{}", configuration.getHost(), configuration.getPort());
+
+        LOG.trace("Closing {} channels", allChannels.size());
+        ChannelGroupFuture future = allChannels.close();
+        future.awaitUninterruptibly();
+
+        // close server external resources
+        if (datagramChannelFactory != null) {
+            datagramChannelFactory.releaseExternalResources();
+            datagramChannelFactory = null;
+        }
+
+        // and then shutdown the thread pools
+        if (workerPool != null) {
+            workerPool.shutdown();
+            workerPool = null;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/1bb756cd/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TextLineDelimiter.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TextLineDelimiter.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TextLineDelimiter.java
new file mode 100644
index 0000000..9791e57
--- /dev/null
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/TextLineDelimiter.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4;
+
+/**
+ * The text line delimiters that can be used with the textline codec.
+ */
+public enum TextLineDelimiter {
+    LINE,
+    NULL
+}