You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/10/18 20:41:49 UTC

svn commit: r586066 [1/2] - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/bean/ camel-core/src/main/java/org/apache/camel/util/ camel-core/src/test/java/org/apache/camel/compone...

Author: jstrachan
Date: Thu Oct 18 11:41:45 2007
New Revision: 586066

URL: http://svn.apache.org/viewvc?rev=586066&view=rev
Log:
added support for JMS InOut so we can do Spring Remoting over JMS via <proxy> and <export> for https://issues.apache.org/activemq/browse/CAMEL-184

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
      - copied, changed from r585168, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java   (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html
      - copied, changed from r585168, activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/package.html
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/
    activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml   (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
    activemq/camel/trunk/components/camel-jms/pom.xml
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java
    activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties
    activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java
    activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Oct 18 11:41:45 2007
@@ -16,20 +16,19 @@
  */
 package org.apache.camel;
 
-import org.apache.camel.spi.UnitOfWork;
-
 import java.util.Map;
 
+import org.apache.camel.spi.UnitOfWork;
+
 /**
  * The base message exchange interface providing access to the request, response
  * and fault {@link Message} instances. Different providers such as JMS, JBI,
  * CXF and HTTP can provide their own derived API to expose the underlying
  * transport semantics to avoid the leaky abstractions of generic APIs.
- * 
+ *
  * @version $Revision$
  */
 public interface Exchange {
-
     /**
      * Returns the {@link ExchangePattern} (MEP) of this exchange.
      *
@@ -39,7 +38,7 @@
 
     /**
      * Returns a property associated with this exchange by name
-     * 
+     *
      * @param name the name of the property
      * @return the value of the given header or null if there is no property for
      *         the given name
@@ -49,7 +48,7 @@
     /**
      * Returns a property associated with this exchange by name and specifying
      * the type required
-     * 
+     *
      * @param name the name of the property
      * @param type the type of the property
      * @return the value of the given header or null if there is no property for
@@ -60,8 +59,8 @@
 
     /**
      * Sets a property on the exchange
-     * 
-     * @param name of the property
+     *
+     * @param name  of the property
      * @param value to associate with the name
      */
     void setProperty(String name, Object value);
@@ -76,24 +75,31 @@
 
     /**
      * Returns all of the properties associated with the exchange
-     * 
+     *
      * @return all the headers in a Map
      */
     Map<String, Object> getProperties();
 
     /**
      * Returns the inbound request message
-     * 
+     *
      * @return the message
      */
     Message getIn();
 
     /**
+     * Sets the inbound message instance
+     *
+     * @param in the inbound message
+     */
+    void setIn(Message in);
+
+    /**
      * Returns the outbound message, lazily creating one if one has not already
      * been associated with this exchange. If you want to inspect this property
      * but not force lazy creation then invoke the {@link #getOut(boolean)}
      * method passing in null
-     * 
+     *
      * @return the response
      */
     Message getOut();
@@ -101,14 +107,21 @@
     /**
      * Returns the outbound message; optionally lazily creating one if one has
      * not been associated with this exchange
-     * 
+     *
      * @return the response
      */
     Message getOut(boolean lazyCreate);
 
     /**
+     * Sets the outbound message
+     *
+     * @param out the outbound message
+     */
+    void setOut(Message out);
+
+    /**
      * Returns the fault message
-     * 
+     *
      * @return the fault
      */
     Message getFault();
@@ -123,14 +136,14 @@
 
     /**
      * Returns the exception associated with this exchange
-     * 
+     *
      * @return the exception (or null if no faults)
      */
     Throwable getException();
 
     /**
      * Sets the exception associated with this exchange
-     * 
+     *
      * @param e
      */
     void setException(Throwable e);
@@ -138,22 +151,22 @@
     /**
      * Returns true if this exchange failed due to either an exception or fault
      *
+     * @return true if this exchange failed due to either an exception or fault
      * @see Exchange#getException()
      * @see Exchange#getFault()
-     * @return true if this exchange failed due to either an exception or fault
      */
     boolean isFailed();
 
     /**
      * Returns the container so that a processor can resolve endpoints from URIs
-     * 
+     *
      * @return the container which owns this exchange
      */
     CamelContext getContext();
 
     /**
      * Creates a new exchange instance with empty messages, headers and properties
-     * 
+     *
      * @return
      */
     Exchange newInstance();
@@ -166,7 +179,7 @@
 
     /**
      * Copies the data into this exchange from the given exchange
-     * 
+     * <p/>
      * #param source is the source from which headers and messages will be
      * copied
      */
@@ -197,5 +210,4 @@
      * @param id
      */
     void setExchangeId(String id);
-
 }

Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java (from r585168, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java?p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java&r1=585168&r2=586066&rev=586066&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java Thu Oct 18 11:41:45 2007
@@ -17,21 +17,22 @@
 package org.apache.camel;
 
 /**
+ * An exception thrown if an InOut exchange times out receiving the OUT message
+ *
  * @version $Revision: 1.1 $
  */
-public class InvalidPayloadException extends CamelExchangeException {
-    private final Class<?> type;
+public class ExchangeTimedOutException extends CamelExchangeException {
+    private final long timeout;
 
-    public InvalidPayloadException(Exchange exchange, Class<?> type) {
-        super("No in body available of type: " + type.getName()
-              + NoSuchPropertyException.valueDescription(exchange.getIn().getBody()), exchange);
-        this.type = type;
+    public ExchangeTimedOutException(Exchange exchange, long timeout) {
+        super("The OUT message was not received within: " + timeout + " millis", exchange);
+        this.timeout = timeout;
     }
 
     /**
-     * The expected type of the body
+     * Return the timeout which expired in milliseconds
      */
-    public Class<?> getType() {
-        return type;
+    public long getTimeout() {
+        return timeout;
     }
-}
+}
\ No newline at end of file

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java Thu Oct 18 11:41:45 2007
@@ -16,23 +16,40 @@
  */
 package org.apache.camel.component.bean;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
 
-public class BeanInvocation {
+public class BeanInvocation implements Externalizable {
+    private Object[] args;
+    private MethodBean methodBean;
+    private transient Method method;
 
-    private final Object proxy;
-    private final Method method;
-    private final Object[] args;
+    public BeanInvocation() {
+    }
 
-    public BeanInvocation(Object proxy, Method method, Object[] args) {
-        this.proxy = proxy;
+    public BeanInvocation(Method method, Object[] args) {
         this.method = method;
         this.args = args;
     }
 
+    /** Describes the target method and the arguments of this invocation. */
+    @Override
+    public String toString() {
+        // args may be null (e.g. after the no-arg constructor), so guard asList
+        Object list = (args != null) ? Arrays.asList(args) : null;
+        // the opening bracket was missing, leaving an unbalanced "]" in the output
+        return "BeanInvocation[" + method + " with " + list + "]";
+    }
+
     public Object[] getArgs() {
         return args;
     }
@@ -41,26 +58,52 @@
         return method;
     }
 
-    public Object getProxy() {
-        return proxy;
+    public void setMethod(Method method) {
+        this.method = method;
+    }
+
+    public void setArgs(Object[] args) {
+        this.args = args;
     }
 
     /**
      * This causes us to invoke the endpoint Pojo using reflection.
-     * 
-     * @param pojo
+     *
+     * @param pojo     the bean on which to perform this invocation
+     * @param exchange the exchange carrying the method invocation
      */
     public void invoke(Object pojo, Exchange exchange) {
         try {
             Object response = getMethod().invoke(pojo, getArgs());
             exchange.getOut().setBody(response);
-        } catch (InvocationTargetException e) {
+        }
+        catch (InvocationTargetException e) {
             exchange.setException(e.getCause());
-        } catch (RuntimeException e) {
+        }
+        catch (RuntimeException e) {
             throw e;
-        } catch (Throwable e) {
+        }
+        catch (Throwable e) {
             throw new RuntimeException(e);
         }
     }
 
+    public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
+        methodBean = ObjectHelper.cast(MethodBean.class, objectInput.readObject());
+        try {
+            method = methodBean.getMethod();
+        }
+        catch (NoSuchMethodException e) {
+            throw IOHelper.createIOException(e);
+        }
+        args = ObjectHelper.cast(Object[].class, objectInput.readObject());
+    }
+
+    public void writeExternal(ObjectOutput objectOutput) throws IOException {
+        if (methodBean == null) {
+            methodBean = new MethodBean(method);
+        }
+        objectOutput.writeObject(methodBean);
+        objectOutput.writeObject(args);
+    }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java Thu Oct 18 11:41:45 2007
@@ -21,8 +21,8 @@
 import java.lang.reflect.Method;
 
 import org.apache.camel.Endpoint;
-import org.apache.camel.Producer;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
 
 /**
  * An {@link java.lang.reflect.InvocationHandler} which invokes a
@@ -40,7 +40,7 @@
     }
 
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-        BeanInvocation invocation = new BeanInvocation(proxy, method, args);
+        BeanInvocation invocation = new BeanInvocation(method, args);
         BeanExchange exchange = new BeanExchange(endpoint.getContext(), ExchangePattern.InOut);
         exchange.setInvocation(invocation);
 
@@ -49,6 +49,7 @@
         if (fault != null) {
             throw new InvocationTargetException(fault);
         }
-        return exchange.getOut().getBody();
+        return exchange.getOut(true).getBody();
     }
 }
+

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+
+/**
+ * Represents a {@link Serializable} version of a {@link Method}
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class MethodBean implements Serializable {
+    private String name;
+    private Class<?> type;
+    private Class<?>[] parameterTypes;
+
+    public MethodBean() {
+    }
+
+    public MethodBean(Method method) {
+        this.name = method.getName();
+        this.type = method.getDeclaringClass();
+        this.parameterTypes = method.getParameterTypes();
+    }
+
+    public Method getMethod() throws NoSuchMethodException {
+        return type.getMethod(name, parameterTypes);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public Class<?>[] getParameterTypes() {
+        return parameterTypes;
+    }
+
+    public void setParameterTypes(Class<?>[] parameterTypes) {
+        this.parameterTypes = parameterTypes;
+    }
+
+    public Class<?> getType() {
+        return type;
+    }
+
+    public void setType(Class<?> type) {
+        this.type = type;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,193 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * 
+ * @version $Revision: $
+ */
+public class DefaultTimeoutMap implements TimeoutMap, Runnable {
+
+    private static final Log log = LogFactory.getLog(DefaultTimeoutMap.class);
+
+    private Map map = new HashMap();
+    private SortedSet index = new TreeSet();
+    private ScheduledExecutorService executor;
+    private long purgePollTime;
+
+    public DefaultTimeoutMap() {
+        this(null, 1000L);
+    }
+
+    public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+        this.executor = executor;
+        this.purgePollTime = requestMapPollTimeMillis;
+        schedulePoll();
+    }
+
+    public Object get(Object key) {
+        TimeoutMapEntry entry = null;
+        synchronized (map) {
+            entry = (TimeoutMapEntry) map.get(key);
+            if (entry == null) {
+                return null;
+            }
+            index.remove(entry);
+            updateExpireTime(entry);
+            index.add(entry);
+        }
+        return entry.getValue();
+    }
+
+    public void put(Object key, Object value, long timeoutMillis) {
+        TimeoutMapEntry entry = new TimeoutMapEntry(key, value, timeoutMillis);
+        synchronized (map) {
+            Object oldValue = map.put(key, entry);
+            if (oldValue != null) {
+                index.remove(oldValue);
+            }
+            updateExpireTime(entry);
+            index.add(entry);
+        }
+    }
+
+    public void remove(Object id) {
+        synchronized (map) {
+            TimeoutMapEntry entry = (TimeoutMapEntry) map.remove(id);
+            if (entry != null) {
+                index.remove(entry);
+            }
+        }
+    }
+
+    /**
+     * Returns a snapshot copy of the keys in the map, taken under the map lock.
+     * The copy is an Object[] because keys are not required to be Strings.
+     *
+     * @return a new array holding the current keys; never null
+     */
+    public Object[] getKeys() {
+        synchronized (map) {
+            Set keySet = map.keySet();
+            return keySet.toArray();
+        }
+    }
+
+    /**
+     * The timer task which purges old requests and schedules another poll
+     */
+    public void run() {
+        purge();
+        schedulePoll();
+    }
+
+    /**
+     * Purges expired entries from the map. The index is sorted by expire time,
+     * so the scan stops at the first entry that has not yet expired.
+     */
+    public void purge() {
+        long now = currentTime();
+        synchronized (map) {
+            for (Iterator iter = index.iterator(); iter.hasNext();) {
+                TimeoutMapEntry entry = (TimeoutMapEntry) iter.next();
+                if (entry == null) {
+                    break;
+                }
+                if (entry.getExpireTime() >= now) {
+                    // everything after this entry expires later still
+                    break;
+                }
+                if (isValidForEviction(entry)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Evicting inactive request for correlationID: " + entry);
+                    }
+                    // drop the entry from both the lookup map and the expiry index
+                    map.remove(entry.getKey());
+                    iter.remove();
+                }
+            }
+        }
+    }
+
+    // Properties
+    // -------------------------------------------------------------------------
+    public long getPurgePollTime() {
+        return purgePollTime;
+    }
+
+    /**
+     * Sets the next purge poll time in milliseconds
+     */
+    public void setPurgePollTime(long purgePollTime) {
+        this.purgePollTime = purgePollTime;
+    }
+
+    public ScheduledExecutorService getExecutor() {
+        return executor;
+    }
+
+    /**
+     * Sets the executor used to schedule purge events of inactive requests
+     */
+    public void setExecutor(ScheduledExecutorService executor) {
+        this.executor = executor;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+
+    /**
+     * lets schedule each time to allow folks to change the time at runtime
+     */
+    protected void schedulePoll() {
+        if (executor != null) {
+            executor.schedule(this, purgePollTime, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * A hook to allow derivations to avoid evicting the current entry
+     * 
+     * @param entry
+     * @return
+     */
+    protected boolean isValidForEviction(TimeoutMapEntry entry) {
+        return true;
+    }
+
+    protected void updateExpireTime(TimeoutMapEntry entry) {
+        long now = currentTime();
+        entry.setExpireTime(entry.getTimeout() + now);
+    }
+
+    protected long currentTime() {
+        return System.currentTimeMillis();
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+/**
+ * Represents a thread safe map of values which timeout after a period of
+ * inactivity.
+ * 
+ * @version $Revision: $
+ */
+public interface TimeoutMap extends Runnable {
+
+    /**
+     * Looks up the value in the map by the given key.
+     * 
+     * @param key
+     *            the key of the value to search for
+     * @return the value for the given key or null if it is not present (or has
+     *         timed out)
+     */
+    Object get(Object key);
+
+    /**
+     * Returns a copy of the keys in the map
+     */
+    Object[] getKeys();
+
+    /**
+     * Adds the key value pair into the map such that some time after the given
+     * timeout the entry will be evicted
+     */
+    void put(Object key, Object value, long timeoutMillis);
+
+    void remove(Object key);
+
+    /**
+     * Purges any old entries from the map
+     */
+    public void purge();
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.util.Map;
+
+/**
+ * Represents an entry in a {@link TimeoutMap}
+ * 
+ * @version $Revision: $
+ */
+public class TimeoutMapEntry implements Comparable, Map.Entry {
+    private Object key;
+    private Object value;
+    private long timeout;
+    private long expireTime;
+
+    public TimeoutMapEntry(Object id, Object handler, long timeout) {
+        this.key = id;
+        this.value = handler;
+        this.timeout = timeout;
+    }
+
+    public Object getKey() {
+        return key;
+    }
+
+    public long getExpireTime() {
+        return expireTime;
+    }
+
+    public void setExpireTime(long expireTime) {
+        this.expireTime = expireTime;
+    }
+
+    public Object getValue() {
+        return value;
+    }
+
+    public Object setValue(Object value) {
+        Object oldValue = this.value; // read the field: the parameter shadows it
+        this.value = value;
+        return oldValue;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public int compareTo(Object that) {
+        if (this == that) {
+            return 0;
+        }
+        if (that instanceof TimeoutMapEntry) {
+            return compareTo((TimeoutMapEntry) that);
+        }
+        return 1;
+    }
+
+    public int compareTo(TimeoutMapEntry that) {
+        // order by expiry first so the soonest-to-expire entry sorts first
+        if (this.expireTime != that.expireTime) {
+            return this.expireTime < that.expireTime ? -1 : 1;
+        }
+        // tie-break on key hash; compare instead of subtracting to avoid int overflow
+        int thisHash = this.key.hashCode();
+        int thatHash = that.key.hashCode();
+        return thisHash < thatHash ? -1 : (thisHash == thatHash ? 0 : 1);
+    }
+
+    public String toString() {
+        return "Entry for key: " + key;
+    }
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Method;
+
+import org.apache.camel.TestSupport;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class BeanInvocationSerializeTest extends TestSupport {
+    public void testSerialize() throws Exception {
+        Method method = getClass().getMethod("cheese", String.class, String.class);
+        BeanInvocation invocation = new BeanInvocation(method, new Object[] { "a", "b" });
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(buffer);
+        out.writeObject(invocation);
+        out.close();
+
+        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+        Object object = in.readObject();
+        BeanInvocation actual = assertIsInstanceOf(BeanInvocation.class, object);
+        System.out.println("Received " + actual);
+    }
+
+    public void cheese(String a, String b) {
+        System.out.println("Called with a: " + a + " b: " + b);
+
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/components/camel-jms/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/pom.xml?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/pom.xml (original)
+++ activemq/camel/trunk/components/camel-jms/pom.xml Thu Oct 18 11:41:45 2007
@@ -64,6 +64,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-spring</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>apache-activemq</artifactId>
       <scope>test</scope>

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Thu Oct 18 11:41:45 2007
@@ -16,52 +16,72 @@
  */
 package org.apache.camel.component.jms;
 
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+import javax.jms.Session;
 
-import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.MessageCreator;
 
 /**
  * A JMS {@link MessageListener} which can be used to delegate processing to a
  * Camel endpoint.
- * 
+ *
  * @version $Revision$
  */
-public class EndpointMessageListener<E extends Exchange> implements MessageListener {
+public class EndpointMessageListener implements MessageListener {
     private static final transient Log LOG = LogFactory.getLog(EndpointMessageListener.class);
     private JmsEndpoint endpoint;
     private Processor processor;
     private JmsBinding binding;
     private boolean eagerLoadingOfProperties;
+    private Destination replyToDestination;
+    private JmsOperations template;
+    private boolean disableReplyTo;
 
     public EndpointMessageListener(JmsEndpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
         this.processor = processor;
     }
 
-    public void onMessage(Message message) {
+    public void onMessage(final Message message) {
         try {
-
             if (LOG.isDebugEnabled()) {
                 LOG.debug(endpoint + " receiving JMS message: " + message);
             }
-            JmsExchange exchange = createExchange(message);
+            Destination replyDestination = getReplyToDestination(message);
+            final JmsExchange exchange = createExchange(message, replyDestination);
             if (eagerLoadingOfProperties) {
                 exchange.getIn().getHeaders();
             }
             processor.process(exchange);
 
-        } catch (Exception e) {
+            final JmsMessage out = exchange.getOut(false);
+            if (out != null && !disableReplyTo) {
+                sendReply(replyDestination, message, exchange, out);
+            }
+        }
+        catch (Exception e) {
             throw new RuntimeCamelException(e);
         }
     }
 
-    public JmsExchange createExchange(Message message) {
-        return new JmsExchange(endpoint.getContext(), endpoint.getExchangePattern(), getBinding(), message);
+    public JmsExchange createExchange(Message message, Destination replyDestination) {
+        JmsExchange exchange = new JmsExchange(endpoint.getContext(), endpoint.getExchangePattern(), getBinding(), message);
+        // lets set to an InOut if we have some kind of reply-to destination
+        if (replyDestination != null) {
+            exchange.setProperty("org.apache.camel.jms.replyDestination", replyDestination);
+            exchange.setPattern(ExchangePattern.InOut);
+        }
+        return exchange;
     }
 
     // Properties
@@ -76,7 +96,7 @@
     /**
      * Sets the binding used to convert from a Camel message to and from a JMS
      * message
-     * 
+     *
      * @param binding the binding to use
      */
     public void setBinding(JmsBinding binding) {
@@ -89,5 +109,76 @@
 
     public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) {
         this.eagerLoadingOfProperties = eagerLoadingOfProperties;
+    }
+
+    public JmsOperations getTemplate() {
+        if (template == null) {
+            template = endpoint.createInOnlyTemplate();
+        }
+        return template;
+    }
+
+    public void setTemplate(JmsOperations template) {
+        this.template = template;
+    }
+
+    public boolean isDisableReplyTo() {
+        return disableReplyTo;
+    }
+
+    /**
+     * Allows the reply-to behaviour to be disabled
+     */
+    public void setDisableReplyTo(boolean disableReplyTo) {
+        this.disableReplyTo = disableReplyTo;
+    }
+
+    public Destination getReplyToDestination() {
+        return replyToDestination;
+    }
+
+    /**
+     * Provides an explicit reply to destination which overrides
+     * any incoming value of {@link Message#getJMSReplyTo()}
+     *
+     * @param replyToDestination the destination that should be used to send replies to
+     */
+    public void setReplyToDestination(Destination replyToDestination) {
+        this.replyToDestination = replyToDestination;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+    protected void sendReply(Destination replyDestination, final Message message, final JmsExchange exchange, final JmsMessage out) {
+        if (replyDestination == null) {
+            LOG.warn("Cannot send reply message as there is no replyDestination for: " + out);
+            return;
+        }
+        getTemplate().send(replyDestination, new MessageCreator() {
+            public Message createMessage(Session session) throws JMSException {
+                Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session);
+
+                // lets preserve any correlation ID
+                String correlationID = message.getJMSCorrelationID();
+                if (correlationID != null) {
+                    reply.setJMSCorrelationID(correlationID);
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(endpoint + " sending reply JMS message: " + reply);
+                }
+                return reply;
+            }
+        });
+    }
+
+    protected Destination getReplyToDestination(Message message) throws JMSException {
+        // lets send a response back if we can
+        Destination destination = replyToDestination;
+        if (destination == null) {
+            destination = message.getJMSReplyTo();
+        }
+        return destination;
     }
 }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Thu Oct 18 11:41:45 2007
@@ -51,7 +51,7 @@
      * @param exchange
      * @param message
      */
-    public Object extractBodyFromJms(JmsExchange exchange, Message message) {
+    public Object extractBodyFromJms(Exchange exchange, Message message) {
         try {
             if (message instanceof ObjectMessage) {
                 ObjectMessage objectMessage = (ObjectMessage)message;
@@ -79,15 +79,15 @@
      * @return a newly created JMS Message instance containing the
      * @throws JMSException if the message could not be created
      */
-    public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException {
+    public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session) throws JMSException {
         Message answer = null;
-        if( exchange instanceof JmsExchange  ) {
-            JmsExchange jmsExchange = (JmsExchange)exchange;
-            answer = jmsExchange.getIn().getJmsMessage();
+        if( camelMessage instanceof JmsMessage  ) {
+            JmsMessage jmsMessage = (JmsMessage)camelMessage;
+            answer = jmsMessage.getJmsMessage();
         }
         if( answer == null ) {
-            answer = createJmsMessage(exchange.getIn().getBody(), session);
-            appendJmsProperties(answer, exchange);
+            answer = createJmsMessage(camelMessage.getBody(), session);
+            appendJmsProperties(answer, exchange, camelMessage);
         }
         return answer;
     }
@@ -95,8 +95,7 @@
     /**
      * Appends the JMS headers from the Camel {@link JmsMessage}
      */
-    public void appendJmsProperties(Message jmsMessage, Exchange exchange) throws JMSException {
-        org.apache.camel.Message in = exchange.getIn();
+    public void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in) throws JMSException {
         Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
         for (Map.Entry<String, Object> entry : entries) {
             String headerName = entry.getKey();

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Thu Oct 18 11:41:45 2007
@@ -16,38 +16,39 @@
  */
 package org.apache.camel.component.jms;
 
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.component.jms.requestor.Requestor;
 import org.apache.camel.impl.DefaultComponent;
 import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
-
 import org.springframework.beans.BeansException;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationContextAware;
 import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.connection.JmsTransactionManager;
+import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.listener.serversession.ServerSessionFactory;
 import org.springframework.jms.support.converter.MessageConverter;
 import org.springframework.jms.support.destination.DestinationResolver;
-import org.springframework.jms.connection.JmsTransactionManager;
-import org.springframework.jms.core.JmsOperations;
 import org.springframework.transaction.PlatformTransactionManager;
 
-import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.Session;
-import java.util.Map;
-
 /**
  * A <a href="http://activemq.apache.org/jms.html">JMS Component</a>
- * 
+ *
  * @version $Revision:520964 $
  */
 public class JmsComponent extends DefaultComponent<JmsExchange> implements ApplicationContextAware {
     public static final String QUEUE_PREFIX = "queue:";
     public static final String TOPIC_PREFIX = "topic:";
-
     private JmsConfiguration configuration;
-	private ApplicationContext applicationContext;
+    private ApplicationContext applicationContext;
+    private Requestor requestor;
 
     public JmsComponent() {
     }
@@ -59,7 +60,7 @@
     public JmsComponent(CamelContext context) {
         super(context);
     }
-    
+
     /**
      * Static builder method
      */
@@ -111,48 +112,24 @@
         template.setTransacted(true);
         return jmsComponent(template);
     }
-
-    @Override
-    protected Endpoint<JmsExchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
-
-        boolean pubSubDomain = false;
-        if (remaining.startsWith(QUEUE_PREFIX)) {
-            pubSubDomain = false;
-            remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
-        } else if (remaining.startsWith(TOPIC_PREFIX)) {
-            pubSubDomain = true;
-            remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
-        }
-
-        final String subject = convertPathToActualDestination(remaining);
-
-        // lets make sure we copy the configuration as each endpoint can
-        // customize its own version
-        JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy());
-
-        String selector = (String)parameters.remove("selector");
-        if (selector != null) {
-            endpoint.setSelector(selector);
-        }
-        setProperties(endpoint.getConfiguration(), parameters);
-        return endpoint;
-    }
+    // Properties
+    //-------------------------------------------------------------------------
 
     public JmsConfiguration getConfiguration() {
         if (configuration == null) {
             configuration = createConfiguration();
-            
+
             // If we are being configured with spring... 
-            if( applicationContext !=null ) {
-            	Map beansOfType = applicationContext.getBeansOfType(ConnectionFactory.class);
-            	if( !beansOfType.isEmpty() ) { 
-            		ConnectionFactory cf = (ConnectionFactory) beansOfType.values().iterator().next();
-            		configuration.setConnectionFactory(cf);
-            	}
+            if (applicationContext != null) {
+                Map beansOfType = applicationContext.getBeansOfType(ConnectionFactory.class);
+                if (!beansOfType.isEmpty()) {
+                    ConnectionFactory cf = (ConnectionFactory) beansOfType.values().iterator().next();
+                    configuration.setConnectionFactory(cf);
+                }
                 beansOfType = applicationContext.getBeansOfType(DestinationResolver.class);
-                if( !beansOfType.isEmpty() ) { 
+                if (!beansOfType.isEmpty()) {
                     DestinationResolver destinationResolver = (DestinationResolver) beansOfType.values().iterator().next();
-                        configuration.setDestinationResolver(destinationResolver);
+                    configuration.setDestinationResolver(destinationResolver);
                 }
             }
         }
@@ -161,7 +138,7 @@
 
     /**
      * Sets the JMS configuration
-     * 
+     *
      * @param configuration the configuration to use by default for endpoints
      */
     public void setConfiguration(JmsConfiguration configuration) {
@@ -303,7 +280,7 @@
     public void setUseVersion102(boolean useVersion102) {
         getConfiguration().setUseVersion102(useVersion102);
     }
-    
+
     public void setJmsOperations(JmsOperations jmsOperations) {
         getConfiguration().setJmsOperations(jmsOperations);
     }
@@ -312,6 +289,60 @@
         getConfiguration().setDestinationResolver(destinationResolver);
     }
 
+    public Requestor getRequestor() throws Exception {
+        if (requestor == null) {
+            requestor = new Requestor(getConfiguration(), getExecutorService());
+            requestor.start();
+        }
+        return requestor;
+    }
+
+    public void setRequestor(Requestor requestor) {
+        this.requestor = requestor;
+    }
+
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+    @Override
+    protected void doStop() throws Exception {
+        if (requestor != null) {
+            requestor.stop();
+        }
+        super.doStop();
+    }
+
+    @Override
+    protected Endpoint<JmsExchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
+
+        boolean pubSubDomain = false;
+        if (remaining.startsWith(QUEUE_PREFIX)) {
+            pubSubDomain = false;
+            remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
+        }
+        else if (remaining.startsWith(TOPIC_PREFIX)) {
+            pubSubDomain = true;
+            remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
+        }
+
+        final String subject = convertPathToActualDestination(remaining);
+
+        // lets make sure we copy the configuration as each endpoint can
+        // customize its own version
+        JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy());
+
+        String selector = (String) parameters.remove("selector");
+        if (selector != null) {
+            endpoint.setSelector(selector);
+        }
+        setProperties(endpoint.getConfiguration(), parameters);
+        return endpoint;
+    }
+
     /**
      * A strategy method allowing the URI destination to be translated into the
      * actual JMS destination name (say by looking up in JNDI or something)
@@ -322,16 +353,11 @@
 
     /**
      * Factory method to create the default configuration instance
-     * 
+     *
      * @return a newly created configuration object which can then be further
      *         customized
      */
     protected JmsConfiguration createConfiguration() {
         return new JmsConfiguration();
     }
-
-	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-		this.applicationContext = applicationContext;
-	}
-
 }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Thu Oct 18 11:41:45 2007
@@ -51,7 +51,6 @@
     protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
     protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
     protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE";
-
     private JmsOperations jmsOperations;
     private DestinationResolver destinationResolver;
     private ConnectionFactory connectionFactory;
@@ -94,6 +93,7 @@
     private String transactionName;
     private int transactionTimeout = -1;
     private boolean preserveMessageQos;
+    private long requestMapPurgePollTimeMillis = 1000L;
 
     public JmsConfiguration() {
     }
@@ -107,18 +107,32 @@
      */
     public JmsConfiguration copy() {
         try {
-            return (JmsConfiguration)clone();
-        } catch (CloneNotSupportedException e) {
+            return (JmsConfiguration) clone();
+        }
+        catch (CloneNotSupportedException e) {
             throw new RuntimeCamelException(e);
         }
     }
 
-    public JmsOperations createJmsOperations(boolean pubSubDomain, String destination) {
-        
-        if ( jmsOperations !=null ) {
+    /**
+     * Creates a JmsOperations object used for request/response using a request timeout value
+     */
+    public JmsOperations createInOutTemplate(boolean pubSubDomain, String destination, long requestTimeout) {
+        JmsOperations answer = createInOnlyTemplate(pubSubDomain, destination);
+        if (answer instanceof JmsTemplate && requestTimeout > 0) {
+            JmsTemplate jmsTemplate = (JmsTemplate) answer;
+            jmsTemplate.setExplicitQosEnabled(true);
+            jmsTemplate.setTimeToLive(requestTimeout);
+        }
+        return answer;
+    }
+
+    public JmsOperations createInOnlyTemplate(boolean pubSubDomain, String destination) {
+
+        if (jmsOperations != null) {
             return jmsOperations;
         }
-        
+
         ConnectionFactory factory = getTemplateConnectionFactory();
 
         // I whish the spring templates had built in support for preserving the message
@@ -141,11 +155,13 @@
                         }
                     }
                     if (isPubSubDomain()) {
-                        ((TopicPublisher)producer).publish(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
-                    } else {
-                        ((QueueSender)producer).send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
+                        ((TopicPublisher) producer).publish(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
+                    }
+                    else {
+                        ((QueueSender) producer).send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
                     }
-                } else {
+                }
+                else {
                     super.doSend(producer, message);
                 }
             }
@@ -167,18 +183,19 @@
                         }
                     }
                     producer.send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
-                } else {
+                }
+                else {
                     super.doSend(producer, message);
                 }
             }
         };
-        
+
         template.setPubSubDomain(pubSubDomain);
-        if( destinationResolver!=null ) {
+        if (destinationResolver != null) {
             template.setDestinationResolver(destinationResolver);
         }
         template.setDefaultDestinationName(destination);
-        
+
         template.setExplicitQosEnabled(explicitQosEnabled);
         template.setDeliveryPersistent(deliveryPersistent);
         if (messageConverter != null) {
@@ -206,7 +223,8 @@
             // for receiving messages.
             if (acknowledgementMode >= 0) {
                 template.setSessionAcknowledgeMode(acknowledgementMode);
-            } else if (acknowledgementModeName != null) {
+            }
+            else if (acknowledgementModeName != null) {
                 template.setSessionAcknowledgeModeName(acknowledgementModeName);
             }
         }
@@ -221,7 +239,7 @@
 
     protected void configureMessageListenerContainer(AbstractMessageListenerContainer container) {
         container.setConnectionFactory(getListenerConnectionFactory());
-        if( destinationResolver!=null ) {
+        if (destinationResolver != null) {
             container.setDestinationResolver(destinationResolver);
         }
         if (autoStartup) {
@@ -255,21 +273,23 @@
         else {
             if (acknowledgementMode >= 0) {
                 container.setSessionAcknowledgeMode(acknowledgementMode);
-            } else if (acknowledgementModeName != null) {
+            }
+            else if (acknowledgementModeName != null) {
                 container.setSessionAcknowledgeModeName(acknowledgementModeName);
             }
         }
 
         if (container instanceof DefaultMessageListenerContainer) {
             // this includes DefaultMessageListenerContainer102
-            DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)container;
+            DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer) container;
             if (concurrentConsumers >= 0) {
                 listenerContainer.setConcurrentConsumers(concurrentConsumers);
             }
 
             if (cacheLevel >= 0) {
                 listenerContainer.setCacheLevel(cacheLevel);
-            } else if (cacheLevelName != null) {
+            }
+            else if (cacheLevelName != null) {
                 listenerContainer.setCacheLevelName(cacheLevelName);
             }
 
@@ -304,18 +324,20 @@
             if (transactionTimeout >= 0) {
                 listenerContainer.setTransactionTimeout(transactionTimeout);
             }
-        } else if (container instanceof ServerSessionMessageListenerContainer) {
+        }
+        else if (container instanceof ServerSessionMessageListenerContainer) {
             // this includes ServerSessionMessageListenerContainer102
-            ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer)container;
+            ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer) container;
             if (maxMessagesPerTask >= 0) {
                 listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
             }
             if (serverSessionFactory != null) {
                 listenerContainer.setServerSessionFactory(serverSessionFactory);
             }
-        } else if (container instanceof SimpleMessageListenerContainer) {
+        }
+        else if (container instanceof SimpleMessageListenerContainer) {
             // this includes SimpleMessageListenerContainer102
-            SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)container;
+            SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer) container;
             if (concurrentConsumers >= 0) {
                 listenerContainer.setConcurrentConsumers(concurrentConsumers);
             }
@@ -340,7 +362,7 @@
      * not specified for either
      * {@link #setTemplateConnectionFactory(ConnectionFactory)} or
      * {@link #setListenerConnectionFactory(ConnectionFactory)}
-     * 
+     *
      * @param connectionFactory the default connection factory to use
      */
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
@@ -357,9 +379,9 @@
     /**
      * Sets the connection factory to be used for consuming messages via the
      * {@link #createMessageListenerContainer()}
-     * 
+     *
      * @param listenerConnectionFactory the connection factory to use for
-     *                consuming messages
+     *                                  consuming messages
      */
     public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) {
         this.listenerConnectionFactory = listenerConnectionFactory;
@@ -374,10 +396,10 @@
 
     /**
      * Sets the connection factory to be used for sending messages via the
-     * {@link JmsTemplate} via {@link #createJmsOperations(boolean, String)}
-     * 
+     * {@link JmsTemplate} via {@link #createInOnlyTemplate(boolean, String)}
+     *
      * @param templateConnectionFactory the connection factory for sending
-     *                messages
+     *                                  messages
      */
     public void setTemplateConnectionFactory(ConnectionFactory templateConnectionFactory) {
         this.templateConnectionFactory = templateConnectionFactory;
@@ -654,14 +676,14 @@
     protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
         // TODO we could allow a spring container to auto-inject these objects?
         switch (consumerType) {
-        case Simple:
-            return isUseVersion102() ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
-        case ServerSessionPool:
-            return isUseVersion102() ? new ServerSessionMessageListenerContainer102() : new ServerSessionMessageListenerContainer();
-        case Default:
-            return isUseVersion102() ? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer();
-        default:
-            throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
+            case Simple:
+                return isUseVersion102() ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
+            case ServerSessionPool:
+                return isUseVersion102() ? new ServerSessionMessageListenerContainer102() : new ServerSessionMessageListenerContainer();
+            case Default:
+                return isUseVersion102() ? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer();
+            default:
+                throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
         }
     }
 
@@ -695,10 +717,10 @@
     }
 
     /**
-     * Set to true if you want to send message using the QoS settings specified 
+     * Set to true if you want to send message using the QoS settings specified
      * on the message.  Normally the QoS settings used are the one configured
      * on this Object.
-     * 
+     *
      * @param preserveMessageQos
      */
     public void setPreserveMessageQos(boolean preserveMessageQos) {
@@ -719,5 +741,19 @@
 
     public void setDestinationResolver(DestinationResolver destinationResolver) {
         this.destinationResolver = destinationResolver;
+    }
+
+    public long getRequestMapPurgePollTimeMillis() {
+        return requestMapPurgePollTimeMillis;
+    }
+
+    /**
+     * Sets how often, in milliseconds, the requestMap for InOut exchanges is
+     * purged of timed out message exchanges
+     *
+     * @param requestMapPurgePollTimeMillis the purge polling interval in milliseconds
+     */
+    public void setRequestMapPurgePollTimeMillis(long requestMapPurgePollTimeMillis) {
+        this.requestMapPurgePollTimeMillis = requestMapPurgePollTimeMillis;
     }
 }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Thu Oct 18 11:41:45 2007
@@ -18,9 +18,9 @@
 
 import javax.jms.MessageListener;
 
+import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
-
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 
 /**
@@ -44,7 +44,7 @@
     }
 
     protected MessageListener createMessageListener(JmsEndpoint endpoint, Processor processor) {
-        EndpointMessageListener<JmsExchange> messageListener = new EndpointMessageListener<JmsExchange>(endpoint, processor);
+        EndpointMessageListener messageListener = new EndpointMessageListener(endpoint, processor);
         messageListener.setBinding(endpoint.getBinding());
         return messageListener;
     }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Thu Oct 18 11:41:45 2007
@@ -18,49 +18,54 @@
 
 import javax.jms.Message;
 
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
-import org.apache.camel.ExchangePattern;
+import org.apache.camel.component.jms.requestor.Requestor;
 import org.apache.camel.impl.DefaultEndpoint;
-
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 
 /**
  * A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
- * 
+ *
  * @version $Revision:520964 $
  */
 public class JmsEndpoint extends DefaultEndpoint<JmsExchange> {
+    private final JmsComponent component;
+    private final boolean pubSubDomain;
     private JmsBinding binding;
     private String destination;
-    private final boolean pubSubDomain;
     private String selector;
     private JmsConfiguration configuration;
+    private Requestor requestor;
+    private long requestTimeout = 20000L;
 
     public JmsEndpoint(String uri, JmsComponent component, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
         super(uri, component);
+        this.component = component;
         this.configuration = configuration;
         this.destination = destination;
         this.pubSubDomain = pubSubDomain;
     }
 
     public JmsProducer createProducer() throws Exception {
-        JmsOperations template = createJmsOperations();
-        return createProducer(template);
+        return new JmsProducer(this);
     }
 
     /**
-     * Creates a producer using the given template
+     * Creates a producer using the given template for InOnly message exchanges
      */
     public JmsProducer createProducer(JmsOperations template) throws Exception {
+        JmsProducer answer = createProducer();
         if (template instanceof JmsTemplate) {
-            JmsTemplate jmsTemplate = (JmsTemplate)template;
+            JmsTemplate jmsTemplate = (JmsTemplate) template;
             jmsTemplate.setPubSubDomain(pubSubDomain);
             jmsTemplate.setDefaultDestinationName(destination);
         }
-        return new JmsProducer(this, template);
+        answer.setInOnlyTemplate(template);
+        return answer;
     }
 
     public JmsConsumer createConsumer(Processor processor) throws Exception {
@@ -70,8 +75,8 @@
 
     /**
      * Creates a consumer using the given processor and listener container
-     * 
-     * @param processor the processor to use to process the messages
+     *
+     * @param processor         the processor to use to process the messages
      * @param listenerContainer the listener container
      * @return a newly created consumer
      * @throws Exception if the consumer cannot be created
@@ -87,7 +92,7 @@
 
     @Override
     public PollingConsumer<JmsExchange> createPollingConsumer() throws Exception {
-        JmsOperations template = createJmsOperations();
+        JmsOperations template = createInOnlyTemplate();
         return new JmsPollingConsumer(this, template);
     }
 
@@ -100,6 +105,20 @@
         return new JmsExchange(getContext(), getExchangePattern(), getBinding(), message);
     }
 
+    /**
+     * Factory method for creating a new template for InOnly message exchanges
+     */
+    public JmsOperations createInOnlyTemplate() {
+        return configuration.createInOnlyTemplate(pubSubDomain, destination);
+    }
+
+    /**
+     * Factory method for creating a new template for InOut message exchanges
+     */
+    public JmsOperations createInOutTemplate() {
+        return configuration.createInOutTemplate(pubSubDomain, destination, getRequestTimeout());
+    }
+
     // Properties
     // -------------------------------------------------------------------------
     public JmsBinding getBinding() {
@@ -112,7 +131,7 @@
     /**
      * Sets the binding used to convert from a Camel message to and from a JMS
      * message
-     * 
+     *
      * @param binding the binding to use
      */
     public void setBinding(JmsBinding binding) {
@@ -142,8 +161,30 @@
         return false;
     }
 
-    protected JmsOperations createJmsOperations() {
-        return configuration.createJmsOperations(pubSubDomain, destination);
+    public Requestor getRequestor() throws Exception {
+        if (requestor == null) {
+            requestor = component.getRequestor();
+        }
+        return requestor;
+    }
+
+    public void setRequestor(Requestor requestor) {
+        this.requestor = requestor;
+    }
+
+    public long getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    /**
+     * Sets the time, in milliseconds, after which requests time out
+     *
+     * @param requestTimeout the request timeout in milliseconds
+     */
+    public void setRequestTimeout(long requestTimeout) {
+        this.requestTimeout = requestTimeout;
     }
 
+    // Implementation methods
+    //-------------------------------------------------------------------------
 }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java Thu Oct 18 11:41:45 2007
@@ -26,6 +26,7 @@
 import javax.jms.Queue;
 import javax.jms.Topic;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultMessage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@
 public class JmsMessage extends DefaultMessage {
     private static final transient Log LOG = LogFactory.getLog(JmsMessage.class);
     private Message jmsMessage;
+    private JmsBinding binding;
 
     public JmsMessage() {
     }
@@ -46,6 +48,11 @@
         setJmsMessage(jmsMessage);
     }
 
+    public JmsMessage(Message jmsMessage, JmsBinding binding) {
+        this(jmsMessage);
+        setBinding(binding);
+    }
+
     @Override
     public String toString() {
         if (jmsMessage != null) {
@@ -64,6 +71,24 @@
         return jmsMessage;
     }
 
+    public JmsBinding getBinding() {
+        if (binding == null) {
+            Exchange exchange = getExchange();
+            if (exchange instanceof JmsExchange) {
+                JmsExchange jmsExchange = (JmsExchange) exchange;
+                return jmsExchange.getBinding();
+            }
+            else {
+                return new JmsBinding();
+            }
+        }
+        return binding;
+    }
+
+    public void setBinding(JmsBinding binding) {
+        this.binding = binding;
+    }
+
     public void setJmsMessage(Message jmsMessage) {
         this.jmsMessage = jmsMessage;
         try {
@@ -100,9 +125,8 @@
 
     @Override
     protected Object createBody() {
-        if (jmsMessage != null && getExchange() instanceof JmsExchange) {
-            JmsExchange exchange = (JmsExchange)getExchange();
-            return exchange.getBinding().extractBodyFromJms(exchange, jmsMessage);
+        if (jmsMessage != null) {
+            return getBinding().extractBodyFromJms(getExchange(), jmsMessage);
         }
         return null;
     }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Thu Oct 18 11:41:45 2007
@@ -16,15 +16,22 @@
  */
 package org.apache.camel.component.jms;
 
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.component.jms.requestor.Requestor;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.UuidGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.MessageCreator;
 
@@ -34,27 +41,117 @@
 public class JmsProducer extends DefaultProducer {
     private static final transient Log LOG = LogFactory.getLog(JmsProducer.class);
     private final JmsEndpoint endpoint;
-    private final JmsOperations template;
+    private JmsOperations inOnlyTemplate;
+    private JmsOperations inOutTemplate;
+    private UuidGenerator uuidGenerator;
 
-    public JmsProducer(JmsEndpoint endpoint, JmsOperations template) {
+    public JmsProducer(JmsEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
-        this.template = template;
     }
 
     public void process(final Exchange exchange) {
-        template.send(endpoint.getDestination(), new MessageCreator() {
-            public Message createMessage(Session session) throws JMSException {
-                Message message = endpoint.getBinding().makeJmsMessage(exchange, session);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(endpoint + " sending JMS message: " + message);
+        final org.apache.camel.Message in = exchange.getIn();
+
+        if (exchange.getPattern().isOutCapable()) {
+            // create a temporary queue and consumer for responses...
+            // note due to JMS transaction semantics we cannot use a single transaction
+            // for sending the request and receiving the response
+            Requestor requestor;
+            try {
+                requestor = endpoint.getRequestor();
+            }
+            catch (Exception e) {
+                throw new RuntimeExchangeException(e, exchange);
+            }
+
+            final Destination replyTo = requestor.getReplyTo();
+
+            String correlationId = in.getHeader("JMSCorrelationID", String.class);
+            if (correlationId == null) {
+                correlationId = getUuidGenerator().generateId();
+                in.setHeader("JMSCorrelationID", correlationId);
+            }
+
+            getInOutTemplate().send(endpoint.getDestination(), new MessageCreator() {
+                public Message createMessage(Session session) throws JMSException {
+                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
+                    message.setJMSReplyTo(replyTo);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(endpoint + " sending JMS message: " + message);
+                    }
+                    return message;
+                }
+            });
+
+            // lets wait and return the response
+            long requestTimeout = endpoint.getRequestTimeout();
+            FutureTask future = requestor.getReceiveFuture(correlationId, requestTimeout);
+
+            try {
+                Message message;
+                if (requestTimeout < 0) {
+                    message = (Message) future.get();
+                }
+                else {
+                    message = (Message) future.get(requestTimeout, TimeUnit.MILLISECONDS);
+                }
+                if (message != null) {
+                    exchange.setOut(new JmsMessage(message, endpoint.getBinding()));
                 }
-                return message;
+                else {
+                    // lets set a timed out exception
+                    exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
+                }
+            }
+            catch (Exception e) {
+                exchange.setException(e);
             }
-        });
+        }
+        else {
+            getInOnlyTemplate().send(endpoint.getDestination(), new MessageCreator() {
+                public Message createMessage(Session session) throws JMSException {
+                    Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(endpoint + " sending JMS message: " + message);
+                    }
+                    return message;
+                }
+            });
+        }
+    }
+
+    public JmsOperations getInOnlyTemplate() {
+        if (inOnlyTemplate == null) {
+            inOnlyTemplate = endpoint.createInOnlyTemplate();
+        }
+        return inOnlyTemplate;
+    }
+
+    public void setInOnlyTemplate(JmsOperations inOnlyTemplate) {
+        this.inOnlyTemplate = inOnlyTemplate;
+    }
+
+    public JmsOperations getInOutTemplate() {
+        if (inOutTemplate == null) {
+            inOutTemplate = endpoint.createInOutTemplate();
+        }
+        return inOutTemplate;
+    }
+
+    public void setInOutTemplate(JmsOperations inOutTemplate) {
+        this.inOutTemplate = inOutTemplate;
+    }
+
+    public UuidGenerator getUuidGenerator() {
+        if (uuidGenerator == null) {
+            uuidGenerator = new UuidGenerator();
+        }
+        return uuidGenerator;
     }
 
-    public JmsOperations getTemplate() {
-        return template;
+    public void setUuidGenerator(UuidGenerator uuidGenerator) {
+        this.uuidGenerator = uuidGenerator;
     }
 }

Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * An exception thrown if a response message from an InOut exchange could not be processed
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class FailedToProcessResponse extends RuntimeCamelException {
+    private final Message response;
+
+    public FailedToProcessResponse(Message response, JMSException e) {
+        super("Failed to process response: "+ e + ". Message: " + response, e);
+        this.response = response;
+    }
+
+    /**
+     * The response message which caused the exception
+     * 
+     * @return the response message which caused the exception
+     */
+    public Message getResponse() {
+        return response;
+    }
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A {@link FutureTask} which implements {@link ReplyHandler}
+ * so that it can be used as a handler for a correlation ID
+ *
+ * @version $Revision: 1.1 $
+ */
+public class FutureHandler extends FutureTask implements ReplyHandler {
+    private static final Callable EMPTY_CALLABLE = new Callable() {
+        public Object call() throws Exception {
+            return null;
+        }
+    };
+
+    public FutureHandler() {
+        super(EMPTY_CALLABLE);
+    }
+
+    public synchronized void set(Object result) {
+        super.set(result);
+    }
+
+    public boolean handle(Message message) throws JMSException {
+        set(message);
+        return true;
+    }
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,32 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public interface ReplyHandler {
+    /**
+     * Processes the message, returning true if this was the last call in the handler's
+     * lifecycle so that the handler can be discarded
+     */
+    boolean handle(Message message) throws JMSException;
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native