You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/26 16:31:01 UTC
svn commit: r778715 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/test/ja...
Author: davsclaus
Date: Tue May 26 14:31:00 2009
New Revision: 778715
URL: http://svn.apache.org/viewvc?rev=778715&view=rev
Log:
CAMEL-1647: First cut of a service pool. Allows us to pool producers for mina and ftp that are otherwise not thread safe.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java (with props)
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java (with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java (with props)
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java
- copied, changed from r778565, camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java
camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
camel/trunk/components/camel-mina/src/test/resources/log4j.properties
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java Tue May 26 14:31:00 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * A service pool is like a connection pool.
+ *
+ * @version $Revision$
+ */
+public interface ServicePool<Key, Service> extends org.apache.camel.Service {
+
+ /**
+ * Acquires the given service. If absent in pool the service
+ * is added to the pool.
+ *
+ * @param key the key
+ * @param service the service
+ * @return the acquired service, is never <tt>null</tt>
+ */
+ Service acquireIfAbsent(Key key, Service service);
+
+ /**
+ * Tries to acquire the service with the given key
+ * @param key the key
+ * @return the acquired service, or <tt>null</tt> if no free in pool
+ */
+ Service acquire(Key key);
+
+ /**
+ * Releases the service back to the pool
+ *
+ * @param key the key
+ * @param service the service
+ */
+ void release(Key key, Service service);
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java Tue May 26 14:31:00 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Marker interface to indicate this service can be pooled using a {@link org.apache.camel.ServicePool}.
+ *
+ * @version $Revision$
+ */
+public interface ServicePoolAware {
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Tue May 26 14:31:00 2009
@@ -341,10 +341,6 @@
// Properties
// -----------------------------------------------------------------------
- public Producer getProducer(Endpoint endpoint) {
- return producerCache.getProducer(endpoint);
- }
-
public CamelContext getContext() {
return context;
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java Tue May 26 14:31:00 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.camel.ServicePool;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Default implementation to inherit for a basic service pool.
+ *
+ * @version $Revision$
+ */
+public abstract class DefaultServicePool<Key, Service> extends ServiceSupport implements ServicePool<Key, Service> {
+ protected final Log log = LogFactory.getLog(getClass());
+ protected final ConcurrentHashMap<Key, BlockingQueue<Service>> pool = new ConcurrentHashMap<Key, BlockingQueue<Service>>();
+ protected final int capacity;
+
+ /**
+ * The capacity, note this is per key.
+ *
+ * @param capacity the capacity per key.
+ */
+ public DefaultServicePool(int capacity) {
+ this.capacity = capacity;
+ }
+
+ public synchronized Service acquireIfAbsent(Key key, Service service) {
+ BlockingQueue<Service> entry = pool.get(key);
+ if (entry == null) {
+ entry = new ArrayBlockingQueue<Service>(capacity);
+ pool.put(key, entry);
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("AddAndAcquire key: " + key + " service: " + service);
+ }
+ // do not add the service as we acquire it
+ return service;
+ }
+
+ public synchronized Service acquire(Key key) {
+ BlockingQueue<Service> services = pool.get(key);
+ if (services == null || services.isEmpty()) {
+ if (log.isTraceEnabled()) {
+ log.trace("No free services in pool to acquire for key: " + key);
+ }
+ return null;
+ }
+
+ Service answer = services.poll();
+ if (log.isTraceEnabled()) {
+ log.trace("Acquire: " + key + " service: " + answer);
+ }
+ return answer;
+ }
+
+ public synchronized void release(Key key, Service service) {
+ if (log.isTraceEnabled()) {
+ log.trace("Release: " + key + " service: " + service);
+ }
+ BlockingQueue<Service> services = pool.get(key);
+ if (services != null) {
+ services.add(service);
+ }
+ }
+
+ protected void doStart() throws Exception {
+ log.debug("Starting service pool: " + this);
+ }
+
+ protected void doStop() throws Exception {
+ log.debug("Stopping service pool: " + this);
+ for (BlockingQueue<Service> entry : pool.values()) {
+ Collection<Service> values = new ArrayList<Service>();
+ entry.drainTo(values);
+ ServiceHelper.stopServices(values);
+ entry.clear();
+ }
+ pool.clear();
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Tue May 26 14:31:00 2009
@@ -19,13 +19,14 @@
import java.util.HashMap;
import java.util.Map;
+import org.apache.camel.ServicePool;
+import org.apache.camel.ServicePoolAware;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.IsSingleton;
import org.apache.camel.ProducerCallback;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
@@ -41,38 +42,21 @@
private static final transient Log LOG = LogFactory.getLog(ProducerCache.class);
private final Map<String, Producer> producers = new HashMap<String, Producer>();
-
- // TODO: Consider a pool for non singleton producers to leverage in the doInProducer template
-
- public synchronized Producer getProducer(Endpoint endpoint) {
- String key = endpoint.getEndpointUri();
- Producer answer = producers.get(key);
- if (answer == null) {
- try {
- answer = endpoint.createProducer();
- answer.start();
- } catch (Exception e) {
- throw new FailedToCreateProducerException(endpoint, e);
- }
-
- // only add singletons to the cache
- boolean singleton = true;
- if (answer instanceof IsSingleton) {
- singleton = ((IsSingleton)answer).isSingleton();
- }
-
- if (singleton) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding to producer cache with key: " + endpoint + " for producer: " + answer);
- }
- producers.put(key, answer);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Producer for endpoint: " + key + " is not singleton and thus not added to producer cache");
- }
- }
- }
- return answer;
+ // we use a capacity of 10 per endpoint, so for the same endpoint we have at most 10 producers in the pool
+ // so if we have 6 endpoints in the pool, we can have 6 x 10 producers in total
+ private final ServicePool<Endpoint, Producer> pool = new ProducerServicePool(10);
+
+ // TODO: Let CamelContext expose a global producer cache
+ // TODO: Have easy configuration of pooling in Camel
+ // TODO: Have a SPI interface for pluggable connection pools
+
+ public Producer getProducer(Endpoint endpoint) {
+ // As the producer is returned outside this method we do not want to return pooled producers
+ // so we pass in false to the method. If we returned pooled producers then the user would have
+ // to remember to return them back to the pool.
+ // See method doInProducer that is safe template pattern where we handle the lifecycle and
+ // thus safely can use pooled producers there
+ return doGetProducer(endpoint, false);
}
/**
@@ -135,8 +119,8 @@
* @throws Exception if an internal processing error has occurred.
*/
public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) throws Exception {
- // get or create the producer
- Producer producer = getProducer(endpoint);
+ // get the producer and we do not mind if it's pooled as we can handle returning it back to the pool
+ Producer producer = doGetProducer(endpoint, true);
if (producer == null) {
if (isStopped()) {
@@ -151,12 +135,11 @@
// invoke the callback
return callback.doInProducer(producer, exchange, pattern);
} finally {
- // stop non singleton producers as we should not leak resources
- boolean singleton = true;
- if (producer instanceof IsSingleton) {
- singleton = ((IsSingleton)producer).isSingleton();
- }
- if (!singleton) {
+ if (producer instanceof ServicePoolAware) {
+ // release back to the pool
+ pool.release(endpoint, producer);
+ } else if (!producer.isSingleton()) {
+ // stop non singleton producers as we should not leak resources
producer.stop();
}
}
@@ -185,11 +168,47 @@
});
}
+ protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
+ String key = endpoint.getEndpointUri();
+ Producer answer = producers.get(key);
+ if (pooled && answer == null) {
+ // try acquire from connection pool
+ answer = pool.acquire(endpoint);
+ }
+
+ if (answer == null) {
+ // create a new producer
+ try {
+ answer = endpoint.createProducer();
+ answer.start();
+ } catch (Exception e) {
+ throw new FailedToCreateProducerException(endpoint, e);
+ }
+
+ // add producer to cache or pool if applicable
+ if (pooled && answer instanceof ServicePoolAware) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding to producer service pool with key: " + endpoint + " for producer: " + answer);
+ }
+ answer = pool.acquireIfAbsent(endpoint, answer);
+ } else if (answer.isSingleton()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding to producer cache with key: " + endpoint + " for producer: " + answer);
+ }
+ producers.put(key, answer);
+ }
+ }
+
+ return answer;
+ }
+
protected void doStop() throws Exception {
ServiceHelper.stopServices(producers.values());
producers.clear();
+ ServiceHelper.stopServices(pool);
}
protected void doStart() throws Exception {
+ ServiceHelper.startServices(pool);
}
}
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java Tue May 26 14:31:00 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+
+/**
+ * A service pool for {@link Producer}.
+ * <p/>
+ * For instance camel-mina and camel-ftp leverage this to allow a pool of producers so we
+ * can support concurrent producers in a thread safe manner.
+ *
+ * @version $Revision$
+ */
+public class ProducerServicePool extends DefaultServicePool<Endpoint, Producer> {
+
+ public ProducerServicePool(int capacity) {
+ super(capacity);
+ }
+
+ synchronized int size() {
+ int size = 0;
+ for (BlockingQueue<Producer> queue : pool.values()) {
+ size += queue.size();
+ }
+ return size;
+ }
+
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java Tue May 26 14:31:00 2009
@@ -21,6 +21,8 @@
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.ProducerCallback;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.model.RoutingSlipDefinition;
@@ -64,15 +66,19 @@
for (String nextRecipient : recipients) {
Endpoint endpoint = resolveEndpoint(exchange, nextRecipient);
- Producer producer = producerCache.getProducer(endpoint);
- Exchange ex = current.newInstance();
+ Exchange copy = current.newInstance();
updateRoutingSlip(current);
- copyOutToIn(ex, current);
+ copyOutToIn(copy, current);
- producer.process(ex);
+ producerCache.doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
+ public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception {
+ producer.process(exchange);
+ return exchange;
+ }
+ });
- current = ex;
+ current = copy;
}
ExchangeHelper.copyResults(exchange, current);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Tue May 26 14:31:00 2009
@@ -68,10 +68,6 @@
return destination;
}
- protected Producer getProducer() {
- return producerCache.getProducer(destination);
- }
-
protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
if (pattern != null) {
exchange.setPattern(pattern);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java Tue May 26 14:31:00 2009
@@ -22,7 +22,7 @@
* An interface for an object which is interested in being injected with the root {@link TypeConverter}
* such as for implementing a fallback type converter
*
- * @see org.apache.camel.impl.converter.DefaultTypeConverter#addFallbackConverter(TypeConverter)
+ * @see org.apache.camel.impl.converter.DefaultTypeConverter#addFallbackTypeConverter(org.apache.camel.TypeConverter)
* DefaultTypeConverter.addFallbackConverter
* @version $Revision$
*/
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java Tue May 26 14:31:00 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ServicePool;
+import org.apache.camel.ServicePoolAware;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+
+/**
+ * @version $Revision$
+ */
+public class ServicePoolTest extends ContextTestSupport {
+
+ // TODO: Add unit test for only once stop/start of pooled service
+ // TODO: Add some stress test of the pool
+
+ private ProducerServicePool pool;
+
+ private class MyProducer extends DefaultProducer implements ServicePoolAware {
+
+ public MyProducer(Endpoint endpoint) {
+ super(endpoint);
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ // noop
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ pool = new ProducerServicePool(5);
+ pool.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ pool.stop();
+ super.tearDown();
+ }
+
+ public void testSingleEnry() throws Exception {
+ Endpoint endpoint = context.getEndpoint("mock:foo");
+
+ assertNull(pool.acquire(endpoint));
+ assertEquals(0, pool.size());
+
+ Producer producer = new MyProducer(endpoint);
+ producer = pool.acquireIfAbsent(endpoint, producer);
+ assertEquals(0, pool.size());
+
+ pool.release(endpoint, producer);
+ assertEquals(1, pool.size());
+
+ producer = pool.acquire(endpoint);
+ assertNotNull(producer);
+ assertEquals(0, pool.size());
+
+ pool.release(endpoint, producer);
+ assertEquals(1, pool.size());
+
+ pool.stop();
+ assertEquals(0, pool.size());
+ }
+
+ public void testTwoEnries() throws Exception {
+ Endpoint endpoint = context.getEndpoint("mock:foo");
+
+ Producer producer1 = new MyProducer(endpoint);
+ Producer producer2 = new MyProducer(endpoint);
+
+ producer1 = pool.acquireIfAbsent(endpoint, producer1);
+ producer2 = pool.acquireIfAbsent(endpoint, producer2);
+
+ assertEquals(0, pool.size());
+ pool.release(endpoint, producer1);
+ assertEquals(1, pool.size());
+ pool.release(endpoint, producer2);
+ assertEquals(2, pool.size());
+
+ pool.stop();
+ assertEquals(0, pool.size());
+ }
+
+ public void testThreeEntries() throws Exception {
+ Endpoint endpoint = context.getEndpoint("mock:foo");
+
+ Producer producer1 = new MyProducer(endpoint);
+ Producer producer2 = new MyProducer(endpoint);
+ Producer producer3 = new MyProducer(endpoint);
+
+ producer1 = pool.acquireIfAbsent(endpoint, producer1);
+ producer2 = pool.acquireIfAbsent(endpoint, producer2);
+ producer3 = pool.acquireIfAbsent(endpoint, producer3);
+
+ assertEquals(0, pool.size());
+ pool.release(endpoint, producer1);
+ assertEquals(1, pool.size());
+ pool.release(endpoint, producer2);
+ assertEquals(2, pool.size());
+ pool.release(endpoint, producer3);
+ assertEquals(3, pool.size());
+
+ pool.stop();
+ assertEquals(0, pool.size());
+ }
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java Tue May 26 14:31:00 2009
@@ -16,11 +16,10 @@
*/
package org.apache.camel.component.file.remote;
-import java.io.File;
import java.io.IOException;
import org.apache.camel.Exchange;
-import org.apache.camel.IsSingleton;
+import org.apache.camel.ServicePoolAware;
import org.apache.camel.component.file.GenericFileExchange;
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.GenericFileProducer;
@@ -30,7 +29,7 @@
* Remote file producer. Handles connecting and disconnecting if we are not.
* Generic type F is the remote system implementation of a file.
*/
-public class RemoteFileProducer<T> extends GenericFileProducer<T> implements IsSingleton {
+public class RemoteFileProducer<T> extends GenericFileProducer<T> implements ServicePoolAware {
private boolean loggedIn;
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Tue May 26 14:31:00 2009
@@ -24,6 +24,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.Producer;
+import org.apache.camel.ServicePoolAware;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
@@ -40,7 +41,7 @@
*
* @version $Revision$
*/
-public class MinaProducer extends DefaultProducer {
+public class MinaProducer extends DefaultProducer implements ServicePoolAware {
private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
private IoSession session;
private MinaEndpoint endpoint;
Copied: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java (from r778565, camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java?p2=camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java&p1=camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java&r1=778565&r2=778715&rev=778715&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java Tue May 26 14:31:00 2009
@@ -16,9 +16,7 @@
*/
package org.apache.camel.component.mina;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -33,14 +31,19 @@
/**
* @version $Revision$
*/
-public class MinaProducerConcurrentTest extends ContextTestSupport {
+public class MinaProducerAnotherConcurrentTest extends ContextTestSupport {
+
+ public void testSimple() throws Exception {
+ String out = template.requestBody("direct:start", "A", String.class);
+ assertEquals("Bye A", out);
+ }
public void testNoConcurrentProducers() throws Exception {
doSendMessages(1, 1);
}
public void testConcurrentProducers() throws Exception {
- doSendMessages(10, 5);
+ doSendMessages(200, 5);
}
private void doSendMessages(int files, int poolSize) throws Exception {
@@ -52,7 +55,7 @@
final int index = i;
Future out = executor.submit(new Callable<Object>() {
public Object call() throws Exception {
- return template.requestBody("mina:tcp://localhost:8080?sync=true", index, String.class);
+ return template.requestBody("direct:start", index, String.class);
}
});
responses.put(index, out);
@@ -61,20 +64,18 @@
assertMockEndpointsSatisfied();
assertEquals(files, responses.size());
- // get all responses
- Set unique = new HashSet();
- for (Future future : responses.values()) {
- unique.add(future.get());
+ for (int i = 0; i < files; i++) {
+ Object out = responses.get(i).get();
+ assertEquals("Bye " + i, out);
}
-
- // should be 10 unique responses
- assertEquals("Should be " + files + " unique responses", files, unique.size());
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
+ from("direct:start").to("mina:tcp://localhost:8080?sync=true");
+
from("mina:tcp://localhost:8080?sync=true").process(new Processor() {
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
@@ -85,4 +86,4 @@
};
}
-}
+}
\ No newline at end of file
Modified: camel/trunk/components/camel-mina/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/resources/log4j.properties?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-mina/src/test/resources/log4j.properties Tue May 26 14:31:00 2009
@@ -21,6 +21,11 @@
log4j.rootLogger=INFO, file
#log4j.logger.org.apache.camel.component.mina=DEBUG
+#log4j.logger.org.apache.camel=DEBUG
+log4j.logger.org.apache.camel.impl.converter=WARN
+log4j.logger.org.apache.camel.management=WARN
+log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender