You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/26 16:31:01 UTC

svn commit: r778715 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/test/ja...

Author: davsclaus
Date: Tue May 26 14:31:00 2009
New Revision: 778715

URL: http://svn.apache.org/viewvc?rev=778715&view=rev
Log:
CAMEL-1647: First cut of a service pool. Allows us to pool producers for mina and ftp that are otherwise not thread safe.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java   (with props)
    camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java
      - copied, changed from r778565, camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java
    camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    camel/trunk/components/camel-mina/src/test/resources/log4j.properties

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java Tue May 26 14:31:00 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * A service pool is like a connection pool.
+ *
+ * @version $Revision$
+ */
+public interface ServicePool<Key, Service> extends org.apache.camel.Service {
+
+    /**
+     * Acquires the given service. If absent in pool the service
+     * is added to the pool.
+     *
+     * @param key the key
+     * @param service the service
+     * @return the acquired service, is never <tt>null</tt>
+     */
+    Service acquireIfAbsent(Key key, Service service);
+
+    /**
+     * Tries to acquire the service with the given key
+     * @param key the key
+     * @return the acquired service, or <tt>null</tt> if none is free in the pool
+     */
+    Service acquire(Key key);
+
+    /**
+     * Releases the service back to the pool
+     *
+     * @param key  the key
+     * @param service the service
+     */
+    void release(Key key, Service service);
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePool.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java Tue May 26 14:31:00 2009
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Marker interface to indicate this service can be pooled using a {@link org.apache.camel.ServicePool}.
+ *
+ * @version $Revision$
+ */
+public interface ServicePoolAware {
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ServicePoolAware.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Tue May 26 14:31:00 2009
@@ -341,10 +341,6 @@
 
     // Properties
     // -----------------------------------------------------------------------
-    public Producer getProducer(Endpoint endpoint) {
-        return producerCache.getProducer(endpoint);
-    }
-
     public CamelContext getContext() {
         return context;
     }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java Tue May 26 14:31:00 2009
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.camel.ServicePool;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Default implementation to inherit for a basic service pool.
+ *
+ * @version $Revision$
+ */
+public abstract class DefaultServicePool<Key, Service> extends ServiceSupport implements ServicePool<Key, Service> {
+    protected final Log log = LogFactory.getLog(getClass());
+    protected final ConcurrentHashMap<Key, BlockingQueue<Service>> pool = new ConcurrentHashMap<Key, BlockingQueue<Service>>();
+    protected final int capacity;
+
+    /**
+     * The capacity, note this is per key.
+     *
+     * @param capacity the capacity per key.
+     */
+    public DefaultServicePool(int capacity) {
+        this.capacity = capacity;
+    }
+
+    public synchronized Service acquireIfAbsent(Key key, Service service) {
+        BlockingQueue<Service> entry = pool.get(key);
+        if (entry == null) {
+            entry = new ArrayBlockingQueue<Service>(capacity);
+            pool.put(key, entry);
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("AddAndAcquire key: " + key + " service: " + service);
+        }
+        // do not add the service as we acquire it
+        return service;
+    }
+
+    public synchronized Service acquire(Key key) {
+        BlockingQueue<Service> services = pool.get(key);
+        if (services == null || services.isEmpty()) {
+            if (log.isTraceEnabled()) {
+                log.trace("No free services in pool to acquire for key: " + key);
+            }
+            return null;
+        }
+
+        Service answer = services.poll();
+        if (log.isTraceEnabled()) {
+            log.trace("Acquire: " + key + " service: " + answer);
+        }
+        return answer;
+    }
+
+    public synchronized void release(Key key, Service service) {
+        if (log.isTraceEnabled()) {
+            log.trace("Release: " + key + " service: " + service);
+        }
+        BlockingQueue<Service> services = pool.get(key);
+        if (services != null) {
+            services.add(service);
+        }
+    }
+
+    protected void doStart() throws Exception {
+        log.debug("Starting service pool: " + this);
+    }
+
+    protected void doStop() throws Exception {
+        log.debug("Stopping service pool: " + this);
+        for (BlockingQueue<Service> entry : pool.values()) {
+            Collection<Service> values = new ArrayList<Service>();
+            entry.drainTo(values);
+            ServiceHelper.stopServices(values);
+            entry.clear();
+        }
+        pool.clear();
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultServicePool.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Tue May 26 14:31:00 2009
@@ -19,13 +19,14 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.camel.ServicePool;
+import org.apache.camel.ServicePoolAware;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.IsSingleton;
 import org.apache.camel.ProducerCallback;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
@@ -41,38 +42,21 @@
     private static final transient Log LOG = LogFactory.getLog(ProducerCache.class);
 
     private final Map<String, Producer> producers = new HashMap<String, Producer>();
-
-    // TODO: Consider a pool for non singleton producers to leverage in the doInProducer template
-
-    public synchronized Producer getProducer(Endpoint endpoint) {
-        String key = endpoint.getEndpointUri();
-        Producer answer = producers.get(key);
-        if (answer == null) {
-            try {
-                answer = endpoint.createProducer();
-                answer.start();
-            } catch (Exception e) {
-                throw new FailedToCreateProducerException(endpoint, e);
-            }
-
-            // only add singletons to the cache
-            boolean singleton = true;
-            if (answer instanceof IsSingleton) {
-                singleton = ((IsSingleton)answer).isSingleton();
-            }
-
-            if (singleton) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Adding to producer cache with key: " + endpoint + " for producer: " + answer);
-                }
-                producers.put(key, answer);
-            } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Producer for endpoint: " + key + " is not singleton and thus not added to producer cache");
-                }
-            }
-        }
-        return answer;
+    // we use a capacity of 10 per endpoint, so for the same endpoint we have at most 10 producers in the pool
+    // so if we have 6 endpoints in the pool, we can have 6 x 10 producers in total
+    private final ServicePool<Endpoint, Producer> pool = new ProducerServicePool(10);
+
+    // TODO: Let CamelContext expose a global producer cache
+    // TODO: Have easy configuration of pooling in Camel
+    // TODO: Have a SPI interface for pluggable connection pools
+
+    public Producer getProducer(Endpoint endpoint) {
+        // As the producer is returned outside this method we do not want to return pooled producers
+        // so we pass in false to the method. If we returned pooled producers, the user would have
+        // to remember to release them back to the pool.
+        // See method doInProducer that is safe template pattern where we handle the lifecycle and
+        // thus safely can use pooled producers there
+        return doGetProducer(endpoint, false);
     }
 
     /**
@@ -135,8 +119,8 @@
      * @throws Exception if an internal processing error has occurred.
      */
     public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) throws Exception {
-        // get or create the producer
-        Producer producer = getProducer(endpoint);
+        // get the producer and we do not mind if it's pooled as we can handle returning it back to the pool
+        Producer producer = doGetProducer(endpoint, true);
 
         if (producer == null) {
             if (isStopped()) {
@@ -151,12 +135,11 @@
             // invoke the callback
             return callback.doInProducer(producer, exchange, pattern);
         } finally {
-            // stop non singleton producers as we should not leak resources
-            boolean singleton = true;
-            if (producer instanceof IsSingleton) {
-                singleton = ((IsSingleton)producer).isSingleton();
-            }
-            if (!singleton) {
+            if (producer instanceof ServicePoolAware) {
+                // release back to the pool
+                pool.release(endpoint, producer);
+            } else if (!producer.isSingleton()) {
+                // stop non singleton producers as we should not leak resources
                 producer.stop();
             }
         }
@@ -185,11 +168,47 @@
         });
     }
 
