You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/25 15:22:03 UTC

svn commit: r778418 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/impl/ components/camel-cxf/...

Author: davsclaus
Date: Mon May 25 13:22:02 2009
New Revision: 778418

URL: http://svn.apache.org/viewvc?rev=778418&view=rev
Log:
CAMEL-1644: Added scope for Producer so we can have singleton/prototype producers. Allow to fix thread safety issues.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java   (with props)
    camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java   (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java
    camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
    camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
    camel/trunk/components/camel-mina/src/test/resources/log4j.properties

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java Mon May 25 13:22:02 2009
@@ -22,7 +22,7 @@
  * 
  * @version $Revision$
  */
-public interface Producer extends Processor, Service {
+public interface Producer extends Processor, Service, IsSingleton {
 
     /**
      * Gets the endpoint this producer sends to.

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java?rev=778418&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java Mon May 25 13:22:02 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Callback for sending an exchange message to an endpoint using a producer.
+ * <p/>
+ * Using this callback as a template pattern ensures that Camel handles the resource handling and will
+ * start and stop the given producer, to avoid resource leaks.
+ *
+ * @version $Revision$
+ */
+public interface ProducerCallback<T> {
+
+    /**
+     * Performs operation on the given producer to send the given exchange.
+     *
+     * @param producer  the producer, is never <tt>null</tt>
+     * @param exchange  the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
+     * @param exchangePattern the exchange pattern, can be <tt>null</tt>
+     * @return the response
+     * @throws Exception if an internal processing error has occurred.
+     */
+    T doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception;
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java Mon May 25 13:22:02 2009
@@ -23,11 +23,11 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.FailedToCreateConsumerException;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.IsSingleton;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 /**
  * Cache containing created {@link org.apache.camel.Consumer}.
  *
@@ -48,7 +48,22 @@
             } catch (Exception e) {
                 throw new FailedToCreateConsumerException(endpoint, e);
             }
-            consumers.put(key, answer);
+
+            boolean singleton = true;
+            if (answer instanceof IsSingleton) {
+                singleton = ((IsSingleton)answer).isSingleton();
+            }
+
+            if (singleton) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Adding to consumer cache with key: " + endpoint + " for consumer: " + answer);
+                }
+                consumers.put(key, answer);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Consumer for endpoint: " + key + " is not singleton and thus not added to producer cache");
+                }
+            }
         }
         return answer;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java Mon May 25 13:22:02 2009
@@ -54,6 +54,10 @@
         return endpoint.createExchange(exchange);
     }
 
+    public boolean isSingleton() {
+        return true;
+    }
+
     protected void doStart() throws Exception {
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Mon May 25 13:22:02 2009
@@ -49,9 +49,6 @@
     private static final int DEFAULT_THREADPOOL_SIZE = 5;
     private final CamelContext context;
     private final ProducerCache producerCache = new ProducerCache();
-    // TODO: why do we have endpoint cache as camel context also have endpoint cache?
-    private final Map<String, Endpoint> endpointCache = new HashMap<String, Endpoint>();
-    private boolean useEndpointCache = true;
     private Endpoint defaultEndpoint;
     private ExecutorService executor;
 
@@ -367,23 +364,8 @@
         setDefaultEndpoint(getContext().getEndpoint(endpointUri));
     }
 
-    public boolean isUseEndpointCache() {
-        return useEndpointCache;
-    }
-
-    public void setUseEndpointCache(boolean useEndpointCache) {
-        this.useEndpointCache = useEndpointCache;
-    }
-
     public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
-        Endpoint e;
-        synchronized (endpointCache) {
-            e = endpointCache.get(endpointUri);
-        }
-        if (e != null && expectedClass.isAssignableFrom(e.getClass())) {
-            return expectedClass.asSubclass(expectedClass).cast(e);
-        }
-        return null;
+        return context.getEndpoint(endpointUri, expectedClass);
     }
 
     // Implementation methods
@@ -420,21 +402,7 @@
     }
 
     protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
-        Endpoint endpoint;
-
-        if (isUseEndpointCache()) {
-            synchronized (endpointCache) {
-                endpoint = endpointCache.get(endpointUri);
-                if (endpoint == null) {
-                    endpoint = context.getEndpoint(endpointUri);
-                    if (endpoint != null) {
-                        endpointCache.put(endpointUri, endpoint);
-                    }
-                }
-            }
-        } else {
-            endpoint = context.getEndpoint(endpointUri);
-        }
+        Endpoint endpoint = context.getEndpoint(endpointUri);
         if (endpoint == null) {
             throw new NoSuchEndpointException(endpointUri);
         }
@@ -453,7 +421,6 @@
 
     protected void doStop() throws Exception {
         producerCache.stop();
-        endpointCache.clear();
         if (executor != null) {
             executor.shutdown();
         }
@@ -571,4 +538,4 @@
         return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
     }
 
-}
\ No newline at end of file
+}

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java Mon May 25 13:22:02 2009
@@ -129,6 +129,10 @@
                 }
             }
 
+            public boolean isSingleton() {
+                return producer.isSingleton();
+            }
+
             public void start() throws Exception {
                 ServiceHelper.startService(detour);
             }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Mon May 25 13:22:02 2009
@@ -25,6 +25,8 @@
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.IsSingleton;
+import org.apache.camel.ProducerCallback;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,8 @@
 
     private final Map<String, Producer> producers = new HashMap<String, Producer>();
 
+    // TODO: Consider a pool for non singleton producers to leverage in the doInProducer template
+
     public synchronized Producer getProducer(Endpoint endpoint) {
         String key = endpoint.getEndpointUri();
         Producer answer = producers.get(key);
@@ -50,7 +54,23 @@
             } catch (Exception e) {
                 throw new FailedToCreateProducerException(endpoint, e);
             }
-            producers.put(key, answer);
+
+            // only add singletons to the cache
+            boolean singleton = true;
+            if (answer instanceof IsSingleton) {
+                singleton = ((IsSingleton)answer).isSingleton();
+            }
+
+            if (singleton) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Adding to producer cache with key: " + endpoint + " for producer: " + answer);
+                }
+                producers.put(key, answer);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Producer for endpoint: " + key + " is not singleton and thus not added to producer cache");
+                }
+            }
         }
         return answer;
     }
@@ -63,8 +83,7 @@
      */
     public void send(Endpoint endpoint, Exchange exchange) {
         try {
-            Producer producer = getProducer(endpoint);
-            producer.process(exchange);
+            sendExchange(endpoint, null, null, exchange);
         } catch (Exception e) {
             throw wrapRuntimeCamelException(e);
         }
@@ -76,12 +95,11 @@
      *
      * @param endpoint the endpoint to send the exchange to
      * @param processor the transformer used to populate the new exchange
+     * @return the exchange
      */
     public Exchange send(Endpoint endpoint, Processor processor) {
         try {
-            Producer producer = getProducer(endpoint);
-            Exchange exchange = producer.createExchange();
-            return sendExchange(endpoint, producer, processor, exchange);
+            return sendExchange(endpoint, null, processor, null);
         } catch (Exception e) {
             throw wrapRuntimeCamelException(e);
         }
@@ -95,28 +113,76 @@
      * @param pattern the message {@link ExchangePattern} such as
      *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
      * @param processor the transformer used to populate the new exchange
+     * @return the exchange
      */
     public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
         try {
-            Producer producer = getProducer(endpoint);
-            Exchange exchange = producer.createExchange(pattern);
-            return sendExchange(endpoint, producer, processor, exchange);
+            return sendExchange(endpoint, pattern, processor, null);
         } catch (Exception e) {
             throw wrapRuntimeCamelException(e);
         }
     }
 
 
-    protected Exchange sendExchange(Endpoint endpoint, Producer producer, Processor processor, Exchange exchange) throws Exception {
-        // lets populate using the processor callback
-        processor.process(exchange);
-
-        // now lets dispatch
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(">>>> " + endpoint + " " + exchange);
+    /**
+     * Sends an exchange to an endpoint using a supplied callback
+     *
+     * @param endpoint  the endpoint to send the exchange to
+     * @param exchange  the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
+     * @param pattern   the exchange pattern, can be <tt>null</tt>
+     * @param callback  the callback
+     * @return the response from the callback
+     * @throws Exception if an internal processing error has occurred.
+     */
+    public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) throws Exception {
+        // get or create the producer
+        Producer producer = getProducer(endpoint);
+
+        if (producer == null) {
+            if (isStopped()) {
+                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
+                return null;
+            } else {
+                throw new IllegalStateException("No producer, this processor has not been started: " + this);
+            }
         }
-        producer.process(exchange);
-        return exchange;
+
+        try {
+            // invoke the callback
+            return callback.doInProducer(producer, exchange, pattern);
+        } finally {
+            // stop non singleton producers as we should not leak resources
+            boolean singleton = true;
+            if (producer instanceof IsSingleton) {
+                singleton = ((IsSingleton)producer).isSingleton();
+            }
+            if (!singleton) {
+                producer.stop();
+            }
+        }
+    }
+
+    protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
+                                    final Processor processor, Exchange exchange) throws Exception {
+        return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
+            public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
+                if (exchange == null) {
+                    exchange = pattern != null ? producer.createExchange(pattern) : producer.createExchange();
+                }
+
+                if (processor != null) {
+                    // lets populate using the processor callback
+                    processor.process(exchange);
+                }
+
+                // now lets dispatch
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(">>>> " + endpoint + " " + exchange);
+                }
+                producer.process(exchange);
+                return exchange;
+            }
+        });
     }
 
     protected void doStop() throws Exception {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Mon May 25 13:22:02 2009
@@ -21,9 +21,10 @@
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.ProducerCallback;
+import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -34,8 +35,8 @@
  */
 public class SendProcessor extends ServiceSupport implements Processor {
     protected static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
+    protected final ProducerCache producerCache = new ProducerCache();
     protected Endpoint destination;
-    protected Producer producer;
     protected ExchangePattern pattern;
 
     public SendProcessor(Endpoint destination) {
@@ -53,38 +54,37 @@
         return "sendTo(" + destination + (pattern != null ? " " + pattern : "") + ")";
     }
 
-    public void process(Exchange exchange) throws Exception {
-        if (producer == null) {
-            if (isStopped()) {
-                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
-            } else {
-                throw new IllegalStateException("No producer, this processor has not been started: " + this);
+    public void process(final Exchange exchange) throws Exception {
+        producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
+            public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
+                exchange = configureExchange(exchange, pattern);
+                producer.process(exchange);
+                return exchange;
             }
-        } else {
-            exchange = configureExchange(exchange);
-            producer.process(exchange);
-        }
+        });
     }
 
     public Endpoint getDestination() {
         return destination;
     }
 
-    protected void doStart() throws Exception {
-        this.producer = destination.createProducer();
-        ServiceHelper.startService(this.producer);
-    }
-
-    protected void doStop() throws Exception {
-        ServiceHelper.stopService(this.producer);
-        this.producer = null;
+    protected Producer getProducer() {
+        return producerCache.getProducer(destination);
     }
 
-    protected Exchange configureExchange(Exchange exchange) {
+    protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
         if (pattern != null) {
             exchange.setPattern(pattern);
         }
         return exchange;
     }
 
+    protected void doStart() throws Exception {
+        producerCache.start();
+    }
+
+    protected void doStop() throws Exception {
+        producerCache.stop();
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Mon May 25 13:22:02 2009
@@ -24,6 +24,8 @@
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.ProducerCallback;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
@@ -69,16 +71,13 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        if (producer == null) {
-            if (isStopped()) {
-                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
-            } else {
-                throw new IllegalStateException("No producer, this processor has not been started!");
+        producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
+            public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
+                Exchange wireTapExchange = configureExchange(exchange, pattern);
+                procesWireTap(producer, wireTapExchange);
+                return wireTapExchange;
             }
-        } else {
-            Exchange wireTapExchange = configureExchange(exchange);
-            procesWireTap(wireTapExchange);
-        }
+        });
     }
 
     /**
@@ -86,7 +85,7 @@
      *
      * @param exchange  the exchange to wire tap
      */
