You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/25 15:22:03 UTC
svn commit: r778418 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/test/java/org/apache/camel/impl/ components/camel-cxf/...
Author: davsclaus
Date: Mon May 25 13:22:02 2009
New Revision: 778418
URL: http://svn.apache.org/viewvc?rev=778418&view=rev
Log:
CAMEL-1644: Added scope for Producer so we can have singleton/prototype producers. Allow to fix thread safety issues.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java (with props)
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java (with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java
camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
camel/trunk/components/camel-mina/src/test/resources/log4j.properties
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java Mon May 25 13:22:02 2009
@@ -22,7 +22,7 @@
*
* @version $Revision$
*/
-public interface Producer extends Processor, Service {
+public interface Producer extends Processor, Service, IsSingleton {
/**
* Gets the endpoint this producer sends to.
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java?rev=778418&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java Mon May 25 13:22:02 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Callback for sending an exchange message to an endpoint using a producer.
+ * <p/>
+ * Using this callback as a template pattern ensures that Camel takes care of the resource handling and will
+ * start and stop the given producer, to avoid resource leaks.
+ *
+ * @version $Revision$
+ */
+public interface ProducerCallback<T> {
+
+ /**
+ * Performs operation on the given producer to send the given exchange.
+ *
+ * @param producer the producer, is never <tt>null</tt>
+ * @param exchange the exchange, can be <tt>null</tt>; if so, create a new exchange from the producer
+ * @param exchangePattern the exchange pattern, can be <tt>null</tt>
+ * @return the response
+ * @throws Exception if an internal processing error has occurred.
+ */
+ T doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception;
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerCallback.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java Mon May 25 13:22:02 2009
@@ -23,11 +23,11 @@
import org.apache.camel.Exchange;
import org.apache.camel.FailedToCreateConsumerException;
import org.apache.camel.PollingConsumer;
+import org.apache.camel.IsSingleton;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
/**
* Cache containing created {@link org.apache.camel.Consumer}.
*
@@ -48,7 +48,22 @@
} catch (Exception e) {
throw new FailedToCreateConsumerException(endpoint, e);
}
- consumers.put(key, answer);
+
+ boolean singleton = true;
+ if (answer instanceof IsSingleton) {
+ singleton = ((IsSingleton)answer).isSingleton();
+ }
+
+ if (singleton) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding to consumer cache with key: " + endpoint + " for consumer: " + answer);
+ }
+ consumers.put(key, answer);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Consumer for endpoint: " + key + " is not singleton and thus not added to producer cache");
+ }
+ }
}
return answer;
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java Mon May 25 13:22:02 2009
@@ -54,6 +54,10 @@
return endpoint.createExchange(exchange);
}
+ public boolean isSingleton() {
+ return true;
+ }
+
protected void doStart() throws Exception {
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Mon May 25 13:22:02 2009
@@ -49,9 +49,6 @@
private static final int DEFAULT_THREADPOOL_SIZE = 5;
private final CamelContext context;
private final ProducerCache producerCache = new ProducerCache();
- // TODO: why do we have endpoint cache as camel context also have endpoint cache?
- private final Map<String, Endpoint> endpointCache = new HashMap<String, Endpoint>();
- private boolean useEndpointCache = true;
private Endpoint defaultEndpoint;
private ExecutorService executor;
@@ -367,23 +364,8 @@
setDefaultEndpoint(getContext().getEndpoint(endpointUri));
}
- public boolean isUseEndpointCache() {
- return useEndpointCache;
- }
-
- public void setUseEndpointCache(boolean useEndpointCache) {
- this.useEndpointCache = useEndpointCache;
- }
-
public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
- Endpoint e;
- synchronized (endpointCache) {
- e = endpointCache.get(endpointUri);
- }
- if (e != null && expectedClass.isAssignableFrom(e.getClass())) {
- return expectedClass.asSubclass(expectedClass).cast(e);
- }
- return null;
+ return context.getEndpoint(endpointUri, expectedClass);
}
// Implementation methods
@@ -420,21 +402,7 @@
}
protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
- Endpoint endpoint;
-
- if (isUseEndpointCache()) {
- synchronized (endpointCache) {
- endpoint = endpointCache.get(endpointUri);
- if (endpoint == null) {
- endpoint = context.getEndpoint(endpointUri);
- if (endpoint != null) {
- endpointCache.put(endpointUri, endpoint);
- }
- }
- }
- } else {
- endpoint = context.getEndpoint(endpointUri);
- }
+ Endpoint endpoint = context.getEndpoint(endpointUri);
if (endpoint == null) {
throw new NoSuchEndpointException(endpointUri);
}
@@ -453,7 +421,6 @@
protected void doStop() throws Exception {
producerCache.stop();
- endpointCache.clear();
if (executor != null) {
executor.shutdown();
}
@@ -571,4 +538,4 @@
return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
}
-}
\ No newline at end of file
+}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java Mon May 25 13:22:02 2009
@@ -129,6 +129,10 @@
}
}
+ public boolean isSingleton() {
+ return producer.isSingleton();
+ }
+
public void start() throws Exception {
ServiceHelper.startService(detour);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Mon May 25 13:22:02 2009
@@ -25,6 +25,8 @@
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.IsSingleton;
+import org.apache.camel.ProducerCallback;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,8 @@
private final Map<String, Producer> producers = new HashMap<String, Producer>();
+ // TODO: Consider a pool for non singleton producers to leverage in the doInProducer template
+
public synchronized Producer getProducer(Endpoint endpoint) {
String key = endpoint.getEndpointUri();
Producer answer = producers.get(key);
@@ -50,7 +54,23 @@
} catch (Exception e) {
throw new FailedToCreateProducerException(endpoint, e);
}
- producers.put(key, answer);
+
+ // only add singletons to the cache
+ boolean singleton = true;
+ if (answer instanceof IsSingleton) {
+ singleton = ((IsSingleton)answer).isSingleton();
+ }
+
+ if (singleton) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding to producer cache with key: " + endpoint + " for producer: " + answer);
+ }
+ producers.put(key, answer);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Producer for endpoint: " + key + " is not singleton and thus not added to producer cache");
+ }
+ }
}
return answer;
}
@@ -63,8 +83,7 @@
*/
public void send(Endpoint endpoint, Exchange exchange) {
try {
- Producer producer = getProducer(endpoint);
- producer.process(exchange);
+ sendExchange(endpoint, null, null, exchange);
} catch (Exception e) {
throw wrapRuntimeCamelException(e);
}
@@ -76,12 +95,11 @@
*
* @param endpoint the endpoint to send the exchange to
* @param processor the transformer used to populate the new exchange
+ * @return the exchange
*/
public Exchange send(Endpoint endpoint, Processor processor) {
try {
- Producer producer = getProducer(endpoint);
- Exchange exchange = producer.createExchange();
- return sendExchange(endpoint, producer, processor, exchange);
+ return sendExchange(endpoint, null, processor, null);
} catch (Exception e) {
throw wrapRuntimeCamelException(e);
}
@@ -95,28 +113,76 @@
* @param pattern the message {@link ExchangePattern} such as
* {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
* @param processor the transformer used to populate the new exchange
+ * @return the exchange
*/
public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
try {
- Producer producer = getProducer(endpoint);
- Exchange exchange = producer.createExchange(pattern);
- return sendExchange(endpoint, producer, processor, exchange);
+ return sendExchange(endpoint, pattern, processor, null);
} catch (Exception e) {
throw wrapRuntimeCamelException(e);
}
}
- protected Exchange sendExchange(Endpoint endpoint, Producer producer, Processor processor, Exchange exchange) throws Exception {
- // lets populate using the processor callback
- processor.process(exchange);
-
- // now lets dispatch
- if (LOG.isDebugEnabled()) {
- LOG.debug(">>>> " + endpoint + " " + exchange);
+ /**
+ * Sends an exchange to an endpoint using a supplied callback
+ *
+ * @param endpoint the endpoint to send the exchange to
+ * @param exchange the exchange, can be <tt>null</tt> if so then create a new exchange from the producer
+ * @param pattern the exchange pattern, can be <tt>null</tt>
+ * @param callback the callback
+ * @return the response from the callback
+ * @throws Exception if an internal processing error has occurred.
+ */
+ public <T> T doInProducer(Endpoint endpoint, Exchange exchange, ExchangePattern pattern, ProducerCallback<T> callback) throws Exception {
+ // get or create the producer
+ Producer producer = getProducer(endpoint);
+
+ if (producer == null) {
+ if (isStopped()) {
+ LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
+ return null;
+ } else {
+ throw new IllegalStateException("No producer, this processor has not been started: " + this);
+ }
}
- producer.process(exchange);
- return exchange;
+
+ try {
+ // invoke the callback
+ return callback.doInProducer(producer, exchange, pattern);
+ } finally {
+ // stop non singleton producers as we should not leak resources
+ boolean singleton = true;
+ if (producer instanceof IsSingleton) {
+ singleton = ((IsSingleton)producer).isSingleton();
+ }
+ if (!singleton) {
+ producer.stop();
+ }
+ }
+ }
+
+ protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
+ final Processor processor, Exchange exchange) throws Exception {
+ return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() {
+ public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
+ if (exchange == null) {
+ exchange = pattern != null ? producer.createExchange(pattern) : producer.createExchange();
+ }
+
+ if (processor != null) {
+ // lets populate using the processor callback
+ processor.process(exchange);
+ }
+
+ // now lets dispatch
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(">>>> " + endpoint + " " + exchange);
+ }
+ producer.process(exchange);
+ return exchange;
+ }
+ });
}
protected void doStop() throws Exception {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Mon May 25 13:22:02 2009
@@ -21,9 +21,10 @@
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.ProducerCallback;
+import org.apache.camel.impl.ProducerCache;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,8 +35,8 @@
*/
public class SendProcessor extends ServiceSupport implements Processor {
protected static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
+ protected final ProducerCache producerCache = new ProducerCache();
protected Endpoint destination;
- protected Producer producer;
protected ExchangePattern pattern;
public SendProcessor(Endpoint destination) {
@@ -53,38 +54,37 @@
return "sendTo(" + destination + (pattern != null ? " " + pattern : "") + ")";
}
- public void process(Exchange exchange) throws Exception {
- if (producer == null) {
- if (isStopped()) {
- LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
- } else {
- throw new IllegalStateException("No producer, this processor has not been started: " + this);
+ public void process(final Exchange exchange) throws Exception {
+ producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
+ public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
+ exchange = configureExchange(exchange, pattern);
+ producer.process(exchange);
+ return exchange;
}
- } else {
- exchange = configureExchange(exchange);
- producer.process(exchange);
- }
+ });
}
public Endpoint getDestination() {
return destination;
}
- protected void doStart() throws Exception {
- this.producer = destination.createProducer();
- ServiceHelper.startService(this.producer);
- }
-
- protected void doStop() throws Exception {
- ServiceHelper.stopService(this.producer);
- this.producer = null;
+ protected Producer getProducer() {
+ return producerCache.getProducer(destination);
}
- protected Exchange configureExchange(Exchange exchange) {
+ protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
if (pattern != null) {
exchange.setPattern(pattern);
}
return exchange;
}
+ protected void doStart() throws Exception {
+ producerCache.start();
+ }
+
+ protected void doStop() throws Exception {
+ producerCache.stop();
+ }
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Mon May 25 13:22:02 2009
@@ -24,6 +24,8 @@
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.ProducerCallback;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
@@ -69,16 +71,13 @@
}
public void process(Exchange exchange) throws Exception {
- if (producer == null) {
- if (isStopped()) {
- LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
- } else {
- throw new IllegalStateException("No producer, this processor has not been started!");
+ producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
+ public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
+ Exchange wireTapExchange = configureExchange(exchange, pattern);
+ procesWireTap(producer, wireTapExchange);
+ return wireTapExchange;
}
- } else {
- Exchange wireTapExchange = configureExchange(exchange);
- procesWireTap(wireTapExchange);
- }
+ });
}
/**
@@ -86,7 +85,7 @@
*
* @param exchange the exchange to wire tap
*/
- protected void procesWireTap(final Exchange exchange) {
+ protected void procesWireTap(final Producer producer, final Exchange exchange) {
// use submit instead of execute to force it to use a new thread, execute might
// decide to use current thread, so we must submit a new task
// as we dont care for the response we dont hold the future object and wait for the result
@@ -102,7 +101,7 @@
}
@Override
- protected Exchange configureExchange(Exchange exchange) {
+ protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern) {
if (newExchangeProcessor == null && newExchangeExpression == null) {
// use a copy of the original exchange
return configureCopyExchange(exchange);
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/StartAndStopRoutesTest.java Mon May 25 13:22:02 2009
@@ -45,29 +45,26 @@
endpointB = getMandatoryEndpoint("seda:test.b", SedaEndpoint.class);
endpointC = getMandatoryEndpoint("seda:test.C", SedaEndpoint.class);
- assertProducerAndConsumerCounts("endpointA", endpointA, 0, 1);
- assertProducerAndConsumerCounts("endpointB", endpointB, 1, 0);
- assertProducerAndConsumerCounts("endpointC", endpointC, 0, 0);
+ // send from A over B to results
+ MockEndpoint results = getMockEndpoint("mock:results");
+ results.expectedBodiesReceived(expectedBody);
- context.stopRoute(route);
+ template.sendBody(endpointA, expectedBody);
- assertProducerAndConsumerCounts("endpointA", endpointA, 0, 0);
- assertProducerAndConsumerCounts("endpointB", endpointB, 0, 0);
- assertProducerAndConsumerCounts("endpointC", endpointC, 0, 0);
+ assertMockEndpointsSatisfied();
+ // stop the route
+ context.stopRoute(route);
// lets mutate the route...
FromDefinition fromType = assertOneElement(route.getInputs());
fromType.setUri("seda:test.C");
context.startRoute(route);
- assertProducerAndConsumerCounts("endpointA", endpointA, 0, 0);
- assertProducerAndConsumerCounts("endpointB", endpointB, 1, 0);
- assertProducerAndConsumerCounts("endpointC", endpointC, 0, 1);
-
-
// now lets check it works
- MockEndpoint results = getMockEndpoint("mock:results");
+ // send from C over B to results
+ results.reset();
+ results = getMockEndpoint("mock:results");
results.expectedBodiesReceived(expectedBody);
template.sendBody(endpointC, expectedBody);
@@ -75,11 +72,6 @@
assertMockEndpointsSatisfied();
}
- protected void assertProducerAndConsumerCounts(String name, SedaEndpoint endpoint, int producerCount, int consumerCount) {
- assertCollectionSize("Producers for " + name, endpoint.getProducers(), producerCount);
- assertCollectionSize("Consumers for " + name, endpoint.getConsumers(), consumerCount);
- }
-
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
Modified: camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java (original)
+++ camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfSoapProducer.java Mon May 25 13:22:02 2009
@@ -114,6 +114,10 @@
processor.process(exchange);
}
+ public boolean isSingleton() {
+ return true;
+ }
+
public void start() throws Exception {
producer.start();
}
Modified: camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java (original)
+++ camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java Mon May 25 13:22:02 2009
@@ -226,7 +226,9 @@
if (methodToUse.isEntityEnclosing()) {
((EntityEnclosingMethod)method).setRequestEntity(requestEntity);
if (requestEntity.getContentType() == null) {
- LOG.warn("Missing the ContentType in the request entity for the URI " + uri + ". The method is " + method);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No Content-Type provided for URI: " + uri + " with exchange: " + exchange);
+ }
}
}
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Mon May 25 13:22:02 2009
@@ -58,6 +58,13 @@
this.sync = endpoint.getConfiguration().isSync();
}
+ @Override
+ public boolean isSingleton() {
+ // the producer must not be a singleton; otherwise concurrent producers cannot safely
+ // use request/reply with correct correlation
+ return false;
+ }
+
public void process(Exchange exchange) throws Exception {
if (session == null && !lazySessionCreation) {
throw new IllegalStateException("Not started yet!");
Added: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java?rev=778418&view=auto
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java (added)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java Mon May 25 13:22:02 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mina;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class MinaProducerConcurrentTest extends ContextTestSupport {
+
+ public void testNoConcurrentProducers() throws Exception {
+ doSendMessages(1, 1);
+ }
+
+ public void testConcurrentProducers() throws Exception {
+ doSendMessages(10, 5);
+ }
+
+ private void doSendMessages(int files, int poolSize) throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(files);
+
+ ExecutorService executor = Executors.newFixedThreadPool(poolSize);
+ Map<Integer, Future> responses = new ConcurrentHashMap();
+ for (int i = 0; i < files; i++) {
+ final int index = i;
+ Future out = executor.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ return template.requestBody("mina:tcp://localhost:8080?sync=true", index, String.class);
+ }
+ });
+ responses.put(index, out);
+ }
+
+ assertMockEndpointsSatisfied();
+ assertEquals(files, responses.size());
+
+ // get all responses
+ Set unique = new HashSet();
+ for (Future future : responses.values()) {
+ unique.add(future.get());
+ }
+
+ // every request should have produced a unique response
+ assertEquals("Should be " + files + " unique responses", files, unique.size());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("mina:tcp://localhost:8080?sync=true").process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ String body = exchange.getIn().getBody(String.class);
+ exchange.getOut().setBody("Bye " + body);
+ }
+ }).to("mock:result");
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaProducerConcurrentTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-mina/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/resources/log4j.properties?rev=778418&r1=778417&r2=778418&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-mina/src/test/resources/log4j.properties Mon May 25 13:22:02 2009
@@ -20,17 +20,16 @@
#
log4j.rootLogger=INFO, file
+#log4j.logger.org.apache.camel.component.mina=DEBUG
+
# CONSOLE appender not used by default
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d %-5p %c{1} - %m %n
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
# File appender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d %-5p %c{1} - %m %n
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.appender.file.file=target/camel-mina-test.log
-
-# debug logging for camel-mina
-log4j.logger.org.apache.camel.component.mina=DEBUG
-#log4j.logger.org.apache.camel=DEBUG
+log4j.appender.file.append=false