You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/12/20 11:48:02 UTC
svn commit: r1424398 - in /camel/trunk/components/camel-netty/src:
main/java/org/apache/camel/component/netty/
test/java/org/apache/camel/component/netty/
Author: davsclaus
Date: Thu Dec 20 10:48:02 2012
New Revision: 1424398
URL: http://svn.apache.org/viewvc?rev=1424398&view=rev
Log:
CAMEL-5901: Allow to turn on|off netty producer pool.
Added:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java
- copied, changed from r1424372, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1424398&r1=1424397&r2=1424398&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu Dec 20 10:48:02 2012
@@ -83,9 +83,10 @@ public class NettyConfiguration implemen
private int producerPoolMinIdle;
private int producerPoolMaxIdle = 100;
private long producerPoolMinEvictableIdle = 5 * 60 * 1000L;
+ private boolean producerPoolEnabled = true;
private int backlog;
private Map<String, Object> options;
-
+
/**
* Returns a copy of this configuration
*/
@@ -582,6 +583,14 @@ public class NettyConfiguration implemen
this.producerPoolMinEvictableIdle = producerPoolMinEvictableIdle;
}
+ /**
+ * Whether the producer pool is enabled. Is by default <tt>true</tt>.
+ * <p/>
+ * When enabled the producer borrows its channels from a pool, when disabled the
+ * producer uses a single shared channel instead.
+ */
+ public boolean isProducerPoolEnabled() {
+ return producerPoolEnabled;
+ }
+
+ /**
+ * Turns the producer pool on or off. Is by default <tt>true</tt> (on).
+ *
+ * @param producerPoolEnabled <tt>true</tt> to let the producer pool its channels,
+ * <tt>false</tt> to let the producer use a single shared channel
+ */
+ public void setProducerPoolEnabled(boolean producerPoolEnabled) {
+ this.producerPoolEnabled = producerPoolEnabled;
+ }
+
public int getBacklog() {
return backlog;
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1424398&r1=1424397&r2=1424398&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu Dec 20 10:48:02 2012
@@ -91,20 +91,32 @@ public class NettyProducer extends Defau
protected void doStart() throws Exception {
super.doStart();
- // setup pool where we want an unbounded pool, which allows the pool to shrink on no demand
- GenericObjectPool.Config config = new GenericObjectPool.Config();
- config.maxActive = configuration.getProducerPoolMaxActive();
- config.minIdle = configuration.getProducerPoolMinIdle();
- config.maxIdle = configuration.getProducerPoolMaxIdle();
- // we should test on borrow to ensure the channel is still valid
- config.testOnBorrow = true;
- // only evict channels which are no longer valid
- config.testWhileIdle = true;
- // run eviction every 30th second
- config.timeBetweenEvictionRunsMillis = 30 * 1000L;
- config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle();
- config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
- pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config);
+ if (configuration.isProducerPoolEnabled()) {
+ // setup pool where we want an unbounded pool, which allows the pool to shrink on no demand
+ GenericObjectPool.Config config = new GenericObjectPool.Config();
+ config.maxActive = configuration.getProducerPoolMaxActive();
+ config.minIdle = configuration.getProducerPoolMinIdle();
+ config.maxIdle = configuration.getProducerPoolMaxIdle();
+ // we should test on borrow to ensure the channel is still valid
+ config.testOnBorrow = true;
+ // only evict channels which are no longer valid
+ config.testWhileIdle = true;
+ // run eviction every 30th second
+ config.timeBetweenEvictionRunsMillis = 30 * 1000L;
+ config.minEvictableIdleTimeMillis = configuration.getProducerPoolMinEvictableIdle();
+ config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_FAIL;
+ pool = new GenericObjectPool<Channel>(new NettyProducerPoolableObjectFactory(), config);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created NettyProducer pool[maxActive={}, minIdle={}, maxIdle={}, minEvictableIdleTimeMillis={}] -> {}",
+ new Object[]{config.maxActive, config.minIdle, config.maxIdle, config.minEvictableIdleTimeMillis, pool});
+ }
+ } else {
+ // pooling is turned off, so every borrower shares the single channel held by this pool
+ pool = new SharedSingletonObjectPool<Channel>(new NettyProducerPoolableObjectFactory());
+ // logged at debug level like the pooled branch; a one-argument parameterized call needs no guard
+ LOG.debug("Created NettyProducer shared singleton pool -> {}", pool);
+ }
// setup pipeline factory
ClientPipelineFactory factory = configuration.getClientPipelineFactory();
@@ -122,7 +134,8 @@ public class NettyProducer extends Defau
if (!configuration.isLazyChannelCreation()) {
// ensure the connection can be established when we start up
- openAndCloseConnection();
+ Channel channel = pool.borrowObject();
+ pool.returnObject(channel);
}
}
@@ -149,10 +162,13 @@ public class NettyProducer extends Defau
workerExecutor = null;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+ if (pool != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping producer with channel pool[active={}, idle={}]", pool.getNumActive(), pool.getNumIdle());
+ }
+ pool.close();
+ pool = null;
}
- pool.close();
super.doStop();
}
@@ -362,7 +378,7 @@ public class NettyProducer extends Defau
}
}
- private Channel openChannel(ChannelFuture channelFuture) throws Exception {
+ protected Channel openChannel(ChannelFuture channelFuture) throws Exception {
// blocking for channel to be done
if (LOG.isTraceEnabled()) {
LOG.trace("Waiting for operation to complete {} for {} millis", channelFuture, configuration.getConnectTimeout());
@@ -382,13 +398,6 @@ public class NettyProducer extends Defau
return answer;
}
- private void openAndCloseConnection() throws Exception {
- ChannelFuture future = openConnection();
- Channel channel = openChannel(future);
- NettyHelper.close(channel);
- ALL_CHANNELS.remove(channel);
- }
-
public NettyConfiguration getConfiguration() {
return configuration;
}
Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java?rev=1424398&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java Thu Dec 20 10:48:02 2012
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.NoSuchElementException;
+
+import org.apache.commons.pool.ObjectPool;
+import org.apache.commons.pool.PoolableObjectFactory;
+
+/**
+ * An {@link org.apache.commons.pool.ObjectPool} that uses a single shared instance.
+ * <p/>
+ * This implementation will always return <tt>1</tt> in {@link #getNumActive()} and
+ * return <tt>0</tt> in {@link #getNumIdle()}.
+ */
+public class SharedSingletonObjectPool<T> implements ObjectPool<T> {
+
+    private final PoolableObjectFactory<T> factory;
+    // the single shared instance, or null when there is none; only written while holding this pool's monitor
+    private volatile T t;
+
+    /**
+     * Creates a pool that lazily obtains its single instance from the given factory.
+     *
+     * @param factory the factory used to create, validate and destroy the shared instance
+     */
+    public SharedSingletonObjectPool(PoolableObjectFactory<T> factory) {
+        this.factory = factory;
+    }
+
+    /**
+     * Returns the shared instance, creating it on first use and replacing it when the
+     * factory no longer considers it valid.
+     *
+     * @return the shared instance
+     * @throws Exception if the factory fails to create the instance
+     */
+    @Override
+    public synchronized T borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
+        T current = t;
+        if (current != null && !factory.validateObject(current)) {
+            // never hand out an instance the factory reports as broken; drop it and create a new one below
+            t = null;
+            destroyQuietly(current);
+            current = null;
+        }
+        if (current == null) {
+            current = factory.makeObject();
+            t = current;
+        }
+        // return the local reference so the result does not depend on a second read of the field
+        return current;
+    }
+
+    /**
+     * Does nothing, as the instance stays shared between all borrowers.
+     */
+    @Override
+    public void returnObject(T obj) throws Exception {
+        // noop
+    }
+
+    /**
+     * Discards and destroys the shared instance, so the next borrow creates a new one.
+     * <p/>
+     * The call is ignored when the given object is not the current shared instance, so a late
+     * invalidation of an already replaced instance cannot throw away its replacement.
+     *
+     * @param obj the borrowed instance that was found to be invalid
+     * @throws Exception if the factory fails to destroy the instance
+     */
+    @Override
+    public synchronized void invalidateObject(T obj) throws Exception {
+        if (obj == null || obj != t) {
+            return;
+        }
+        // clear the reference first so the pool stays usable even if destroying fails
+        t = null;
+        factory.destroyObject(obj);
+    }
+
+    /**
+     * Does nothing, as the single instance is created on demand by {@link #borrowObject()}.
+     */
+    @Override
+    public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
+        // noop
+    }
+
+    @Override
+    public int getNumIdle() throws UnsupportedOperationException {
+        return 0;
+    }
+
+    @Override
+    public int getNumActive() throws UnsupportedOperationException {
+        return 1;
+    }
+
+    /**
+     * Discards the shared instance and destroys it on a best effort basis.
+     */
+    @Override
+    public synchronized void clear() throws Exception, UnsupportedOperationException {
+        release();
+    }
+
+    /**
+     * Discards the shared instance and destroys it on a best effort basis.
+     */
+    @Override
+    public synchronized void close() throws Exception {
+        release();
+    }
+
+    /**
+     * Does nothing, as the factory is fixed at construction time.
+     */
+    @Override
+    public void setFactory(PoolableObjectFactory<T> factory) throws IllegalStateException, UnsupportedOperationException {
+        // noop
+    }
+
+    /**
+     * Forgets the shared instance (if any) and hands it to the factory for destruction.
+     * Must be called while holding this pool's monitor.
+     */
+    private void release() {
+        T current = t;
+        t = null;
+        if (current != null) {
+            destroyQuietly(current);
+        }
+    }
+
+    /**
+     * Destroys the given instance, ignoring any failure from the factory.
+     */
+    private void destroyQuietly(T obj) {
+        try {
+            factory.destroyObject(obj);
+        } catch (Exception ignored) {
+            // best effort: the instance is dropped either way and the pool no longer references it
+        }
+    }
+}
Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/SharedSingletonObjectPool.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java (from r1424372, camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java?p2=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java&p1=camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java&r1=1424372&r2=1424398&rev=1424398&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTextlineInOutTest.java (original)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProducerPoolDisabledTest.java Thu Dec 20 10:48:02 2012
@@ -22,16 +22,14 @@ import org.junit.Test;
/**
* @version
*/
-public class NettyTextlineInOutTest extends BaseNettyTest {
+public class NettyProducerPoolDisabledTest extends BaseNettyTest {
@Test
- public void testTextlineInOut() throws Exception {
- getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
-
- String reply = template.requestBody("netty:tcp://localhost:{{port}}?textline=true&sync=true", "Hello World", String.class);
- assertEquals("Bye World", reply);
-
- assertMockEndpointsSatisfied();
+ public void testProducerPoolDisabled() throws Exception {
+ // send 10 requests one after the other via direct:start, whose netty producer has producerPoolEnabled=false
+ for (int i = 0; i < 10; i++) {
+ String reply = template.requestBody("direct:start", "Hello " + i, String.class);
+ // the consumer route rewrites Hello to Bye, so each reply must match the request it belongs to
+ assertEquals("Bye " + i, reply);
+ }
}
@Override
@@ -39,10 +37,12 @@ public class NettyTextlineInOutTest exte
return new RouteBuilder() {
@Override
public void configure() throws Exception {
+ from("direct:start")
+ .to("netty:tcp://localhost:{{port}}?textline=true&sync=true&producerPoolEnabled=false");
+
from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
// body should be a String when using textline codec
.validate(body().isInstanceOf(String.class))
- .to("mock:result")
.transform(body().regexReplaceAll("Hello", "Bye"));
}
};