-    protected void procesWireTap(final Exchange exchange) {
+    protected void procesWireTap(final Producer producer, final Exchange exchange) {
         // use submit instead of execute to force it to use a new thread, execute might
         // decide to use current thread, so we must submit a new task
         // as we dont care for the response we dont hold the future object and wait for the result
@@ -102,7 +101,7 @@
     }
 
     @Override
-    protected Exchange configureExchange(Exchange exchange) {
+    protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
         if (newExchangeProcessor == null && newExchangeExpression == null) {
             // use a copy of the original exchange
             return configureCopyExchange(exchange);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java Mon May 25 13:22:02 2009
@@ -45,29 +45,26 @@
         endpointB = getMandatoryEndpoint("seda:test.b", SedaEndpoint.class);
         endpointC = getMandatoryEndpoint("seda:test.C", SedaEndpoint.class);
 
-        assertProducerAndConsumerCounts("endpointA", endpointA, 0, 1);
-        assertProducerAndConsumerCounts("endpointB", endpointB, 1, 0);
-        assertProducerAndConsumerCounts("endpointC", endpointC, 0, 0);
+        // send from A over B to results
+        MockEndpoint results = getMockEndpoint("mock:results");
+        results.expectedBodiesReceived(expectedBody);
 
-        context.stopRoute(route);
+        template.sendBody(endpointA, expectedBody);
 
-        assertProducerAndConsumerCounts("endpointA", endpointA, 0, 0);
-        assertProducerAndConsumerCounts("endpointB", endpointB, 0, 0);
-        assertProducerAndConsumerCounts("endpointC", endpointC, 0, 0);
+        assertMockEndpointsSatisfied();
 
+        // stop the route
+        context.stopRoute(route);
 
         // lets mutate the route...
         FromDefinition fromType = assertOneElement(route.getInputs());
         fromType.setUri("seda:test.C");
         context.startRoute(route);
 
-        assertProducerAndConsumerCounts("endpointA", endpointA, 0, 0);
-        assertProducerAndConsumerCounts("endpointB", endpointB, 1, 0);
-        assertProducerAndConsumerCounts("endpointC", endpointC, 0, 1);
-
-
         // now lets check it works
-        MockEndpoint results = getMockEndpoint("mock:results");
+        // send from C over B to results
+        results.reset();
+        results = getMockEndpoint("mock:results");
         results.expectedBodiesReceived(expectedBody);
 
         template.sendBody(endpointC, expectedBody);
@@ -75,11 +72,6 @@
         assertMockEndpointsSatisfied();
     }
 
-    protected void assertProducerAndConsumerCounts(String name, SedaEndpoint endpoint, int producerCount, int consumerCount) {
-        assertCollectionSize("Producers for " + name, endpoint.getProducers(), producerCount);
-        assertCollectionSize("Consumers for " + name, endpoint.getConsumers(), consumerCount);
-    }
-
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {

Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java Mon May 25 13:22:02 2009
@@ -114,6 +114,10 @@
         processor.process(exchange);
     }
 
+    public boolean isSingleton() {
+        return true;
+    }
+
     public void start() throws Exception {
         producer.start();
     }

Modified: camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java (original)
+++ camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java Mon May 25 13:22:02 2009
@@ -226,7 +226,9 @@
         if (methodToUse.isEntityEnclosing()) {
             ((EntityEnclosingMethod)method).setRequestEntity(requestEntity);
             if (requestEntity.getContentType() == null) {
-                LOG.warn("Missing the ContentType in the request entity for the URI " + uri + ". The method is " + method);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("No Content-Type provided for URI: " + uri + " with exchange: " + exchange);
+                }
             }
         }
 

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Mon May 25 13:22:02 2009
@@ -58,6 +58,13 @@
         this.sync = endpoint.getConfiguration().isSync();
     }
 
