You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/02/03 10:11:55 UTC
[4/5] Removed camel-netty4 from kit as it's not in development.
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
deleted file mode 100644
index 3bc9e90..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyEndpoint.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.math.BigInteger;
-import java.security.Principal;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-import javax.security.cert.X509Certificate;
-
-import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.impl.SynchronousDelegateProducer;
-import org.apache.camel.util.ObjectHelper;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.util.Timer;
-
-public class NettyEndpoint extends DefaultEndpoint {
- private NettyConfiguration configuration;
- private Timer timer;
-
- public NettyEndpoint(String endpointUri, NettyComponent component, NettyConfiguration configuration) {
- super(endpointUri, component);
- this.configuration = configuration;
- }
-
- public Consumer createConsumer(Processor processor) throws Exception {
- Consumer answer = new NettyConsumer(this, processor, configuration);
- configureConsumer(answer);
- return answer;
- }
-
- public Producer createProducer() throws Exception {
- Producer answer = new NettyProducer(this, configuration);
- if (isSynchronous()) {
- return new SynchronousDelegateProducer(answer);
- } else {
- return answer;
- }
- }
-
- public Exchange createExchange(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
- Exchange exchange = createExchange();
- updateMessageHeader(exchange.getIn(), ctx, messageEvent);
- NettyPayloadHelper.setIn(exchange, messageEvent.getMessage());
- return exchange;
- }
-
- public boolean isSingleton() {
- return true;
- }
-
- @Override
- public NettyComponent getComponent() {
- return (NettyComponent) super.getComponent();
- }
-
- public NettyConfiguration getConfiguration() {
- return configuration;
- }
-
- public void setConfiguration(NettyConfiguration configuration) {
- this.configuration = configuration;
- }
-
- public void setTimer(Timer timer) {
- this.timer = timer;
- }
-
- public Timer getTimer() {
- return timer;
- }
-
- @Override
- protected String createEndpointUri() {
- ObjectHelper.notNull(configuration, "configuration");
- return "netty4:" + getConfiguration().getProtocol() + "://" + getConfiguration().getHost() + ":" + getConfiguration().getPort();
- }
-
- @Override
- protected void doStart() throws Exception {
- ObjectHelper.notNull(timer, "timer");
- }
-
- protected SSLSession getSSLSession(ChannelHandlerContext ctx) {
- final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
- SSLSession sslSession = null;
- if (sslHandler != null) {
- sslSession = sslHandler.getEngine().getSession();
- }
- return sslSession;
- }
-
- protected void updateMessageHeader(Message in, ChannelHandlerContext ctx, MessageEvent messageEvent) {
- in.setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx);
- in.setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent);
- in.setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, messageEvent.getRemoteAddress());
- in.setHeader(NettyConstants.NETTY_LOCAL_ADDRESS, messageEvent.getChannel().getLocalAddress());
-
- if (configuration.isSsl()) {
- // setup the SslSession header
- SSLSession sslSession = getSSLSession(ctx);
- in.setHeader(NettyConstants.NETTY_SSL_SESSION, sslSession);
-
- // enrich headers with details from the client certificate if option is enabled
- if (configuration.isSslClientCertHeaders()) {
- enrichWithClientCertInformation(sslSession, in);
- }
- }
- }
-
- /**
- * Enriches the message with client certificate details such as subject name, serial number etc.
- * <p/>
- * If the certificate is unverified then the headers are not enriched.
- *
- * @param sslSession the SSL session
- * @param message the message to enrich
- */
- protected void enrichWithClientCertInformation(SSLSession sslSession, Message message) {
- try {
- X509Certificate[] certificates = sslSession.getPeerCertificateChain();
- if (certificates != null && certificates.length > 0) {
- X509Certificate cert = certificates[0];
-
- Principal subject = cert.getSubjectDN();
- if (subject != null) {
- message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_SUBJECT_NAME, subject.getName());
- }
- Principal issuer = cert.getIssuerDN();
- if (issuer != null) {
- message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_ISSUER_NAME, issuer.getName());
- }
- BigInteger serial = cert.getSerialNumber();
- if (serial != null) {
- message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_SERIAL_NO, serial.toString());
- }
- message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_NOT_BEFORE, cert.getNotBefore());
- message.setHeader(NettyConstants.NETTY_SSL_CLIENT_CERT_NOT_AFTER, cert.getNotAfter());
- }
- } catch (SSLPeerUnverifiedException e) {
- // ignore
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
deleted file mode 100644
index 37e28f9..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.net.SocketAddress;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.NoTypeConversionAvailableException;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Helper class used internally by camel-netty using Netty.
- *
- * @version
- */
-public final class NettyHelper {
-
- public static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
- private static final Logger LOG = LoggerFactory.getLogger(NettyHelper.class);
-
- private NettyHelper() {
- // Utility class
- }
-
- /**
- * Gets the string body to be used when sending with the textline codec.
- *
- * @param body the current body
- * @param exchange the exchange
- * @param delimiter the textline delimiter
- * @param autoAppendDelimiter whether absent delimiter should be auto appended
- * @return the string body to send
- * @throws NoTypeConversionAvailableException is thrown if the current body could not be converted to a String type
- */
- public static String getTextlineBody(Object body, Exchange exchange, TextLineDelimiter delimiter, boolean autoAppendDelimiter) throws NoTypeConversionAvailableException {
- String s = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
-
- // auto append delimiter if missing?
- if (autoAppendDelimiter) {
- if (TextLineDelimiter.LINE.equals(delimiter)) {
- // line delimiter so ensure it ends with newline
- if (!s.endsWith("\n")) {
- LOG.trace("Auto appending missing newline delimiter to body");
- s = s + "\n";
- }
- } else {
- // null delimiter so ensure it ends with null
- if (!s.endsWith("\u0000")) {
- LOG.trace("Auto appending missing null delimiter to body");
- s = s + "\u0000";
- }
- }
- }
-
- return s;
- }
-
- /**
- * Writes the given body to Netty channel. Will <b>not</b> wait until the body has been written.
- *
- * @param log logger to use
- * @param channel the Netty channel
- * @param remoteAddress the remote address when using UDP
- * @param body the body to write (send)
- * @param exchange the exchange
- * @param listener listener with work to be executed when the operation is complete
- */
- public static void writeBodyAsync(Logger log, Channel channel, SocketAddress remoteAddress, Object body,
- Exchange exchange, ChannelFutureListener listener) {
- ChannelFuture future;
- if (remoteAddress != null) {
- if (log.isDebugEnabled()) {
- log.debug("Channel: {} remote address: {} writing body: {}", new Object[]{channel, remoteAddress, body});
- }
- future = channel.write(body, remoteAddress);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Channel: {} writing body: {}", new Object[]{channel, body});
- }
- future = channel.write(body);
- }
-
- if (listener != null) {
- future.addListener(listener);
- }
- }
-
- /**
- * Closes the given channel asynchronously
- *
- * @param channel the channel to close
- */
- public static void close(Channel channel) {
- if (channel != null) {
- LOG.trace("Closing channel: {}", channel);
- channel.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
deleted file mode 100644
index dca12b2..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultExchangeHolder;
-
-/**
- * Helper to get and set the correct payload when transferring data using camel-netty.
- * Always use this helper instead of direct access on the exchange object.
- * <p/>
- * This helper ensures that we can also transfer exchange objects over the wire using the
- * <tt>transferExchange=true</tt> option.
- *
- * @version
- */
-public final class NettyPayloadHelper {
-
- private NettyPayloadHelper() {
- //Helper class
- }
-
- public static Object getIn(NettyEndpoint endpoint, Exchange exchange) {
- if (endpoint.getConfiguration().isTransferExchange()) {
- // we should transfer the entire exchange over the wire (includes in/out)
- return DefaultExchangeHolder.marshal(exchange);
- } else {
- // normal transfer using the body only
- return exchange.getIn().getBody();
- }
- }
-
- public static Object getOut(NettyEndpoint endpoint, Exchange exchange) {
- if (endpoint.getConfiguration().isTransferExchange()) {
- // we should transfer the entire exchange over the wire (includes in/out)
- return DefaultExchangeHolder.marshal(exchange);
- } else {
- // normal transfer using the body only
- return exchange.getOut().getBody();
- }
- }
-
- public static void setIn(Exchange exchange, Object payload) {
- if (payload instanceof DefaultExchangeHolder) {
- DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
- } else {
- // normal transfer using the body only
- exchange.getIn().setBody(payload);
- }
- }
-
- public static void setOut(Exchange exchange, Object payload) {
- if (payload instanceof DefaultExchangeHolder) {
- DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
- } else {
- // normal transfer using the body only and preserve the headers
- exchange.getOut().setHeaders(exchange.getIn().getHeaders());
- exchange.getOut().setBody(payload);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
deleted file mode 100644
index e584b58..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ /dev/null
@@ -1,535 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelException;
-import org.apache.camel.CamelExchangeException;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.util.CamelLogger;
-import org.apache.camel.util.ExchangeHelper;
-import org.apache.camel.util.IOHelper;
-import org.apache.commons.pool.ObjectPool;
-import org.apache.commons.pool.PoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericObjectPool;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.DatagramChannelFactory;
-import org.jboss.netty.channel.socket.nio.BossPool;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
-import org.jboss.netty.channel.socket.nio.WorkerPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NettyProducer extends DefaultAsyncProducer {
- private static final Logger LOG = LoggerFactory.getLogger(NettyProducer.class);
- private static final ChannelGroup ALL_CHANNELS = new DefaultChannelGroup("NettyProducer");
- private CamelContext context;
- private NettyConfiguration configuration;
- private ChannelFactory channelFactory;
- private DatagramChannelFactory datagramChannelFactory;
- private ClientPipelineFactory pipelineFactory;
- private CamelLogger noReplyLogger;
- private BossPool bossPool;
- private WorkerPool workerPool;
- private ObjectPool<Channel> pool;
-
- public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
- super(nettyEndpoint);
- this.configuration = configuration;
- this.context = this.getEndpoint().getCamelContext();
- this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel());
- }
-
- @Override
- public NettyEndpoint getEndpoint() {
- return (NettyEndpoint) super.getEndpoint();
- }
-
- @Override
- public boolean isSingleton() {
- return true;
- }
-
- public CamelContext getContext() {
- return context;
- }
-
- protected boolean isTcp() {
- return configuration.getProtocol().equalsIgnoreCase("tcp");
- }
-
- @Override
- protected void doStart() throws Exception {
- super.doStart();
-
- if (configuration.isProducerPoolEnabled()) {
- // setup pool where we want an unbounded pool, which allows the pool to shrink on no demand
- GenericObjectPool.Config config = new GenericObjectPool.Config();
- config.maxActive = configuration.getProducerPoolMaxActive();
- config.minIdle = configuration.getProducerPoolMinIdle();
- config.maxIdle = configuration.getProducerPoolMaxIdle();
- // we should test on borrow to ensure the channel is still valid
- config.testOnBorrow = true;
- // only evict channels which are no longer valid
- config.testWhileIdle = true;
- // run eviction every 30 seconds
- config.timeBetweenEvictionRunsMillis = 30 * 1000L;
- config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle();
- config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
- pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created NettyProducer pool[maxActive={}, minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}",
- new Object[]{config.maxActive, config.minIdle, config.maxIdle, config.minEvictableIdleTimeMillis, pool});
- }
- } else {
- pool = new SharedSingletonObjectPool<Channel>(new NettyProducerPoolableObjectFactory());
- if (LOG.isDebugEnabled()) {
- LOG.info("Created NettyProducer shared singleton pool -> {}", pool);
- }
- }
-
- // setup pipeline factory
- ClientPipelineFactory factory = configuration.getClientPipelineFactory();
- if (factory != null) {
- pipelineFactory = factory.createPipelineFactory(this);
- } else {
- pipelineFactory = new DefaultClientPipelineFactory(this);
- }
-
- if (isTcp()) {
- setupTCPCommunication();
- } else {
- setupUDPCommunication();
- }
-
- if (!configuration.isLazyChannelCreation()) {
- // ensure the connection can be established when we start up
- Channel channel = pool.borrowObject();
- pool.returnObject(channel);
- }
- }
-
- @Override
- protected void doStop() throws Exception {
- LOG.debug("Stopping producer at address: {}", configuration.getAddress());
- // close all channels
- LOG.trace("Closing {} channels", ALL_CHANNELS.size());
- ChannelGroupFuture future = ALL_CHANNELS.close();
- future.awaitUninterruptibly();
-
- // and then release other resources
- if (channelFactory != null) {
- channelFactory.releaseExternalResources();
- }
-
- // and then shutdown the thread pools
- if (bossPool != null) {
- bossPool.shutdown();
- bossPool = null;
- }
- if (workerPool != null) {
- workerPool.shutdown();
- workerPool = null;
- }
-
- if (pool != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
- }
- pool.close();
- pool = null;
- }
-
- super.doStop();
- }
-
- public boolean process(final Exchange exchange, AsyncCallback callback) {
- if (!isRunAllowed()) {
- if (exchange.getException() == null) {
- exchange.setException(new RejectedExecutionException());
- }
- callback.done(true);
- return true;
- }
-
- Object body;
- try {
- body = getRequestBody(exchange);
- if (body == null) {
- noReplyLogger.log("No payload to send for exchange: " + exchange);
- callback.done(true);
- return true;
- }
- } catch (Exception e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
-
- // set the exchange encoding property
- if (getConfiguration().getCharsetName() != null) {
- exchange.setProperty(Exchange.CHARSET_NAME, IOHelper.normalizeCharset(getConfiguration().getCharsetName()));
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
- }
-
- // get a channel from the pool
- Channel existing;
- try {
- existing = pool.borrowObject();
- if (existing != null) {
- LOG.trace("Got channel from pool {}", existing);
- }
- } catch (Exception e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
-
- // we must have a channel
- if (existing == null) {
- exchange.setException(new CamelExchangeException("Cannot get channel from pool", exchange));
- callback.done(true);
- return true;
- }
-
- // need to declare as final
- final Channel channel = existing;
- final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback);
-
- // setup state as attachment on the channel, so we can access the state later when needed
- channel.setAttachment(new NettyCamelState(producerCallback, exchange));
-
- // write body
- NettyHelper.writeBodyAsync(LOG, channel, null, body, exchange, new ChannelFutureListener() {
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- LOG.trace("Operation complete {}", channelFuture);
- if (!channelFuture.isSuccess()) {
- // no success, so set the caused exception, signal the callback and return
- exchange.setException(channelFuture.getCause());
- producerCallback.done(false);
- return;
- }
-
- // if we do not expect any reply then signal callback to continue routing
- if (!configuration.isSync()) {
- try {
- // should channel be closed after complete?
- Boolean close;
- if (ExchangeHelper.isOutCapable(exchange)) {
- close = exchange.getOut().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
- } else {
- close = exchange.getIn().getHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, Boolean.class);
- }
-
- // should we disconnect, the header can override the configuration
- boolean disconnect = getConfiguration().isDisconnect();
- if (close != null) {
- disconnect = close;
- }
- if (disconnect) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Closing channel when complete at address: {}", getEndpoint().getConfiguration().getAddress());
- }
- NettyHelper.close(channel);
- }
- } finally {
- // signal callback to continue routing
- producerCallback.done(false);
- }
- }
- }
- });
-
- // continue routing asynchronously
- return false;
- }
-
- /**
- * Gets the object we want to use as the request object for sending to netty.
- *
- * @param exchange the exchange
- * @return the object to use as request
- * @throws Exception is thrown if error getting the request body
- */
- protected Object getRequestBody(Exchange exchange) throws Exception {
- Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange);
- if (body == null) {
- return null;
- }
-
- // if textline enabled then convert to a String which must be used for textline
- if (getConfiguration().isTextline()) {
- body = NettyHelper.getTextlineBody(body, exchange, getConfiguration().getDelimiter(), getConfiguration().isAutoAppendDelimiter());
- }
-
- return body;
- }
-
- /**
- * To get the {@link NettyCamelState} from the given channel.
- */
- public NettyCamelState getState(Channel channel) {
- return (NettyCamelState) channel.getAttachment();
- }
-
- /**
- * To remove the {@link NettyCamelState} stored on the channel,
- * when no longer needed
- */
- public void removeState(Channel channel) {
- channel.setAttachment(null);
- }
-
- protected void setupTCPCommunication() throws Exception {
- if (channelFactory == null) {
- // prefer using explicit configured thread pools
- BossPool bp = configuration.getBossPool();
- WorkerPool wp = configuration.getWorkerPool();
-
- if (bp == null) {
- // create new pool which we should shutdown when stopping as it's not shared
- bossPool = new NettyClientBossPoolBuilder()
- .withBossCount(configuration.getBossCount())
- .withName("NettyClientTCPBoss")
- .build();
- bp = bossPool;
- }
- if (wp == null) {
- // create new pool which we should shutdown when stopping as it's not shared
- workerPool = new NettyWorkerPoolBuilder()
- .withWorkerCount(configuration.getWorkerCount())
- .withName("NettyClientTCPWorker")
- .build();
- wp = workerPool;
- }
- channelFactory = new NioClientSocketChannelFactory(bp, wp);
- }
- }
-
- protected void setupUDPCommunication() throws Exception {
- if (datagramChannelFactory == null) {
- int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
- workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count);
- datagramChannelFactory = new NioDatagramChannelFactory(workerPool);
- }
- }
-
- protected ChannelFuture openConnection() throws Exception {
- ChannelFuture answer;
-
- if (isTcp()) {
- // it's okay to create a new bootstrap for each new channel
- ClientBootstrap clientBootstrap = new ClientBootstrap(channelFactory);
- clientBootstrap.setOption("keepAlive", configuration.isKeepAlive());
- clientBootstrap.setOption("tcpNoDelay", configuration.isTcpNoDelay());
- clientBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
- clientBootstrap.setOption("connectTimeoutMillis", configuration.getConnectTimeout());
-
- // set any additional netty options
- if (configuration.getOptions() != null) {
- for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
- clientBootstrap.setOption(entry.getKey(), entry.getValue());
- }
- }
-
- // set the pipeline factory, which creates the pipeline for each newly created channel
- clientBootstrap.setPipelineFactory(pipelineFactory);
- answer = clientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created new TCP client bootstrap connecting to {}:{} with options: {}",
- new Object[]{configuration.getHost(), configuration.getPort(), clientBootstrap.getOptions()});
- }
- return answer;
- } else {
- // it's okay to create a new bootstrap for each new channel
- ConnectionlessBootstrap connectionlessClientBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
- connectionlessClientBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
- connectionlessClientBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
- connectionlessClientBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
- connectionlessClientBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
- connectionlessClientBootstrap.setOption("child.broadcast", configuration.isBroadcast());
- connectionlessClientBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
- connectionlessClientBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
-
- // set any additional netty options
- if (configuration.getOptions() != null) {
- for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
- connectionlessClientBootstrap.setOption(entry.getKey(), entry.getValue());
- }
- }
-
- // set the pipeline factory, which creates the pipeline for each newly created channel
- connectionlessClientBootstrap.setPipelineFactory(pipelineFactory);
- // bind and store channel so we can close it when stopping
- Channel channel = connectionlessClientBootstrap.bind(new InetSocketAddress(0));
- ALL_CHANNELS.add(channel);
- answer = connectionlessClientBootstrap.connect(new InetSocketAddress(configuration.getHost(), configuration.getPort()));
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created new UDP client bootstrap connecting to {}:{} with options: {}",
- new Object[]{configuration.getHost(), configuration.getPort(), connectionlessClientBootstrap.getOptions()});
- }
- return answer;
- }
- }
-
- protected Channel openChannel(ChannelFuture channelFuture) throws Exception {
- // blocking for channel to be done
- if (LOG.isTraceEnabled()) {
- LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout());
- }
- // here we need to wait it in other thread
- final CountDownLatch channelLatch = new CountDownLatch(1);
- channelFuture.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture cf) throws Exception {
- channelLatch.countDown();
- }
- });
-
- try {
- channelLatch.await(configuration.getConnectTimeout(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException ex) {
- throw new CamelException("Interrupted while waiting for " + "connection to "
- + configuration.getAddress());
- }
-
-
- if (!channelFuture.isDone() || !channelFuture.isSuccess()) {
- throw new CamelException("Cannot connect to " + configuration.getAddress(), channelFuture.getCause());
- }
- Channel answer = channelFuture.getChannel();
- // to keep track of all channels in use
- ALL_CHANNELS.add(answer);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating connector to address: {}", configuration.getAddress());
- }
- return answer;
- }
-
- public NettyConfiguration getConfiguration() {
- return configuration;
- }
-
- public void setConfiguration(NettyConfiguration configuration) {
- this.configuration = configuration;
- }
-
- public ChannelFactory getChannelFactory() {
- return channelFactory;
- }
-
- public void setChannelFactory(ChannelFactory channelFactory) {
- this.channelFactory = channelFactory;
- }
-
- public ChannelGroup getAllChannels() {
- return ALL_CHANNELS;
- }
-
- /**
- * Callback that ensures the channel is returned to the pool when we are done.
- */
- private final class NettyProducerCallback implements AsyncCallback {
-
- private final Channel channel;
- private final AsyncCallback callback;
-
- private NettyProducerCallback(Channel channel, AsyncCallback callback) {
- this.channel = channel;
- this.callback = callback;
- }
-
- @Override
- public void done(boolean doneSync) {
- // put back in pool
- try {
- LOG.trace("Putting channel back to pool {}", channel);
- pool.returnObject(channel);
- } catch (Exception e) {
- LOG.warn("Error returning channel to pool {}. This exception will be ignored.", channel);
- } finally {
- // ensure we call the delegated callback
- callback.done(doneSync);
- }
- }
- }
-
- /**
- * Object factory to create {@link Channel} used by the pool.
- */
- private final class NettyProducerPoolableObjectFactory implements PoolableObjectFactory<Channel> {
-
- @Override
- public Channel makeObject() throws Exception {
- ChannelFuture channelFuture = openConnection();
- Channel answer = openChannel(channelFuture);
- LOG.trace("Created channel: {}", answer);
- return answer;
- }
-
- @Override
- public void destroyObject(Channel channel) throws Exception {
- LOG.trace("Destroying channel: {}", channel);
- NettyHelper.close(channel);
- ALL_CHANNELS.remove(channel);
- }
-
- @Override
- public boolean validateObject(Channel channel) {
- // we need a connected channel to be valid
- boolean answer = channel.isConnected();
- LOG.trace("Validating channel: {} -> {}", channel, answer);
- return answer;
- }
-
- @Override
- public void activateObject(Channel channel) throws Exception {
- // noop
- }
-
- @Override
- public void passivateObject(Channel channel) throws Exception {
- // noop
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
deleted file mode 100644
index 22effc8..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapConfiguration.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.io.File;
-import java.util.Map;
-
-import org.apache.camel.util.jsse.SSLContextParameters;
-import org.jboss.netty.channel.socket.nio.BossPool;
-import org.jboss.netty.channel.socket.nio.WorkerPool;
-import org.jboss.netty.handler.ssl.SslHandler;
-
-public class NettyServerBootstrapConfiguration implements Cloneable {
-
- protected String protocol;
- protected String host;
- protected int port;
- protected boolean broadcast;
- protected long sendBufferSize = 65536;
- protected long receiveBufferSize = 65536;
- protected int receiveBufferSizePredictor;
- protected int bossCount = 1;
- protected int workerCount;
- protected boolean keepAlive = true;
- protected boolean tcpNoDelay = true;
- protected boolean reuseAddress = true;
- protected long connectTimeout = 10000;
- protected int backlog;
- protected ServerPipelineFactory serverPipelineFactory;
- protected NettyServerBootstrapFactory nettyServerBootstrapFactory;
- protected Map<String, Object> options;
- // SSL options are also part of the server bootstrap, as the server listener on port X is either plain or SSL
- protected boolean ssl;
- protected boolean sslClientCertHeaders;
- protected SslHandler sslHandler;
- protected SSLContextParameters sslContextParameters;
- protected boolean needClientAuth;
- protected File keyStoreFile;
- protected File trustStoreFile;
- protected String keyStoreResource;
- protected String trustStoreResource;
- protected String keyStoreFormat;
- protected String securityProvider;
- protected String passphrase;
- protected BossPool bossPool;
- protected WorkerPool workerPool;
- protected String networkInterface;
-
- public String getAddress() {
- return host + ":" + port;
- }
-
- public boolean isTcp() {
- return protocol.equalsIgnoreCase("tcp");
- }
-
- public String getProtocol() {
- return protocol;
- }
-
- public void setProtocol(String protocol) {
- this.protocol = protocol;
- }
-
- public String getHost() {
- return host;
- }
-
- public void setHost(String host) {
- this.host = host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public boolean isBroadcast() {
- return broadcast;
- }
-
- public void setBroadcast(boolean broadcast) {
- this.broadcast = broadcast;
- }
-
- public long getSendBufferSize() {
- return sendBufferSize;
- }
-
- public void setSendBufferSize(long sendBufferSize) {
- this.sendBufferSize = sendBufferSize;
- }
-
- public long getReceiveBufferSize() {
- return receiveBufferSize;
- }
-
- public void setReceiveBufferSize(long receiveBufferSize) {
- this.receiveBufferSize = receiveBufferSize;
- }
-
- public int getReceiveBufferSizePredictor() {
- return receiveBufferSizePredictor;
- }
-
- public void setReceiveBufferSizePredictor(int receiveBufferSizePredictor) {
- this.receiveBufferSizePredictor = receiveBufferSizePredictor;
- }
-
- public int getWorkerCount() {
- return workerCount;
- }
-
- public void setWorkerCount(int workerCount) {
- this.workerCount = workerCount;
- }
-
- public int getBossCount() {
- return bossCount;
- }
-
- public void setBossCount(int bossCount) {
- this.bossCount = bossCount;
- }
-
- public boolean isKeepAlive() {
- return keepAlive;
- }
-
- public void setKeepAlive(boolean keepAlive) {
- this.keepAlive = keepAlive;
- }
-
- public boolean isTcpNoDelay() {
- return tcpNoDelay;
- }
-
- public void setTcpNoDelay(boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
-
- public boolean isReuseAddress() {
- return reuseAddress;
- }
-
- public void setReuseAddress(boolean reuseAddress) {
- this.reuseAddress = reuseAddress;
- }
-
- public long getConnectTimeout() {
- return connectTimeout;
- }
-
- public void setConnectTimeout(long connectTimeout) {
- this.connectTimeout = connectTimeout;
- }
-
- public int getBacklog() {
- return backlog;
- }
-
- public void setBacklog(int backlog) {
- this.backlog = backlog;
- }
-
- public boolean isSsl() {
- return ssl;
- }
-
- public void setSsl(boolean ssl) {
- this.ssl = ssl;
- }
-
- public boolean isSslClientCertHeaders() {
- return sslClientCertHeaders;
- }
-
- public void setSslClientCertHeaders(boolean sslClientCertHeaders) {
- this.sslClientCertHeaders = sslClientCertHeaders;
- }
-
- public SslHandler getSslHandler() {
- return sslHandler;
- }
-
- public void setSslHandler(SslHandler sslHandler) {
- this.sslHandler = sslHandler;
- }
-
- public SSLContextParameters getSslContextParameters() {
- return sslContextParameters;
- }
-
- public void setSslContextParameters(SSLContextParameters sslContextParameters) {
- this.sslContextParameters = sslContextParameters;
- }
-
- public boolean isNeedClientAuth() {
- return needClientAuth;
- }
-
- public void setNeedClientAuth(boolean needClientAuth) {
- this.needClientAuth = needClientAuth;
- }
-
- @Deprecated
- public File getKeyStoreFile() {
- return keyStoreFile;
- }
-
- @Deprecated
- public void setKeyStoreFile(File keyStoreFile) {
- this.keyStoreFile = keyStoreFile;
- }
-
- @Deprecated
- public File getTrustStoreFile() {
- return trustStoreFile;
- }
-
- @Deprecated
- public void setTrustStoreFile(File trustStoreFile) {
- this.trustStoreFile = trustStoreFile;
- }
-
- public String getKeyStoreResource() {
- return keyStoreResource;
- }
-
- public void setKeyStoreResource(String keyStoreResource) {
- this.keyStoreResource = keyStoreResource;
- }
-
- public String getTrustStoreResource() {
- return trustStoreResource;
- }
-
- public void setTrustStoreResource(String trustStoreResource) {
- this.trustStoreResource = trustStoreResource;
- }
-
- public String getKeyStoreFormat() {
- return keyStoreFormat;
- }
-
- public void setKeyStoreFormat(String keyStoreFormat) {
- this.keyStoreFormat = keyStoreFormat;
- }
-
- public String getSecurityProvider() {
- return securityProvider;
- }
-
- public void setSecurityProvider(String securityProvider) {
- this.securityProvider = securityProvider;
- }
-
- public String getPassphrase() {
- return passphrase;
- }
-
- public void setPassphrase(String passphrase) {
- this.passphrase = passphrase;
- }
-
- public ServerPipelineFactory getServerPipelineFactory() {
- return serverPipelineFactory;
- }
-
- public void setServerPipelineFactory(ServerPipelineFactory serverPipelineFactory) {
- this.serverPipelineFactory = serverPipelineFactory;
- }
-
- public NettyServerBootstrapFactory getNettyServerBootstrapFactory() {
- return nettyServerBootstrapFactory;
- }
-
- public void setNettyServerBootstrapFactory(NettyServerBootstrapFactory nettyServerBootstrapFactory) {
- this.nettyServerBootstrapFactory = nettyServerBootstrapFactory;
- }
-
- public Map<String, Object> getOptions() {
- return options;
- }
-
- public void setOptions(Map<String, Object> options) {
- this.options = options;
- }
-
- public BossPool getBossPool() {
- return bossPool;
- }
-
- public void setBossPool(BossPool bossPool) {
- this.bossPool = bossPool;
- }
-
- public WorkerPool getWorkerPool() {
- return workerPool;
- }
-
- public void setWorkerPool(WorkerPool workerPool) {
- this.workerPool = workerPool;
- }
-
- public String getNetworkInterface() {
- return networkInterface;
- }
-
- public void setNetworkInterface(String networkInterface) {
- this.networkInterface = networkInterface;
- }
-
- /**
- * Checks if the other {@link NettyServerBootstrapConfiguration} is compatible
- * with this, as a Netty listener bound on port X shares the same common
- * {@link NettyServerBootstrapConfiguration}, which must be identical.
- */
- public boolean compatible(NettyServerBootstrapConfiguration other) {
- boolean isCompatible = true;
-
- if (!protocol.equals(other.protocol)) {
- isCompatible = false;
- } else if (!host.equals(other.host)) {
- isCompatible = false;
- } else if (port != other.port) {
- isCompatible = false;
- } else if (broadcast != other.broadcast) {
- isCompatible = false;
- } else if (sendBufferSize != other.sendBufferSize) {
- return false;
- } else if (receiveBufferSize != other.receiveBufferSize) {
- isCompatible = false;
- } else if (receiveBufferSizePredictor != other.receiveBufferSizePredictor) {
- isCompatible = false;
- } else if (workerCount != other.workerCount) {
- isCompatible = false;
- } else if (bossCount != other.bossCount) {
- isCompatible = false;
- } else if (keepAlive != other.keepAlive) {
- isCompatible = false;
- } else if (tcpNoDelay != other.tcpNoDelay) {
- isCompatible = false;
- } else if (reuseAddress != other.reuseAddress) {
- isCompatible = false;
- } else if (connectTimeout != other.connectTimeout) {
- isCompatible = false;
- } else if (backlog != other.backlog) {
- isCompatible = false;
- } else if (serverPipelineFactory != other.serverPipelineFactory) {
- isCompatible = false;
- } else if (nettyServerBootstrapFactory != other.nettyServerBootstrapFactory) {
- isCompatible = false;
- } else if (options == null && other.options != null) {
- // validate that all the options are identical
- isCompatible = false;
- } else if (options != null && other.options == null) {
- isCompatible = false;
- } else if (options != null && other.options != null && options.size() != other.options.size()) {
- isCompatible = false;
- } else if (options != null && other.options != null && !options.keySet().containsAll(other.options.keySet())) {
- isCompatible = false;
- } else if (options != null && other.options != null && !options.values().containsAll(other.options.values())) {
- isCompatible = false;
- } else if (ssl != other.ssl) {
- isCompatible = false;
- } else if (sslHandler != other.sslHandler) {
- isCompatible = false;
- } else if (sslContextParameters != other.sslContextParameters) {
- isCompatible = false;
- } else if (needClientAuth != other.needClientAuth) {
- isCompatible = false;
- } else if (keyStoreFile != other.keyStoreFile) {
- isCompatible = false;
- } else if (trustStoreFile != other.trustStoreFile) {
- isCompatible = false;
- } else if (keyStoreResource != null && !keyStoreResource.equals(other.keyStoreResource)) {
- isCompatible = false;
- } else if (trustStoreResource != null && !trustStoreResource.equals(other.trustStoreResource)) {
- isCompatible = false;
- } else if (keyStoreFormat != null && !keyStoreFormat.equals(other.keyStoreFormat)) {
- isCompatible = false;
- } else if (securityProvider != null && !securityProvider.equals(other.securityProvider)) {
- isCompatible = false;
- } else if (passphrase != null && !passphrase.equals(other.passphrase)) {
- isCompatible = false;
- } else if (bossPool != other.bossPool) {
- isCompatible = false;
- } else if (workerPool != other.workerPool) {
- isCompatible = false;
- } else if (networkInterface != null && !networkInterface.equals(other.networkInterface)) {
- isCompatible = false;
- }
-
- return isCompatible;
- }
-
- public String toStringBootstrapConfiguration() {
- return "NettyServerBootstrapConfiguration{"
- + "protocol='" + protocol + '\''
- + ", host='" + host + '\''
- + ", port=" + port
- + ", broadcast=" + broadcast
- + ", sendBufferSize=" + sendBufferSize
- + ", receiveBufferSize=" + receiveBufferSize
- + ", receiveBufferSizePredictor=" + receiveBufferSizePredictor
- + ", workerCount=" + workerCount
- + ", bossCount=" + bossCount
- + ", keepAlive=" + keepAlive
- + ", tcpNoDelay=" + tcpNoDelay
- + ", reuseAddress=" + reuseAddress
- + ", connectTimeout=" + connectTimeout
- + ", backlog=" + backlog
- + ", serverPipelineFactory=" + serverPipelineFactory
- + ", nettyServerBootstrapFactory=" + nettyServerBootstrapFactory
- + ", options=" + options
- + ", ssl=" + ssl
- + ", sslHandler=" + sslHandler
- + ", sslContextParameters='" + sslContextParameters + '\''
- + ", needClientAuth=" + needClientAuth
- + ", keyStoreFile=" + keyStoreFile
- + ", trustStoreFile=" + trustStoreFile
- + ", keyStoreResource='" + keyStoreResource + '\''
- + ", trustStoreResource='" + trustStoreResource + '\''
- + ", keyStoreFormat='" + keyStoreFormat + '\''
- + ", securityProvider='" + securityProvider + '\''
- + ", passphrase='" + passphrase + '\''
- + ", bossPool=" + bossPool
- + ", workerPool=" + workerPool
- + ", networkInterface='" + networkInterface + '\''
- + '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java
deleted file mode 100644
index 250366f..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBootstrapFactory.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.util.concurrent.ThreadFactory;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Service;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-
-/**
- * Factory for setting up Netty {@link org.jboss.netty.bootstrap.ServerBootstrap} and all
- * the needed logic for doing that.
- * <p/>
- * This factory allows consumers to reuse an existing {@link org.jboss.netty.bootstrap.ServerBootstrap}, which
- * makes it possible to share the same port between multiple consumers.
- */
-public interface NettyServerBootstrapFactory extends Service {
-
- /**
- * Initializes this <b>non-shared</b> {@link NettyServerBootstrapFactory}.
- *
- * @param camelContext the {@link CamelContext} for non-shared bootstrap factory
- * @param configuration the bootstrap configuration
- * @param pipelineFactory the pipeline factory
- */
- void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory);
-
- /**
- * Initializes this <b>shared</b> {@link NettyServerBootstrapFactory}.
- *
- * @param threadFactory the thread factory to use for shared bootstrap factory
- * @param configuration the bootstrap configuration
- * @param pipelineFactory the pipeline factory
- */
- void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory);
-
- /**
- * When a new {@link Channel} is opened.
- */
- void addChannel(Channel channel);
-
- /**
- * When a {@link Channel} is closed.
- */
- void removeChannel(Channel channel);
-
- /**
- * When a {@link NettyConsumer} is added and uses this bootstrap factory.
- */
- void addConsumer(NettyConsumer consumer);
-
- /**
- * When a {@link NettyConsumer} is removed and no longer using this bootstrap factory.
- */
- void removeConsumer(NettyConsumer consumer);
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
deleted file mode 100644
index c90565f..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyServerBossPoolBuilder.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.util.concurrent.Executors;
-
-import org.jboss.netty.channel.socket.nio.BossPool;
-import org.jboss.netty.channel.socket.nio.NioServerBossPool;
-
-/**
- * A builder to create Netty {@link org.jboss.netty.channel.socket.nio.BossPool} which can be used for sharing boss pools
- * with multiple Netty {@link org.apache.camel.component.netty4.NettyServerBootstrapFactory} server bootstrap configurations.
- */
-public final class NettyServerBossPoolBuilder {
-
- private String name = "NettyServerBoss";
- private String pattern;
- private int bossCount = 1;
-
- public void setName(String name) {
- this.name = name;
- }
-
- public void setPattern(String pattern) {
- this.pattern = pattern;
- }
-
- public void setBossCount(int bossCount) {
- this.bossCount = bossCount;
- }
-
- public NettyServerBossPoolBuilder withName(String name) {
- setName(name);
- return this;
- }
-
- public NettyServerBossPoolBuilder withPattern(String pattern) {
- setPattern(pattern);
- return this;
- }
-
- public NettyServerBossPoolBuilder withBossCount(int bossCount) {
- setBossCount(bossCount);
- return this;
- }
-
- /**
- * Creates a new boss pool.
- */
- BossPool build() {
- return new NioServerBossPool(Executors.newCachedThreadPool(), bossCount, new CamelNettyThreadNameDeterminer(pattern, name));
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
deleted file mode 100644
index 0179d2b..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyWorkerPoolBuilder.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.util.concurrent.Executors;
-
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
-import org.jboss.netty.channel.socket.nio.WorkerPool;
-
-/**
- * A builder to create Netty {@link WorkerPool} which can be used for sharing worker pools
- * with multiple Netty {@link NettyServerBootstrapFactory} server bootstrap configurations.
- */
-public final class NettyWorkerPoolBuilder {
-
- private String name = "NettyWorker";
- private String pattern;
- private int workerCount;
- private volatile WorkerPool workerPool;
-
- public void setName(String name) {
- this.name = name;
- }
-
- public void setPattern(String pattern) {
- this.pattern = pattern;
- }
-
- public void setWorkerCount(int workerCount) {
- this.workerCount = workerCount;
- }
-
- public NettyWorkerPoolBuilder withName(String name) {
- setName(name);
- return this;
- }
-
- public NettyWorkerPoolBuilder withPattern(String pattern) {
- setPattern(pattern);
- return this;
- }
-
- public NettyWorkerPoolBuilder withWorkerCount(int workerCount) {
- setWorkerCount(workerCount);
- return this;
- }
-
- /**
- * Creates a new worker pool.
- */
- WorkerPool build() {
- int count = workerCount > 0 ? workerCount : NettyHelper.DEFAULT_IO_THREADS;
- workerPool = new NioWorkerPool(Executors.newCachedThreadPool(), count, new CamelNettyThreadNameDeterminer(pattern, name));
- return workerPool;
- }
-
- /**
- * Shuts down the created worker pool.
- */
- public void destroy() {
- if (workerPool != null) {
- workerPool.shutdown();
- workerPool = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
deleted file mode 100644
index e8ffe11..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ServerPipelineFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-
-/**
- * Factory to create {@link ChannelPipeline} for clients, eg {@link NettyConsumer}.
- * <p/>
- * Implementors must support creating a new instance of this factory which is associated
- * to the given {@link NettyConsumer} using the {@link #createPipelineFactory(NettyConsumer)}
- * method.
- *
- * @see ChannelPipelineFactory
- */
-public abstract class ServerPipelineFactory implements ChannelPipelineFactory {
-
- /**
- * Creates a new {@link ServerPipelineFactory} using the given {@link NettyConsumer}
- *
- * @param consumer the associated consumer
- * @return the {@link ServerPipelineFactory} associated to the given consumer.
- */
- public abstract ServerPipelineFactory createPipelineFactory(NettyConsumer consumer);
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
deleted file mode 100644
index 68f3e40..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ShareableChannelHandlerFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import org.jboss.netty.channel.ChannelHandler;
-
-/**
- * A {@link ChannelHandlerFactory} returning a shareable {@link ChannelHandler}.
- */
-public class ShareableChannelHandlerFactory implements ChannelHandlerFactory {
-
- private final ChannelHandler channelHandler;
-
- public ShareableChannelHandlerFactory(ChannelHandler channelHandler) {
- this.channelHandler = channelHandler;
- }
-
- @Override
- public ChannelHandler newChannelHandler() {
- return channelHandler;
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java
deleted file mode 100644
index 9121838..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SharedSingletonObjectPool.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.util.NoSuchElementException;
-
-import org.apache.commons.pool.ObjectPool;
-import org.apache.commons.pool.PoolableObjectFactory;
-
-/**
- * An {@link org.apache.commons.pool.ObjectPool} that uses a single shared instance.
- * <p/>
- * This implementation will always return <tt>1</tt> in {@link #getNumActive()} and
- * return <tt>0</tt> in {@link #getNumIdle()}.
- */
-public class SharedSingletonObjectPool<T> implements ObjectPool<T> {
-
- private final PoolableObjectFactory<T> factory;
- private volatile T t;
-
- public SharedSingletonObjectPool(PoolableObjectFactory<T> factory) {
- this.factory = factory;
- }
-
- @Override
- public synchronized T borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
- if (t == null) {
- t = factory.makeObject();
- }
- return t;
- }
-
- @Override
- public void returnObject(T obj) throws Exception {
- // noop
- }
-
- @Override
- public void invalidateObject(T obj) throws Exception {
- t = null;
- }
-
- @Override
- public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
- // noop
- }
-
- @Override
- public int getNumIdle() throws UnsupportedOperationException {
- return 0;
- }
-
- @Override
- public int getNumActive() throws UnsupportedOperationException {
- return 1;
- }
-
- @Override
- public void clear() throws Exception, UnsupportedOperationException {
- t = null;
- }
-
- @Override
- public void close() throws Exception {
- t = null;
- }
-
- @Override
- public void setFactory(PoolableObjectFactory<T> factory) throws IllegalStateException, UnsupportedOperationException {
- // noop
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
deleted file mode 100644
index 9806a38..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleTCPNettyServerBootstrapFactory.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ThreadFactory;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.support.ServiceSupport;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.BossPool;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.WorkerPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A TCP {@link NettyServerBootstrapFactory} that serves a single consumer (it is not shared).
- * <p/>
- * Boss and worker pools are taken from the configuration when set there; otherwise they are
- * created when starting and shut down again when stopping.
- */
-public class SingleTCPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(SingleTCPNettyServerBootstrapFactory.class);
-    private final ChannelGroup allChannels;
-    private CamelContext camelContext;
-    private ThreadFactory threadFactory;
-    private NettyServerBootstrapConfiguration configuration;
-    private ChannelPipelineFactory pipelineFactory;
-    private ChannelFactory channelFactory;
-    private ServerBootstrap serverBootstrap;
-    private Channel channel;
-    private BossPool bossPool;
-    private WorkerPool workerPool;
-
-    public SingleTCPNettyServerBootstrapFactory() {
-        String groupName = SingleTCPNettyServerBootstrapFactory.class.getName();
-        this.allChannels = new DefaultChannelGroup(groupName);
-    }
-
-    /**
-     * Initializes this factory with a {@link CamelContext}.
-     */
-    public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
-        this.pipelineFactory = pipelineFactory;
-        this.configuration = configuration;
-        this.camelContext = camelContext;
-    }
-
-    /**
-     * Initializes this factory with a {@link ThreadFactory}.
-     */
-    public void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
-        this.pipelineFactory = pipelineFactory;
-        this.configuration = configuration;
-        this.threadFactory = threadFactory;
-    }
-
-    /**
-     * Starts tracking the channel so it gets closed when this factory stops.
-     */
-    public void addChannel(Channel channel) {
-        allChannels.add(channel);
-    }
-
-    /**
-     * Stops tracking the channel.
-     */
-    public void removeChannel(Channel channel) {
-        allChannels.remove(channel);
-    }
-
-    public void addConsumer(NettyConsumer consumer) {
-        // nothing to do, as this factory is not shared among consumers
-    }
-
-    public void removeConsumer(NettyConsumer consumer) {
-        // nothing to do, as this factory is not shared among consumers
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        boolean initialized = camelContext != null || threadFactory != null;
-        if (!initialized) {
-            throw new IllegalArgumentException("Either CamelContext or ThreadFactory must be set on " + this);
-        }
-        startServerBootstrap();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        stopServerBootstrap();
-    }
-
-    /**
-     * Creates the server bootstrap, applies the configured options and binds it to the configured host and port.
-     */
-    protected void startServerBootstrap() {
-        // explicitly configured thread pools take precedence
-        BossPool boss = configuration.getBossPool();
-        WorkerPool worker = configuration.getWorkerPool();
-
-        if (boss == null) {
-            // none configured, so create our own pool and remember it to shut it down when stopping
-            NettyServerBossPoolBuilder bossBuilder = new NettyServerBossPoolBuilder()
-                    .withBossCount(configuration.getBossCount())
-                    .withName("NettyServerTCPBoss");
-            bossPool = bossBuilder.build();
-            boss = bossPool;
-        }
-        if (worker == null) {
-            // none configured, so create our own pool and remember it to shut it down when stopping
-            NettyWorkerPoolBuilder workerBuilder = new NettyWorkerPoolBuilder()
-                    .withWorkerCount(configuration.getWorkerCount())
-                    .withName("NettyServerTCPWorker");
-            workerPool = workerBuilder.build();
-            worker = workerPool;
-        }
-
-        channelFactory = new NioServerSocketChannelFactory(boss, worker);
-        serverBootstrap = new ServerBootstrap(channelFactory);
-        applyBootstrapOptions();
-
-        LOG.debug("Created ServerBootstrap {} with options: {}", serverBootstrap, serverBootstrap.getOptions());
-
-        // the pipeline factory creates the pipeline for every newly created channel
-        serverBootstrap.setPipelineFactory(pipelineFactory);
-
-        String host = configuration.getHost();
-        int port = configuration.getPort();
-        LOG.info("ServerBootstrap binding to {}:{}", host, port);
-        channel = serverBootstrap.bind(new InetSocketAddress(host, port));
-        // track the server channel together with all other channels in use
-        allChannels.add(channel);
-    }
-
-    /**
-     * Copies the options from the configuration onto the server bootstrap.
-     */
-    private void applyBootstrapOptions() {
-        boolean reuseAddress = configuration.isReuseAddress();
-
-        serverBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
-        serverBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
-        serverBootstrap.setOption("reuseAddress", reuseAddress);
-        serverBootstrap.setOption("child.reuseAddress", reuseAddress);
-        serverBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
-
-        int backlog = configuration.getBacklog();
-        if (backlog > 0) {
-            serverBootstrap.setOption("backlog", backlog);
-        }
-
-        // additional netty options are applied last
-        Map<String, Object> additional = configuration.getOptions();
-        if (additional == null) {
-            return;
-        }
-        for (Map.Entry<String, Object> option : additional.entrySet()) {
-            serverBootstrap.setOption(option.getKey(), option.getValue());
-        }
-    }
-
-    /**
-     * Closes all tracked channels, releases the channel factory and shuts down the pools created by this factory.
-     */
-    protected void stopServerBootstrap() {
-        LOG.info("ServerBootstrap unbinding from {}:{}", configuration.getHost(), configuration.getPort());
-
-        // close every channel and wait until that is done
-        LOG.trace("Closing {} channels", allChannels.size());
-        ChannelGroupFuture closing = allChannels.close();
-        closing.awaitUninterruptibly();
-
-        // release the external resources of the server
-        if (channelFactory != null) {
-            channelFactory.releaseExternalResources();
-            channelFactory = null;
-        }
-
-        // finally shutdown the thread pools, which are only set when created by this factory
-        if (bossPool != null) {
-            bossPool.shutdown();
-            bossPool = null;
-        }
-        if (workerPool != null) {
-            workerPool.shutdown();
-            workerPool = null;
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/ed07738a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
deleted file mode 100644
index 1a8d984..0000000
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/SingleUDPNettyServerBootstrapFactory.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.netty4;
-
-import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.support.ServiceSupport;
-import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.DatagramChannel;
-import org.jboss.netty.channel.socket.DatagramChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioDatagramWorkerPool;
-import org.jboss.netty.channel.socket.nio.WorkerPool;
-import org.jboss.netty.handler.ipfilter.IpV4Subnet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A UDP {@link NettyServerBootstrapFactory} which is used by a single consumer (not shared).
- * <p/>
- * When the configured host is within the IPv4 multicast range, the bound channel joins that
- * multicast group using the configured network interface.
- */
-public class SingleUDPNettyServerBootstrapFactory extends ServiceSupport implements NettyServerBootstrapFactory {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(SingleUDPNettyServerBootstrapFactory.class);
-    // interface name used for multicast when the configuration does not name one
-    private static final String LOOPBACK_INTERFACE = "lo";
-    // the IPv4 multicast address range
-    private static final String MULTICAST_SUBNET = "224.0.0.0/4";
-    private final ChannelGroup allChannels;
-    private CamelContext camelContext;
-    private ThreadFactory threadFactory;
-    private NettyServerBootstrapConfiguration configuration;
-    private ChannelPipelineFactory pipelineFactory;
-    private DatagramChannelFactory datagramChannelFactory;
-    private ConnectionlessBootstrap connectionlessBootstrap;
-    private WorkerPool workerPool;
-
-    public SingleUDPNettyServerBootstrapFactory() {
-        this.allChannels = new DefaultChannelGroup(SingleUDPNettyServerBootstrapFactory.class.getName());
-    }
-
-    /**
-     * Initializes this factory with a {@link CamelContext}.
-     */
-    public void init(CamelContext camelContext, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
-        this.camelContext = camelContext;
-        this.configuration = configuration;
-        this.pipelineFactory = pipelineFactory;
-    }
-
-    /**
-     * Initializes this factory with a {@link ThreadFactory}.
-     */
-    public void init(ThreadFactory threadFactory, NettyServerBootstrapConfiguration configuration, ChannelPipelineFactory pipelineFactory) {
-        this.threadFactory = threadFactory;
-        this.configuration = configuration;
-        this.pipelineFactory = pipelineFactory;
-    }
-
-    /**
-     * Tracks the channel so it is closed when this factory is stopped.
-     */
-    public void addChannel(Channel channel) {
-        allChannels.add(channel);
-    }
-
-    /**
-     * Stops tracking the channel.
-     */
-    public void removeChannel(Channel channel) {
-        allChannels.remove(channel);
-    }
-
-    public void addConsumer(NettyConsumer consumer) {
-        // does not allow sharing
-    }
-
-    public void removeConsumer(NettyConsumer consumer) {
-        // does not allow sharing
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        if (camelContext == null && threadFactory == null) {
-            throw new IllegalArgumentException("Either CamelContext or ThreadFactory must be set on " + this);
-        }
-        startServerBootstrap();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        stopServerBootstrap();
-    }
-
-    /**
-     * Creates the connectionless bootstrap, applies the configured options and binds it to the
-     * configured host and port, joining the multicast group when the host is a multicast address.
-     *
-     * @throws UnknownHostException     if the configured host cannot be checked against the multicast range
-     * @throws SocketException          if looking up the multicast network interface fails
-     * @throws IllegalArgumentException if the host is a multicast address and no network interface
-     *                                  exists with the configured (or default) name
-     */
-    protected void startServerBootstrap() throws UnknownHostException, SocketException {
-        // validate the multicast setup before allocating any resources, so a wrong interface name
-        // fails fast and leaves neither a worker pool nor a bound channel behind
-        IpV4Subnet multicastSubnet = new IpV4Subnet(MULTICAST_SUBNET);
-        boolean multicast = multicastSubnet.contains(configuration.getHost());
-        NetworkInterface multicastNetworkInterface = multicast ? resolveMulticastNetworkInterface() : null;
-
-        // create non-shared worker pool
-        int count = configuration.getWorkerCount() > 0 ? configuration.getWorkerCount() : NettyHelper.DEFAULT_IO_THREADS;
-        workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), count);
-
-        datagramChannelFactory = new NioDatagramChannelFactory(workerPool);
-
-        connectionlessBootstrap = new ConnectionlessBootstrap(datagramChannelFactory);
-        connectionlessBootstrap.setOption("child.keepAlive", configuration.isKeepAlive());
-        connectionlessBootstrap.setOption("child.tcpNoDelay", configuration.isTcpNoDelay());
-        connectionlessBootstrap.setOption("reuseAddress", configuration.isReuseAddress());
-        connectionlessBootstrap.setOption("child.reuseAddress", configuration.isReuseAddress());
-        connectionlessBootstrap.setOption("child.connectTimeoutMillis", configuration.getConnectTimeout());
-        connectionlessBootstrap.setOption("child.broadcast", configuration.isBroadcast());
-        connectionlessBootstrap.setOption("sendBufferSize", configuration.getSendBufferSize());
-        connectionlessBootstrap.setOption("receiveBufferSize", configuration.getReceiveBufferSize());
-        // only set this if user has specified
-        if (configuration.getReceiveBufferSizePredictor() > 0) {
-            connectionlessBootstrap.setOption("receiveBufferSizePredictorFactory",
-                    new FixedReceiveBufferSizePredictorFactory(configuration.getReceiveBufferSizePredictor()));
-        }
-        if (configuration.getBacklog() > 0) {
-            connectionlessBootstrap.setOption("backlog", configuration.getBacklog());
-        }
-
-        // set any additional netty options
-        if (configuration.getOptions() != null) {
-            for (Map.Entry<String, Object> entry : configuration.getOptions().entrySet()) {
-                connectionlessBootstrap.setOption(entry.getKey(), entry.getValue());
-            }
-        }
-
-        LOG.debug("Created ConnectionlessBootstrap {} with options: {}", connectionlessBootstrap, connectionlessBootstrap.getOptions());
-
-        // set the pipeline factory, which creates the pipeline for each newly created channels
-        connectionlessBootstrap.setPipelineFactory(pipelineFactory);
-
-        InetSocketAddress hostAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort());
-
-        if (multicast) {
-            DatagramChannel channel = (DatagramChannel) connectionlessBootstrap.bind(hostAddress);
-            LOG.info("ConnectionlessBootstrap joining {}:{} using network interface: {}", new Object[]{configuration.getHost(), configuration.getPort(), multicastNetworkInterface.getName()});
-            channel.joinGroup(hostAddress, multicastNetworkInterface);
-            allChannels.add(channel);
-        } else {
-            LOG.info("ConnectionlessBootstrap binding to {}:{}", configuration.getHost(), configuration.getPort());
-            Channel channel = connectionlessBootstrap.bind(hostAddress);
-            allChannels.add(channel);
-        }
-    }
-
-    /**
-     * Looks up the network interface to use for multicast, falling back to the loopback name
-     * when the configuration does not name one.
-     *
-     * @return the network interface, never <tt>null</tt>
-     * @throws SocketException          if the lookup fails with an I/O error
-     * @throws IllegalArgumentException if no network interface exists with that name
-     */
-    private NetworkInterface resolveMulticastNetworkInterface() throws SocketException {
-        String networkInterface = configuration.getNetworkInterface() == null ? LOOPBACK_INTERFACE : configuration.getNetworkInterface();
-        // getByName returns null for an unknown name; the default name does not exist on every platform
-        NetworkInterface answer = NetworkInterface.getByName(networkInterface);
-        if (answer == null) {
-            throw new IllegalArgumentException("Cannot join multicast group " + configuration.getHost() + ":" + configuration.getPort()
-                    + " as no network interface exists with name: " + networkInterface);
-        }
-        return answer;
-    }
-
-    /**
-     * Closes all tracked channels, releases the channel factory and shuts down the worker pool.
-     */
-    protected void stopServerBootstrap() {
-        // close all channels
-        LOG.info("ConnectionlessBootstrap disconnecting from {}:{}", configuration.getHost(), configuration.getPort());
-
-        LOG.trace("Closing {} channels", allChannels.size());
-        ChannelGroupFuture future = allChannels.close();
-        future.awaitUninterruptibly();
-
-        // close server external resources
-        if (datagramChannelFactory != null) {
-            datagramChannelFactory.releaseExternalResources();
-            datagramChannelFactory = null;
-        }
-
-        // and then shutdown the thread pools
-        if (workerPool != null) {
-            workerPool.shutdown();
-            workerPool = null;
-        }
-    }
-
-}