+    protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) {
+        String key = endpoint.getEndpointUri();
+        Producer answer = producers.get(key);
+        if (pooled && answer == null) {
+            // try acquire from connection pool
+            answer = pool.acquire(endpoint);
+        }
+
+        if (answer == null) {
+            // create a new producer
+            try {
+                answer = endpoint.createProducer();
+                answer.start();
+            } catch (Exception e) {
+                throw new FailedToCreateProducerException(endpoint, e);
+            }
+
+            // add producer to cache or pool if applicable
+            if (pooled && answer instanceof ServicePoolAware) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Adding to producer service pool with key: " + endpoint + " for producer: " + answer);
+                }
+                answer = pool.acquireIfAbsent(endpoint, answer);
+            } else if (answer.isSingleton()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Adding to producer cache with key: " + endpoint + " for producer: " + answer);
+                }
+                producers.put(key, answer);
+            }
+        }
+
+        return answer;
+    }
+
     protected void doStop() throws Exception {
         ServiceHelper.stopServices(producers.values());
         producers.clear();
+        ServiceHelper.stopServices(pool);
     }
 
     protected void doStart() throws Exception {
+        ServiceHelper.startServices(pool);
     }
 }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java Tue May 26 14:31:00 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Producer;
+
+/**
+ * A service pool for {@link Producer}.
+ * <p/>
+ * For instance camel-mina and camel-ftp leverage this to allow a pool of producers so we
+ * can support concurrent producers in a thread safe manner.
+ *
+ * @version $Revision$
+ */
+public class ProducerServicePool extends DefaultServicePool<Endpoint, Producer> {
+
+    public ProducerServicePool(int capacity) {
+        super(capacity);
+    }
+
+    synchronized int size() {
+        int size = 0;
+        for (BlockingQueue<Producer> queue : pool.values()) {
+            size += queue.size();
+        }
+        return size;
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerServicePool.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java Tue May 26 14:31:00 2009
@@ -21,6 +21,8 @@
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.ProducerCallback;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.model.RoutingSlipDefinition;
@@ -64,15 +66,19 @@
 
         for (String nextRecipient : recipients) {
             Endpoint endpoint = resolveEndpoint(exchange, nextRecipient);
-            Producer producer = producerCache.getProducer(endpoint);
-            Exchange ex = current.newInstance();
 
+            Exchange copy = current.newInstance();
             updateRoutingSlip(current);
-            copyOutToIn(ex, current);
+            copyOutToIn(copy, current);
 
-            producer.process(ex);
+            producerCache.doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
+                public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception {
+                    producer.process(exchange);
+                    return exchange;
+                }
+            });
 
-            current = ex;
+            current = copy;
         }
         ExchangeHelper.copyResults(exchange, current);
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Tue May 26 14:31:00 2009
@@ -68,10 +68,6 @@
         return destination;
     }
 
