You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/07/21 09:54:37 UTC

svn commit: r966129 - in /camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms: JmsEndpoint.java JmsProducer.java requestor/Requestor.java

Author: davsclaus
Date: Wed Jul 21 07:54:37 2010
New Revision: 966129

URL: http://svn.apache.org/viewvc?rev=966129&view=rev
Log:
CAMEL-2970: Preperaing JmsProducer to support async routing engine for InOut MEPs.

Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=966129&r1=966128&r2=966129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Jul 21 07:54:37 2010
@@ -54,9 +54,6 @@ import org.springframework.transaction.P
  */
 @ManagedResource(description = "Managed JMS Endpoint")
 public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint>, MultipleConsumersSupport {
-    private static final int DEFAULT_THREADPOOL_SIZE = 100;
-
-    private ScheduledExecutorService scheduledExecutorService;
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
     private JmsBinding binding;
@@ -65,6 +62,7 @@ public class JmsEndpoint extends Default
     private String selector;
     private JmsConfiguration configuration;
     private Requestor requestor;
+    private ScheduledExecutorService requestorExecutorService;
 
     public JmsEndpoint() {
         this(null, null);
@@ -288,7 +286,7 @@ public class JmsEndpoint extends Default
 
     public synchronized Requestor getRequestor() throws Exception {
         if (requestor == null) {
-            requestor = new Requestor(getConfiguration(), getScheduledExecutorService());
+            requestor = new Requestor(getConfiguration(), getRequestorExecutorService());
             requestor.start();
         }
         return requestor;
@@ -298,18 +296,6 @@ public class JmsEndpoint extends Default
         this.requestor = requestor;
     }
 
-    public synchronized ScheduledExecutorService getScheduledExecutorService() {
-        if (scheduledExecutorService == null) {
-            scheduledExecutorService = getCamelContext().getExecutorServiceStrategy()
-                    .newScheduledThreadPool(this, getEndpointUri(), DEFAULT_THREADPOOL_SIZE);
-        }
-        return scheduledExecutorService;
-    }
-
-    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
-        this.scheduledExecutorService = scheduledExecutorService;
-    }
-
     public boolean isPubSubDomain() {
         return pubSubDomain;
     }
@@ -357,6 +343,13 @@ public class JmsEndpoint extends Default
         return template;
     }
 
+    protected synchronized ScheduledExecutorService getRequestorExecutorService() {
+        if (requestorExecutorService == null) {
+            requestorExecutorService = getCamelContext().getExecutorServiceStrategy().newScheduledThreadPool(this, "JmsRequesterTimeoutTask", 1);
+        }
+        return requestorExecutorService;
+    }
+
     // Delegated properties from the configuration
     //-------------------------------------------------------------------------
     @ManagedAttribute

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=966129&r1=966128&r2=966129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Wed Jul 21 07:54:37 2010
@@ -26,6 +26,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.FailedToCreateProducerException;
@@ -37,7 +38,7 @@ import org.apache.camel.component.jms.re
 import org.apache.camel.component.jms.requestor.DeferredRequestReplyMap.DeferredMessageSentCallback;
 import org.apache.camel.component.jms.requestor.PersistentReplyToRequestor;
 import org.apache.camel.component.jms.requestor.Requestor;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.util.UuidGenerator;
 import org.apache.camel.util.ValueHolder;
 import org.apache.commons.logging.Log;
@@ -48,7 +49,7 @@ import org.springframework.jms.core.Mess
 /**
  * @version $Revision$
  */
-public class JmsProducer extends DefaultProducer {
+public class JmsProducer extends DefaultAsyncProducer {
     private static final transient Log LOG = LogFactory.getLog(JmsProducer.class);
     private RequestorAffinity affinity;
     private final JmsEndpoint endpoint;
@@ -100,11 +101,11 @@ public class JmsProducer extends Default
                 try {
                     JmsConfiguration c = endpoint.getConfiguration();
                     if (c.getReplyTo() != null) {
-                        requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getScheduledExecutorService());
+                        requestor = new PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService());
                         requestor.start();
                     } else {
                         if (affinity == RequestorAffinity.PER_PRODUCER) {
-                            requestor = new Requestor(endpoint.getConfiguration(), endpoint.getScheduledExecutorService());
+                            requestor = new Requestor(endpoint.getConfiguration(), endpoint.getRequestorExecutorService());
                             requestor.start();
                         } else if (affinity == RequestorAffinity.PER_ENDPOINT) {
                             requestor = endpoint.getRequestor();
@@ -141,17 +142,17 @@ public class JmsProducer extends Default
         super.doStop();
     }
 
-    public void process(final Exchange exchange) {
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         if (!endpoint.isDisableReplyTo() && exchange.getPattern().isOutCapable()) {
             // in out requires a bit more work than in only
-            processInOut(exchange);
+            return processInOut(exchange, callback);
         } else {
             // in only
-            processInOnly(exchange);
+            return processInOnly(exchange, callback);
         }
     }
 
-    protected void processInOut(final Exchange exchange) {
+    protected boolean processInOut(final Exchange exchange, final AsyncCallback callback) {
         final org.apache.camel.Message in = exchange.getIn();
 
         String destinationName = in.getHeader(JmsConstants.JMS_DESTINATION_NAME, String.class);
@@ -190,7 +191,7 @@ public class JmsProducer extends Default
         }
 
         final ValueHolder<FutureTask> futureHolder = new ValueHolder<FutureTask>();
-        final DeferredMessageSentCallback callback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
+        final DeferredMessageSentCallback jmsCallback = msgIdAsCorrId ? deferredRequestReplyMap.createDeferredMessageSentCallback() : null;
 
         MessageCreator messageCreator = new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
@@ -201,18 +202,24 @@ public class JmsProducer extends Default
                 FutureTask future;
                 future = (!msgIdAsCorrId)
                         ? requestor.getReceiveFuture(message.getJMSCorrelationID(), endpoint.getConfiguration().getRequestTimeout())
-                        : requestor.getReceiveFuture(callback);
+                        : requestor.getReceiveFuture(jmsCallback);
 
                 futureHolder.set(future);
                 return message;
             }
         };
 
-        doSend(true, destinationName, destination, messageCreator, callback);
+        doSend(true, destinationName, destination, messageCreator, jmsCallback);
 
         // after sending then set the OUT message id to the JMSMessageID so its identical
         setMessageId(exchange);
 
+        // now we should routing asynchronously to not block while waiting for the reply
+        // TODO:
+        // we need a thread pool to use for continue routing messages, just like a seda consumer
+        // and we need options to configure it as well so you can indicate how many threads to use
+        // TODO: Also consider requestTimeout
+
         // lets wait and return the response
         long requestTimeout = endpoint.getConfiguration().getRequestTimeout();
         try {
@@ -268,9 +275,12 @@ public class JmsProducer extends Default
             exchange.setException(e);
         }
 
+        // TODO: should be async
+        callback.done(true);
+        return true;
     }
 
-    protected void processInOnly(final Exchange exchange) {
+    protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) {
         final org.apache.camel.Message in = exchange.getIn();
 
         String destinationName = in.getHeader(JmsConstants.JMS_DESTINATION_NAME, String.class);
@@ -321,6 +331,10 @@ public class JmsProducer extends Default
 
         // after sending then set the OUT message id to the JMSMessageID so its identical
         setMessageId(exchange);
+
+        // we are synchronous so return true
+        callback.done(true);
+        return true;
     }
 
     /**

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=966129&r1=966128&r2=966129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Wed Jul 21 07:54:37 2010
@@ -20,7 +20,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
-
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
@@ -35,7 +34,6 @@ import org.apache.camel.component.jms.re
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.DefaultTimeoutMap;
 import org.apache.camel.util.TimeoutMap;
-import org.apache.camel.util.UuidGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.core.task.TaskExecutor;
@@ -49,9 +47,7 @@ import org.springframework.jms.support.d
  */
 public class Requestor extends ServiceSupport implements MessageListener {
     private static final transient Log LOG = LogFactory.getLog(Requestor.class);
-    private static UuidGenerator uuidGenerator;
     private final JmsConfiguration configuration;
-    private ScheduledExecutorService executorService;
     private AbstractMessageListenerContainer listenerContainer;
     private TimeoutMap<String, Object> requestMap;
     private Map<JmsProducer, DeferredRequestReplyMap> producerDeferredRequestReplyMap;
@@ -61,14 +57,15 @@ public class Requestor extends ServiceSu
     private long maxRequestTimeout = -1;
     private long replyToResolverTimeout = 5000;
 
+    // TODO: Use a Task queue to transfer replies arriving in onMessage
+    // instead of using the FutureHandle to support async routing
 
     public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService) {
         this.configuration = configuration;
-        this.executorService = executorService;
-        requestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
-        producerDeferredRequestReplyMap = new HashMap<JmsProducer, DeferredRequestReplyMap>();
-        deferredRequestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
-        deferredReplyMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
+        this.requestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
+        this.producerDeferredRequestReplyMap = new HashMap<JmsProducer, DeferredRequestReplyMap>();
+        this.deferredRequestMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
+        this.deferredReplyMap = new DefaultTimeoutMap<String, Object>(executorService, configuration.getRequestMapPurgePollTimeMillis());
     }
 
     public synchronized DeferredRequestReplyMap getDeferredRequestReplyMap(JmsProducer producer) {
@@ -261,13 +258,6 @@ public class Requestor extends ServiceSu
         return answer;
     }
 
-    public static synchronized UuidGenerator getUuidGenerator() {
-        if (uuidGenerator == null) {
-            uuidGenerator = UuidGenerator.get();
-        }
-        return uuidGenerator;
-    }
-
     protected JmsConfiguration getConfiguration() {
         return configuration;
     }