You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/21 09:54:37 UTC
svn commit: r966129 - in
/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms:
JmsEndpoint.java JmsProducer.java requestor/Requestor.java
Author: davsclaus
Date: Wed Jul 21 07:54:37 2010
New Revision: 966129
URL: http://svn.apache.org/viewvc?rev=966129&view=rev
Log:
CAMEL-2970: Preparing JmsProducer to support async routing engine for InOut MEPs.
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=966129&r1=966128&r2=966129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Jul 21 07:54:37 2010
@@ -54,9 +54,6 @@ import org.springframework.transaction.P
*/
@ManagedResource(description = "Managed JMS Endpoint")
public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint>, MultipleConsumersSupport {
- private static final int DEFAULT_THREADPOOL_SIZE = 100;
-
- private ScheduledExecutorService scheduledExecutorService;
private HeaderFilterStrategy headerFilterStrategy;
private boolean pubSubDomain;
private JmsBinding binding;
@@ -65,6 +62,7 @@ public class JmsEndpoint extends Default
private String selector;
private JmsConfiguration configuration;
private Requestor requestor;
+ private ScheduledExecutorService requestorExecutorService;
public JmsEndpoint() {
this(null, null);
@@ -288,7 +286,7 @@ public class JmsEndpoint extends Default
public synchronized Requestor getRequestor() throws Exception {
if (requestor == null) {
- requestor = new Requestor(getConfiguration(), getScheduledExecutorService());
+ requestor = new Requestor(getConfiguration(), getRequestorExecutorService());
requestor.start();
}
return requestor;
@@ -298,18 +296,6 @@ public class JmsEndpoint extends Default
this.requestor = requestor;
}
- public synchronized ScheduledExecutorService getScheduledExecutorService() {
- if (scheduledExecutorService == null) {
- scheduledExecutorService = getCamelContext().getExecutorServiceStrategy()
- .newScheduledThreadPool(this, getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
- }
- return scheduledExecutorService;
- }
-
- public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
- this.scheduledExecutorService = scheduledExecutorService;
- }
-
public boolean isPubSubDomain() {
return pubSubDomain;
}
@@ -357,6 +343,13 @@ public class JmsEndpoint extends Default
return template;
}
+ protected synchronized ScheduledExecutorService getRequestorExecutorService() {
+ if (requestorExecutorService == null) {
+ requestorExecutorService = getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "JmsRequesterTimeoutTask", 1);
+ }
+ return requestorExecutorService;
+ }
+
// Delegated properties from the configuration
//-------------------------------------------------------------------------
@ManagedAttribute
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=966129&r1=966128&r2=966129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Wed Jul 21 07:54:37 2010
@@ -26,6 +26,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.FailedToCreateProducerException;
@@ -37,7 +38,7 @@ import org.apache.camel.component.jms.re
import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
import org.apache.camel.component.jms.requestor.PersistentReplyToRequestor;
import org.apache.camel.component.jms.requestor.Requestor;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.util.UuidGenerator;
import org.apache.camel.util.ValueHolder;
import org.apache.commons.logging.Log;
@@ -48,7 +49,7 @@ import org.springframework.jms.core.Mess
/**
* @version $Revision$
*/
-public class JmsProducer extends DefaultProducer {
+public class JmsProducer extends DefaultAsyncProducer {
private static final transient Log LOG = LogFactory.getLog(JmsProducer.class);
private RequestorAffinity affinity;
private final JmsEndpoint endpoint;
@@ -100,11 +101,11 @@ public class JmsProducer extends Default
try {
JmsConfiguration c = endpoint.getConfiguration();
if (c.getReplyTo() != null) {
- requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getScheduledExecutorService());
+ requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService());
requestor.start();
} else {
if (affinity == RequestorAffinity.PER_PRODUCER) {
- requestor = new Requestor(endpoint.getConfiguration(), endpoint.getScheduledExecutorService());
+ requestor = new Requestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService());
requestor.start();
} else if (affinity == RequestorAffinity.PER_ENDPOINT) {
requestor = endpoint.getRequestor();
@@ -141,17 +142,17 @@ public class JmsProducer extends Default
super.doStop();
}
- public void process(final Exchange exchange) {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
if (!endpoint.isDisableReplyTo() && exchange.getPattern().isOutCapable()) {
// in out requires a bit more work than in only
- processInOut(exchange);
+ return processInOut(exchange, callback);
} else {
// in only
- processInOnly(exchange);
+ return processInOnly(exchange, callback);
}
}
- protected void processInOut(final Exchange exchange) {
+ protected boolean processInOut(final Exchange exchange, final AsyncCallback callback) {
final org.apache.camel.Message in = exchange.getIn();
String destinationName = in.getHeader(JmsConstants.JMS_DESTINATION_NAME, String.class);
@@ -190,7 +191,7 @@ public class JmsProducer extends Default
}
final ValueHolder<FutureTask> futureHolder = new ValueHolder<FutureTask>();
- final DeferredMessageSentCallback callback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
+ final DeferredMessageSentCallback jmsCallback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
@@ -201,18 +202,24 @@ public class JmsProducer extends Default
FutureTask future;
future = (!msgIdAsCorrId)
? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
- : requestor.getReceiveFuture(callback);
+ : requestor.getReceiveFuture(jmsCallback);
futureHolder.set(future);
return message;
}
};
- doSend(true, destinationName, destination, messageCreator, callback);
+ doSend(true, destinationName, destination, messageCreator, jmsCallback);
// after sending then set the OUT message id to the JMSMessageID so its identical
setMessageId(exchange);
+ // now we should routing asynchronously to not block while waiting for the reply
+ // TODO:
+ // we need a thread pool to use for continue routing messages, just like a seda consumer
+ // and we need options to configure it as well so you can indicate how many threads to use
+ // TODO: Also consider requestTimeout
+
// lets wait and return the response
long requestTimeout = endpoint.getConfiguration().getRequestTimeout();
try {
@@ -268,9 +275,12 @@ public class JmsProducer extends Default
exchange.setException(e);
}
+ // TODO: should be async
+ callback.done(true);
+ return true;
}
- protected void processInOnly(final Exchange exchange) {
+ protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) {
final org.apache.camel.Message in = exchange.getIn();
String destinationName = in.getHeader(JmsConstants.JMS_DESTINATION_NAME, String.class);
@@ -321,6 +331,10 @@ public class JmsProducer extends Default
// after sending then set the OUT message id to the JMSMessageID so its identical
setMessageId(exchange);
+
+ // we are synchronous so return true
+ callback.done(true);
+ return true;
}
/**
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=966129&r1=966128&r2=966129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Wed Jul 21 07:54:37 2010
@@ -20,7 +20,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
-
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -35,7 +34,6 @@ import org.apache.camel.component.jms.re
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.DefaultTimeoutMap;
import org.apache.camel.util.TimeoutMap;
-import org.apache.camel.util.UuidGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.TaskExecutor;
@@ -49,9 +47,7 @@ import org.springframework.jms.support.d
*/
public class Requestor extends ServiceSupport implements MessageListener {
private static final transient Log LOG = LogFactory.getLog(Requestor.class);
- private static UuidGenerator uuidGenerator;
private final JmsConfiguration configuration;
- private ScheduledExecutorService executorService;
private AbstractMessageListenerContainer listenerContainer;
private TimeoutMap<String, Object> requestMap;
private Map<JmsProducer, DeferredRequestReplyMap> producerDeferredRequestReplyMap;
@@ -61,14 +57,15 @@ public class Requestor extends ServiceSu
private long maxRequestTimeout = -1;
private long replyToResolverTimeout = 5000;
+ // TODO: Use a Task queue to transfer replies arriving in onMessage
+ // instead of using the FutureHandle to support async routing
public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService) {
this.configuration = configuration;
- this.executorService = executorService;
- requestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
- producerDeferredRequestReplyMap = new HashMap<JmsProducer, DeferredRequestReplyMap>();
- deferredRequestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
- deferredReplyMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
+ this.requestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
+ this.producerDeferredRequestReplyMap = new HashMap<JmsProducer, DeferredRequestReplyMap>();
+ this.deferredRequestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
+ this.deferredReplyMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
}
public synchronized DeferredRequestReplyMap getDeferredRequestReplyMap(JmsProducer producer) {
@@ -261,13 +258,6 @@ public class Requestor extends ServiceSu
return answer;
}
- public static synchronized UuidGenerator getUuidGenerator() {
- if (uuidGenerator == null) {
- uuidGenerator = UuidGenerator.get();
- }
- return uuidGenerator;
- }
-
protected JmsConfiguration getConfiguration() {
return configuration;
}