-    protected Producer getProducer() {
-        return producerCache.getProducer(destination);
-    }
-
     protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
         if (pattern != null) {
             exchange.setPattern(pattern);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/TypeConverterAware.java Tue May 26 14:31:00 2009
@@ -22,7 +22,7 @@
  * An interface for an object which is interested in being injected with the root {@link TypeConverter}
  * such as for implementing a fallback type converter
  *
- * @see org.apache.camel.impl.converter.DefaultTypeConverter#addFallbackConverter(TypeConverter)
+ * @see org.apache.camel.impl.converter.DefaultTypeConverter#addFallbackTypeConverter(org.apache.camel.TypeConverter)
  *         DefaultTypeConverter.addFallbackConverter
  * @version $Revision$
  */

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java?rev=778715&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java Tue May 26 14:31:00 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ServicePool;
+import org.apache.camel.ServicePoolAware;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+
+/**
+ * @version $Revision$
+ */
+public class ServicePoolTest extends ContextTestSupport {
+
+    // TODO: Add unit test for only once stop/start of pooled service
+    // TODO: Add some stress test of the pool
+
+    private ProducerServicePool pool;
+
+    private class MyProducer extends DefaultProducer implements ServicePoolAware {
+
+        public MyProducer(Endpoint endpoint) {
+            super(endpoint);
+        }
+
+        public void process(Exchange exchange) throws Exception {
+            // noop
+        }
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        pool = new ProducerServicePool(5);
+        pool.start();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        pool.stop();
+        super.tearDown();
+    }
+
+    public void testSingleEnry() throws Exception {
+        Endpoint endpoint = context.getEndpoint("mock:foo");
+
+        assertNull(pool.acquire(endpoint));
+        assertEquals(0, pool.size());
+
+        Producer producer = new MyProducer(endpoint);
+        producer = pool.acquireIfAbsent(endpoint, producer);
+        assertEquals(0, pool.size());
+
+        pool.release(endpoint, producer);
+        assertEquals(1, pool.size());
+
+        producer = pool.acquire(endpoint);
+        assertNotNull(producer);
+        assertEquals(0, pool.size());
+
+        pool.release(endpoint, producer);
+        assertEquals(1, pool.size());
+
+        pool.stop();
+        assertEquals(0, pool.size());
+    }
+
+    public void testTwoEnries() throws Exception {
+        Endpoint endpoint = context.getEndpoint("mock:foo");
+
+        Producer producer1 = new MyProducer(endpoint);
+        Producer producer2 = new MyProducer(endpoint);
+
+        producer1 = pool.acquireIfAbsent(endpoint, producer1);
+        producer2 = pool.acquireIfAbsent(endpoint, producer2);
+
+        assertEquals(0, pool.size());
+        pool.release(endpoint, producer1);
+        assertEquals(1, pool.size());
+        pool.release(endpoint, producer2);
+        assertEquals(2, pool.size());
+
+        pool.stop();
+        assertEquals(0, pool.size());
+    }
+
+    public void testThreeEntries() throws Exception {
+        Endpoint endpoint = context.getEndpoint("mock:foo");
+
+        Producer producer1 = new MyProducer(endpoint);
+        Producer producer2 = new MyProducer(endpoint);
+        Producer producer3 = new MyProducer(endpoint);
+
+        producer1 = pool.acquireIfAbsent(endpoint, producer1);
+        producer2 = pool.acquireIfAbsent(endpoint, producer2);
+        producer3 = pool.acquireIfAbsent(endpoint, producer3);
+
+        assertEquals(0, pool.size());
+        pool.release(endpoint, producer1);
+        assertEquals(1, pool.size());
+        pool.release(endpoint, producer2);
+        assertEquals(2, pool.size());
+        pool.release(endpoint, producer3);
+        assertEquals(3, pool.size());
+
+        pool.stop();
+        assertEquals(0, pool.size());
+    }
+    
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ServicePoolTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java (original)
+++ camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java Tue May 26 14:31:00 2009
@@ -16,11 +16,10 @@
  */
 package org.apache.camel.component.file.remote;
 
-import java.io.File;
 import java.io.IOException;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.IsSingleton;
+import org.apache.camel.ServicePoolAware;
 import org.apache.camel.component.file.GenericFileExchange;
 import org.apache.camel.component.file.GenericFileOperationFailedException;
 import org.apache.camel.component.file.GenericFileProducer;
@@ -30,7 +29,7 @@
  * Remote file producer. Handles connecting and disconnecting if we are not.
  * Generic type F is the remote system implementation of a file.
  */
-public class RemoteFileProducer<T> extends GenericFileProducer<T> implements IsSingleton {
+public class RemoteFileProducer<T> extends GenericFileProducer<T> implements ServicePoolAware {
 
     private boolean loggedIn;
     

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Tue May 26 14:31:00 2009
@@ -24,6 +24,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Producer;
+import org.apache.camel.ServicePoolAware;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.commons.logging.Log;
@@ -40,7 +41,7 @@
  *
  * @version $Revision$
  */
-public class MinaProducer extends DefaultProducer {
+public class MinaProducer extends DefaultProducer implements ServicePoolAware {
     private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
     private IoSession session;
     private MinaEndpoint endpoint;

Copied: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java (from r778565, camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java?p2=camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java&p1=camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java&r1=778565&r2=778715&rev=778715&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerAnotherConcurrentTest.java Tue May 26 14:31:00 2009
@@ -16,9 +16,7 @@
  */
 package org.apache.camel.component.mina;
 
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -33,14 +31,19 @@
 /**
  * @version $Revision$
  */
-public class MinaProducerConcurrentTest extends ContextTestSupport {
+public class MinaProducerAnotherConcurrentTest extends ContextTestSupport {
+
+    public void testSimple() throws Exception {
+        String out = template.requestBody("direct:start", "A", String.class);
+        assertEquals("Bye A", out);
+    }
 
     public void testNoConcurrentProducers() throws Exception {
         doSendMessages(1, 1);
     }
 
     public void testConcurrentProducers() throws Exception {
-        doSendMessages(10, 5);
+        doSendMessages(200, 5);
     }
 
     private void doSendMessages(int files, int poolSize) throws Exception {
@@ -52,7 +55,7 @@
             final int index = i;
             Future out = executor.submit(new Callable<Object>() {
                 public Object call() throws Exception {
-                    return template.requestBody("mina:tcp://localhost:8080?sync=true", index, String.class);
+                    return template.requestBody("direct:start", index, String.class);
                 }
             });
             responses.put(index, out);
@@ -61,20 +64,18 @@
         assertMockEndpointsSatisfied();
         assertEquals(files, responses.size());
 
-        // get all responses
-        Set unique = new HashSet();
-        for (Future future : responses.values()) {
-            unique.add(future.get());
+        for (int i = 0; i < files; i++) {
+            Object out = responses.get(i).get();
+            assertEquals("Bye " + i, out);
         }
-
-        // should be 10 unique responses
-        assertEquals("Should be " + files + " unique responses", files, unique.size());
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
+                from("direct:start").to("mina:tcp://localhost:8080?sync=true");
+
                 from("mina:tcp://localhost:8080?sync=true").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         String body = exchange.getIn().getBody(String.class);
@@ -85,4 +86,4 @@
         };
     }
 
-}
+}
\ No newline at end of file

Modified: camel/trunk/components/camel-mina/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/resources/log4j.properties?rev=778715&r1=778714&r2=778715&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-mina/src/test/resources/log4j.properties Tue May 26 14:31:00 2009
@@ -21,6 +21,11 @@
 log4j.rootLogger=INFO, file
 
 #log4j.logger.org.apache.camel.component.mina=DEBUG
+#log4j.logger.org.apache.camel=DEBUG
+log4j.logger.org.apache.camel.impl.converter=WARN
+log4j.logger.org.apache.camel.management=WARN
+log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+
 
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender