You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/12/20 11:48:02 UTC

svn commit: r1424398 - in /camel/trunk/components/camel-netty/src: main/java/org/apache/camel/component/netty/ test/java/org/apache/camel/component/netty/

Author: davsclaus
Date: Thu Dec 20 10:48:02 2012
New Revision: 1424398

URL: http://svn.apache.org/viewvc?rev=1424398&view=rev
Log:
CAMEL-5901: Allow to turn on|off netty producer pool.

Added:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java   (with props)
    camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java
      - copied, changed from r1424372, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java
Modified:
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
    camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1424398&r1=1424397&r2=1424398&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu Dec 20 10:48:02 2012
@@ -83,9 +83,10 @@ public class NettyConfiguration implemen
     private int producerPoolMinIdle;
     private int producerPoolMaxIdle = 100;
     private long producerPoolMinEvictableIdle = 5 * 60 * 1000L;
+    private boolean producerPoolEnabled = true;
     private int backlog;
     private Map<String, Object> options;
-    
+
     /**
      * Returns a copy of this configuration
      */
@@ -582,6 +583,14 @@ public class NettyConfiguration implemen
         this.producerPoolMinEvictableIdle = producerPoolMinEvictableIdle;
     }
 
+    public boolean isProducerPoolEnabled() {
+        return producerPoolEnabled;
+    }
+
+    public void setProducerPoolEnabled(boolean producerPoolEnabled) {
+        this.producerPoolEnabled = producerPoolEnabled;
+    }
+
     public int getBacklog() {
         return backlog;
     }

Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1424398&r1=1424397&r2=1424398&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu Dec 20 10:48:02 2012
@@ -91,20 +91,32 @@ public class NettyProducer extends Defau
     protected void doStart() throws Exception {
         super.doStart();
 
-        // setup pool where we want an unbounded pool, which allows the pool to shrink on no demand
-        GenericObjectPool.Config config = new GenericObjectPool.Config();
-        config.maxActive = configuration.getProducerPoolMaxActive();
-        config.minIdle = configuration.getProducerPoolMinIdle();
-        config.maxIdle = configuration.getProducerPoolMaxIdle();
-        // we should test on borrow to ensure the channel is still valid
-        config.testOnBorrow = true;
-        // only evict channels which are no longer valid
-        config.testWhileIdle = true;
-        // run eviction every 30th second
-        config.timeBetweenEvictionRunsMillis = 30 * 1000L;
-        config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle();
-        config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
-        pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config);
+        if (configuration.isProducerPoolEnabled()) {
+            // setup pool where we want an unbounded pool, which allows the pool to shrink on no demand
+            GenericObjectPool.Config config = new GenericObjectPool.Config();
+            config.maxActive = configuration.getProducerPoolMaxActive();
+            config.minIdle = configuration.getProducerPoolMinIdle();
+            config.maxIdle = configuration.getProducerPoolMaxIdle();
+            // we should test on borrow to ensure the channel is still valid
+            config.testOnBorrow = true;
+            // only evict channels which are no longer valid
+            config.testWhileIdle = true;
+            // run eviction every 30th second
+            config.timeBetweenEvictionRunsMillis = 30 * 1000L;
+            config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle();
+            config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
+            pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Created NettyProducer pool[maxActive={}, minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}",
+                        new Object[]{config.maxActive, config.minIdle, config.maxIdle, config.minEvictableIdleTimeMillis, pool});
+            }
+        } else {
+            pool = new SharedSingletonObjectPool<Channel>(new NettyProducerPoolableObjectFactory());
+            if (LOG.isDebugEnabled()) {
+                LOG.info("Created NettyProducer shared singleton pool -> {}", pool);
+            }
+        }
 
         // setup pipeline factory
         ClientPipelineFactory factory = configuration.getClientPipelineFactory();
@@ -122,7 +134,8 @@ public class NettyProducer extends Defau
 
         if (!configuration.isLazyChannelCreation()) {
             // ensure the connection can be established when we start up
-            openAndCloseConnection();
+            Channel channel = pool.borrowObject();
+            pool.returnObject(channel);
         }
     }
 
@@ -149,10 +162,13 @@ public class NettyProducer extends Defau
             workerExecutor = null;
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+        if (pool != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+            }
+            pool.close();
+            pool = null;
         }
-        pool.close();
 
         super.doStop();
     }
@@ -362,7 +378,7 @@ public class NettyProducer extends Defau
         }
     }
 
-    private Channel openChannel(ChannelFuture channelFuture) throws Exception {
+    protected Channel openChannel(ChannelFuture channelFuture) throws Exception {
         // blocking for channel to be done
         if (LOG.isTraceEnabled()) {
             LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout());
@@ -382,13 +398,6 @@ public class NettyProducer extends Defau
         return answer;
     }
 
-    private void openAndCloseConnection() throws Exception {
-        ChannelFuture future = openConnection();
-        Channel channel = openChannel(future);
-        NettyHelper.close(channel);
-        ALL_CHANNELS.remove(channel);
-    }
-
     public NettyConfiguration getConfiguration() {
         return configuration;
     }

Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java?rev=1424398&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java Thu Dec 20 10:48:02 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.NoSuchElementException;
+
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.PoolableObjectFactory;
+
+/**
+ * An {@link org.apache.commons.pool.ObjectPool} that uses a single shared instance.
+ * <p/>
+ * This implementation will always return <tt>1</tt> in {@link #getNumActive()} and
+ * return <tt>0</tt> in {@link #getNumIdle()}.
+ */
+public class SharedSingletonObjectPool<T> implements ObjectPool<T> {
+
+    private final PoolableObjectFactory<T> factory;
+    private volatile T t;
+
+    public SharedSingletonObjectPool(PoolableObjectFactory<T> factory) {
+        this.factory = factory;
+    }
+
+    @Override
+    public synchronized T borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
+        if (t == null) {
+            t = factory.makeObject();
+        }
+        return t;
+    }
+
+    @Override
+    public void returnObject(T obj) throws Exception {
+        // noop
+    }
+
+    @Override
+    public void invalidateObject(T obj) throws Exception {
+        t = null;
+    }
+
+    @Override
+    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
+        // noop
+    }
+
+    @Override
+    public int getNumIdle() throws UnsupportedOperationException {
+        return 0;
+    }
+
+    @Override
+    public int getNumActive() throws UnsupportedOperationException {
+        return 1;
+    }
+
+    @Override
+    public void clear() throws Exception, UnsupportedOperationException {
+        t = null;
+    }
+
+    @Override
+    public void close() throws Exception {
+        t = null;
+    }
+
+    @Override
+    public void setFactory(PoolableObjectFactory<T> factory) throws IllegalStateException, UnsupportedOperationException {
+        // noop
+    }
+}

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java (from r1424372, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java?p2=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java&p1=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java&r1=1424372&r2=1424398&rev=1424398&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java Thu Dec 20 10:48:02 2012
@@ -22,16 +22,14 @@ import org.junit.Test;
 /**
  * @version 
  */
-public class NettyTextlineInOutTest extends BaseNettyTest {
+public class NettyProducerPoolDisabledTest extends BaseNettyTest {
 
     @Test
-    public void testTextlineInOut() throws Exception {
-        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
-
-        String reply = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true", "Hello World", String.class);
-        assertEquals("Bye World", reply);
-
-        assertMockEndpointsSatisfied();
+    public void testProducerPoolDisabled() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            String reply = template.requestBody("direct:start", "Hello " + i, String.class);
+            assertEquals("Bye " + i, reply);
+        }
     }
 
     @Override
@@ -39,10 +37,12 @@ public class NettyTextlineInOutTest exte
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                from("direct:start")
+                    .to("netty:tcp://localhost:{{port}}?textline=true&sync=true&producerPoolEnabled=false");
+
                 from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
                     // body should be a String when using textline codec
                     .validate(body().isInstanceOf(String.class))
-                    .to("mock:result")
                     .transform(body().regexReplaceAll("Hello", "Bye"));
             }
         };