+    @Override
+    public boolean isSingleton() {
+        // the producer must not be a singleton; otherwise concurrent producers cannot be used
+        // safely with request/reply and correct correlation
+        return false;
+    }
+
     public void process(Exchange exchange) throws Exception {
         if (session == null && !lazySessionCreation) {
             throw new IllegalStateException("Not started yet!");

Added: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java?rev=778418&view=auto
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java (added)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java Mon May 25 13:22:02 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class MinaProducerConcurrentTest extends ContextTestSupport {
+
+    public void testNoConcurrentProducers() throws Exception {
+        doSendMessages(1, 1);
+    }
+
+    public void testConcurrentProducers() throws Exception {
+        doSendMessages(10, 5);
+    }
+
+    // sends 'files' sync requests from a pool of 'poolSize' threads and expects a distinct reply for each
+    private void doSendMessages(int files, int poolSize) throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(files);
+        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
+        try {
+            Map<Integer, Future<Object>> responses = new ConcurrentHashMap<Integer, Future<Object>>();
+            for (int i = 0; i < files; i++) {
+                final int index = i;
+                Future<Object> out = executor.submit(new Callable<Object>() {
+                    public Object call() throws Exception {
+                        return template.requestBody("mina:tcp://localhost:8080?sync=true", index, String.class);
+                    }
+                });
+                responses.put(index, out);
+            }
+            assertMockEndpointsSatisfied();
+            assertEquals(files, responses.size());
+            // every request carries a different body, so with correct correlation no two replies are equal
+            Set<Object> unique = new HashSet<Object>();
+            for (Future<Object> future : responses.values()) {
+                unique.add(future.get());
+            }
+            assertEquals("Should be " + files + " unique responses", files, unique.size());
+        } finally {
+            // always stop the pool threads, also when an assertion above has failed
+            executor.shutdownNow();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("mina:tcp://localhost:8080?sync=true").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String body = exchange.getIn().getBody(String.class);
+                        exchange.getOut().setBody("Bye " + body);
+                    }
+                }).to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-mina/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/resources/log4j.properties?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-mina/src/test/resources/log4j.properties Mon May 25 13:22:02 2009
@@ -20,17 +20,16 @@
 #
 log4j.rootLogger=INFO, file
 
+#log4j.logger.org.apache.camel.component.mina=DEBUG
+
 # CONSOLE appender not used by default
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d %-5p %c{1} - %m %n
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
 
 # File appender
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d %-5p %c{1} - %m %n
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
 log4j.appender.file.file=target/camel-mina-test.log
-
-# debug logging for camel-mina
-log4j.logger.org.apache.camel.component.mina=DEBUG
-#log4j.logger.org.apache.camel=DEBUG
+log4j.appender.file.append=false