You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/12/20 11:48:54 UTC

svn commit: r1424399 - in /camel/branches/camel-2.10.x: ./ components/camel-netty/src/main/java/org/apache/camel/component/netty/ components/camel-netty/src/test/java/org/apache/camel/component/netty/

Author: davsclaus
Date: Thu Dec 20 10:48:54 2012
New Revision: 1424399

URL: http://svn.apache.org/viewvc?rev=1424399&view=rev
Log:
CAMEL-5901: Allow turning the netty producer pool on or off.

Added:
    camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
      - copied unchanged from r1424398, camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
    camel/branches/camel-2.10.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java
      - copied unchanged from r1424398, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java
Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1424398

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1424399&r1=1424398&r2=1424399&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu Dec 20 10:48:54 2012
@@ -83,9 +83,10 @@ public class NettyConfiguration implemen
     private int producerPoolMinIdle;
     private int producerPoolMaxIdle = 100;
     private long producerPoolMinEvictableIdle = 5 * 60 * 1000L;
+    private boolean producerPoolEnabled = true;
     private int backlog;
     private Map<String, Object> options;
-    
+
     /**
      * Returns a copy of this configuration
      */
@@ -582,6 +583,14 @@ public class NettyConfiguration implemen
         this.producerPoolMinEvictableIdle = producerPoolMinEvictableIdle;
     }
 
+    public boolean isProducerPoolEnabled() {
+        return producerPoolEnabled;
+    }
+
+    public void setProducerPoolEnabled(boolean producerPoolEnabled) {
+        this.producerPoolEnabled = producerPoolEnabled;
+    }
+
     public int getBacklog() {
         return backlog;
     }

Modified: camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1424399&r1=1424398&r2=1424399&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu Dec 20 10:48:54 2012
@@ -91,20 +91,32 @@ public class NettyProducer extends Defau
     protected void doStart() throws Exception {
         super.doStart();
 
-        // setup pool where we want an unbounded pool, which allows the pool to shrink on no demand
-        GenericObjectPool.Config config = new GenericObjectPool.Config();
-        config.maxActive = configuration.getProducerPoolMaxActive();
-        config.minIdle = configuration.getProducerPoolMinIdle();
-        config.maxIdle = configuration.getProducerPoolMaxIdle();
-        // we should test on borrow to ensure the channel is still valid
-        config.testOnBorrow = true;
-        // only evict channels which are no longer valid
-        config.testWhileIdle = true;
-        // run eviction every 30th second
-        config.timeBetweenEvictionRunsMillis = 30 * 1000L;
-        config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle();
-        config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
-        pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config);
+        if (configuration.isProducerPoolEnabled()) {
+            // setup pool where we want an unbounded pool, which allows the pool to shrink on no demand
+            GenericObjectPool.Config config = new GenericObjectPool.Config();
+            config.maxActive = configuration.getProducerPoolMaxActive();
+            config.minIdle = configuration.getProducerPoolMinIdle();
+            config.maxIdle = configuration.getProducerPoolMaxIdle();
+            // we should test on borrow to ensure the channel is still valid
+            config.testOnBorrow = true;
+            // only evict channels which are no longer valid
+            config.testWhileIdle = true;
+            // run eviction every 30th second
+            config.timeBetweenEvictionRunsMillis = 30 * 1000L;
+            config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle();
+            config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
+            pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created NettyProducer pool[maxActive={}, minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}",
+                        new Object[]{config.maxActive, config.minIdle, config.maxIdle, config.minEvictableIdleTimeMillis, pool});
+            }
+        } else {
+            pool = new SharedSingletonObjectPool<Channel>(new NettyProducerPoolableObjectFactory());
+            if (LOG.isDebugEnabled()) {
+                LOG.info("Created NettyProducer shared singleton pool -> {}", pool);
+            }
+        }
 
         // setup pipeline factory
         ClientPipelineFactory factory = configuration.getClientPipelineFactory();
@@ -122,7 +134,8 @@ public class NettyProducer extends Defau
 
         if (!configuration.isLazyChannelCreation()) {
             // ensure the connection can be established when we start up
-            openAndCloseConnection();
+            Channel channel = pool.borrowObject();
+            pool.returnObject(channel);
         }
     }
 
@@ -147,10 +160,13 @@ public class NettyProducer extends Defau
             context.getExecutorServiceManager().shutdownNow(workerExecutor);
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+        if (pool != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+            }
+            pool.close();
+            pool = null;
         }
-        pool.close();
 
         super.doStop();
     }
@@ -360,7 +376,7 @@ public class NettyProducer extends Defau
         }
     }
 
-    private Channel openChannel(ChannelFuture channelFuture) throws Exception {
+    protected Channel openChannel(ChannelFuture channelFuture) throws Exception {
         // blocking for channel to be done
         if (LOG.isTraceEnabled()) {
             LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout());
@@ -380,13 +396,6 @@ public class NettyProducer extends Defau
         return answer;
     }
 
-    private void openAndCloseConnection() throws Exception {
-        ChannelFuture future = openConnection();
-        Channel channel = openChannel(future);
-        NettyHelper.close(channel);
-        ALL_CHANNELS.remove(channel);
-    }
-
     public NettyConfiguration getConfiguration() {
         return configuration;
     }