You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/10/18 20:41:49 UTC
svn commit: r586066 [1/2] - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/bean/
camel-core/src/main/java/org/apache/camel/util/
camel-core/src/test/java/org/apache/camel/compone...
Author: jstrachan
Date: Thu Oct 18 11:41:45 2007
New Revision: 586066
URL: http://svn.apache.org/viewvc?rev=586066&view=rev
Log:
added support for JMS InOut so we can do Spring Remoting over JMS via <proxy> and <export> for https://issues.apache.org/activemq/browse/CAMEL-184
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
- copied, changed from r585168, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java (with props)
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java (with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java (with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java (with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java (with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (with props)
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html
- copied, changed from r585168, activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/package.html
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java (with props)
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/
activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
activemq/camel/trunk/components/camel-jms/pom.xml
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java
activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java
activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties
activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java
activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu Oct 18 11:41:45 2007
@@ -16,20 +16,19 @@
*/
package org.apache.camel;
-import org.apache.camel.spi.UnitOfWork;
-
import java.util.Map;
+import org.apache.camel.spi.UnitOfWork;
+
/**
* The base message exchange interface providing access to the request, response
* and fault {@link Message} instances. Different providers such as JMS, JBI,
* CXF and HTTP can provide their own derived API to expose the underlying
* transport semantics to avoid the leaky abstractions of generic APIs.
- *
+ *
* @version $Revision$
*/
public interface Exchange {
-
/**
* Returns the {@link ExchangePattern} (MEP) of this exchange.
*
@@ -39,7 +38,7 @@
/**
* Returns a property associated with this exchange by name
- *
+ *
* @param name the name of the property
* @return the value of the given header or null if there is no property for
* the given name
@@ -49,7 +48,7 @@
/**
* Returns a property associated with this exchange by name and specifying
* the type required
- *
+ *
* @param name the name of the property
* @param type the type of the property
* @return the value of the given header or null if there is no property for
@@ -60,8 +59,8 @@
/**
* Sets a property on the exchange
- *
- * @param name of the property
+ *
+ * @param name of the property
* @param value to associate with the name
*/
void setProperty(String name, Object value);
@@ -76,24 +75,31 @@
/**
* Returns all of the properties associated with the exchange
- *
+ *
* @return all the headers in a Map
*/
Map<String, Object> getProperties();
/**
* Returns the inbound request message
- *
+ *
* @return the message
*/
Message getIn();
/**
+ * Sets the inbound message instance
+ *
+ * @param in the inbound message
+ */
+ void setIn(Message in);
+
+ /**
* Returns the outbound message, lazily creating one if one has not already
* been associated with this exchange. If you want to inspect this property
* but not force lazy creation then invoke the {@link #getOut(boolean)}
* method passing in null
- *
+ *
* @return the response
*/
Message getOut();
@@ -101,14 +107,21 @@
/**
* Returns the outbound message; optionally lazily creating one if one has
* not been associated with this exchange
- *
+ *
* @return the response
*/
Message getOut(boolean lazyCreate);
/**
+ * Sets the outbound message
+ *
+ * @param out the outbound message
+ */
+ void setOut(Message out);
+
+ /**
* Returns the fault message
- *
+ *
* @return the fault
*/
Message getFault();
@@ -123,14 +136,14 @@
/**
* Returns the exception associated with this exchange
- *
+ *
* @return the exception (or null if no faults)
*/
Throwable getException();
/**
* Sets the exception associated with this exchange
- *
+ *
* @param e
*/
void setException(Throwable e);
@@ -138,22 +151,22 @@
/**
* Returns true if this exchange failed due to either an exception or fault
*
+ * @return true if this exchange failed due to either an exception or fault
* @see Exchange#getException()
* @see Exchange#getFault()
- * @return true if this exchange failed due to either an exception or fault
*/
boolean isFailed();
/**
* Returns the container so that a processor can resolve endpoints from URIs
- *
+ *
* @return the container which owns this exchange
*/
CamelContext getContext();
/**
* Creates a new exchange instance with empty messages, headers and properties
- *
+ *
* @return
*/
Exchange newInstance();
@@ -166,7 +179,7 @@
/**
* Copies the data into this exchange from the given exchange
- *
+ * <p/>
* #param source is the source from which headers and messages will be
* copied
*/
@@ -197,5 +210,4 @@
* @param id
*/
void setExchangeId(String id);
-
}
Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java (from r585168, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java?p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java&r1=585168&r2=586066&rev=586066&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java Thu Oct 18 11:41:45 2007
@@ -17,21 +17,22 @@
package org.apache.camel;
/**
+ * An exception thrown if an InOut exchange times out receiving the OUT message
+ *
* @version $Revision: 1.1 $
*/
-public class InvalidPayloadException extends CamelExchangeException {
- private final Class<?> type;
+public class ExchangeTimedOutException extends CamelExchangeException {
+ private final long timeout;
- public InvalidPayloadException(Exchange exchange, Class<?> type) {
- super("No in body available of type: " + type.getName()
- + NoSuchPropertyException.valueDescription(exchange.getIn().getBody()), exchange);
- this.type = type;
+ public ExchangeTimedOutException(Exchange exchange, long timeout) {
+ super("The OUT message was not received within: " + timeout + " millis", exchange);
+ this.timeout = timeout;
}
/**
- * The expected type of the body
+ * Return the timeout which expired in milliseconds
*/
- public Class<?> getType() {
- return type;
+ public long getTimeout() {
+ return timeout;
}
-}
+}
\ No newline at end of file
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/BeanInvocation.java Thu Oct 18 11:41:45 2007
@@ -16,23 +16,40 @@
*/
package org.apache.camel.component.bean;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Arrays;
import org.apache.camel.Exchange;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
-public class BeanInvocation {
+public class BeanInvocation implements Externalizable {
+ private Object[] args;
+ private MethodBean methodBean;
+ private transient Method method;
- private final Object proxy;
- private final Method method;
- private final Object[] args;
+ public BeanInvocation() {
+ }
- public BeanInvocation(Object proxy, Method method, Object[] args) {
- this.proxy = proxy;
+ public BeanInvocation(Method method, Object[] args) {
this.method = method;
this.args = args;
}
+ @Override
+ public String toString() {
+ Object list = null;
+ if (args != null) {
+ list = Arrays.asList(args);
+ }
+ return "BeanInvocation " + method + " with " + list + "]";
+ }
+
public Object[] getArgs() {
return args;
}
@@ -41,26 +58,52 @@
return method;
}
- public Object getProxy() {
- return proxy;
+ public void setMethod(Method method) {
+ this.method = method;
+ }
+
+ public void setArgs(Object[] args) {
+ this.args = args;
}
/**
* This causes us to invoke the endpoint Pojo using reflection.
- *
- * @param pojo
+ *
+ * @param pojo the bean on which to perform this invocation
+ * @param exchange the exchange carrying the method invocation
*/
public void invoke(Object pojo, Exchange exchange) {
try {
Object response = getMethod().invoke(pojo, getArgs());
exchange.getOut().setBody(response);
- } catch (InvocationTargetException e) {
+ }
+ catch (InvocationTargetException e) {
exchange.setException(e.getCause());
- } catch (RuntimeException e) {
+ }
+ catch (RuntimeException e) {
throw e;
- } catch (Throwable e) {
+ }
+ catch (Throwable e) {
throw new RuntimeException(e);
}
}
+ public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
+ methodBean = ObjectHelper.cast(MethodBean.class, objectInput.readObject());
+ try {
+ method = methodBean.getMethod();
+ }
+ catch (NoSuchMethodException e) {
+ throw IOHelper.createIOException(e);
+ }
+ args = ObjectHelper.cast(Object[].class, objectInput.readObject());
+ }
+
+ public void writeExternal(ObjectOutput objectOutput) throws IOException {
+ if (methodBean == null) {
+ methodBean = new MethodBean(method);
+ }
+ objectOutput.writeObject(methodBean);
+ objectOutput.writeObject(args);
+ }
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/CamelInvocationHandler.java Thu Oct 18 11:41:45 2007
@@ -21,8 +21,8 @@
import java.lang.reflect.Method;
import org.apache.camel.Endpoint;
-import org.apache.camel.Producer;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.Producer;
/**
* An {@link java.lang.reflect.InvocationHandler} which invokes a
@@ -40,7 +40,7 @@
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- BeanInvocation invocation = new BeanInvocation(proxy, method, args);
+ BeanInvocation invocation = new BeanInvocation(method, args);
BeanExchange exchange = new BeanExchange(endpoint.getContext(), ExchangePattern.InOut);
exchange.setInvocation(invocation);
@@ -49,6 +49,7 @@
if (fault != null) {
throw new InvocationTargetException(fault);
}
- return exchange.getOut().getBody();
+ return exchange.getOut(true).getBody();
}
}
+
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+
+/**
+ * Represents a {@link Serializable} version of a {@link Method}
+ *
+ * @version $Revision: 1.1 $
+ */
+public class MethodBean implements Serializable {
+ private String name;
+ private Class<?> type;
+ private Class<?>[] parameterTypes;
+
+ public MethodBean() {
+ }
+
+ public MethodBean(Method method) {
+ this.name = method.getName();
+ this.type = method.getDeclaringClass();
+ this.parameterTypes = method.getParameterTypes();
+ }
+
+ public Method getMethod() throws NoSuchMethodException {
+ return type.getMethod(name, parameterTypes);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public Class<?>[] getParameterTypes() {
+ return parameterTypes;
+ }
+
+ public void setParameterTypes(Class<?>[] parameterTypes) {
+ this.parameterTypes = parameterTypes;
+ }
+
+ public Class<?> getType() {
+ return type;
+ }
+
+ public void setType(Class<?> type) {
+ this.type = type;
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/bean/MethodBean.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,193 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ *
+ * @version $Revision: $
+ */
+public class DefaultTimeoutMap implements TimeoutMap, Runnable {
+
+ private static final Log log = LogFactory.getLog(DefaultTimeoutMap.class);
+
+ private Map map = new HashMap();
+ private SortedSet index = new TreeSet();
+ private ScheduledExecutorService executor;
+ private long purgePollTime;
+
+ public DefaultTimeoutMap() {
+ this(null, 1000L);
+ }
+
+ public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+ this.executor = executor;
+ this.purgePollTime = requestMapPollTimeMillis;
+ schedulePoll();
+ }
+
+ public Object get(Object key) {
+ TimeoutMapEntry entry = null;
+ synchronized (map) {
+ entry = (TimeoutMapEntry) map.get(key);
+ if (entry == null) {
+ return null;
+ }
+ index.remove(entry);
+ updateExpireTime(entry);
+ index.add(entry);
+ }
+ return entry.getValue();
+ }
+
+ public void put(Object key, Object value, long timeoutMillis) {
+ TimeoutMapEntry entry = new TimeoutMapEntry(key, value, timeoutMillis);
+ synchronized (map) {
+ Object oldValue = map.put(key, entry);
+ if (oldValue != null) {
+ index.remove(oldValue);
+ }
+ updateExpireTime(entry);
+ index.add(entry);
+ }
+ }
+
+ public void remove(Object id) {
+ synchronized (map) {
+ TimeoutMapEntry entry = (TimeoutMapEntry) map.remove(id);
+ if (entry != null) {
+ index.remove(entry);
+ }
+ }
+ }
+
+ /**
+ * Returns a copy of the keys in the map
+ */
+ public Object[] getKeys() {
+ Object[] keys = null;
+ synchronized (map) {
+ Set keySet = map.keySet();
+ keys = new String[keySet.size()];
+ keySet.toArray(keys);
+ }
+ return keys;
+ }
+
+ /**
+ * The timer task which purges old requests and schedules another poll
+ */
+ public void run() {
+ purge();
+ schedulePoll();
+ }
+
+ /**
+ * Purges any old entries from the map
+ */
+ public void purge() {
+ long now = currentTime();
+ synchronized (map) {
+ for (Iterator iter = index.iterator(); iter.hasNext();) {
+ TimeoutMapEntry entry = (TimeoutMapEntry) iter.next();
+ if (entry == null) {
+ break;
+ }
+ if (entry.getExpireTime() < now) {
+ if (isValidForEviction(entry)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Evicting inactive request for correlationID: " + entry);
+ }
+ System.out.println("Evicting inactive request for correlationID: " + entry);
+ map.remove(entry.getKey());
+ iter.remove();
+ }
+ }
+ else {
+ break;
+ }
+ }
+ }
+ }
+
+ // Properties
+ // -------------------------------------------------------------------------
+ public long getPurgePollTime() {
+ return purgePollTime;
+ }
+
+ /**
+ * Sets the next purge poll time in milliseconds
+ */
+ public void setPurgePollTime(long purgePollTime) {
+ this.purgePollTime = purgePollTime;
+ }
+
+ public ScheduledExecutorService getExecutor() {
+ return executor;
+ }
+
+ /**
+ * Sets the executor used to schedule purge events of inactive requests
+ */
+ public void setExecutor(ScheduledExecutorService executor) {
+ this.executor = executor;
+ }
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+
+ /**
+ * lets schedule each time to allow folks to change the time at runtime
+ */
+ protected void schedulePoll() {
+ if (executor != null) {
+ executor.schedule(this, purgePollTime, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * A hook to allow derivations to avoid evicting the current entry
+ *
+ * @param entry
+ * @return
+ */
+ protected boolean isValidForEviction(TimeoutMapEntry entry) {
+ return true;
+ }
+
+ protected void updateExpireTime(TimeoutMapEntry entry) {
+ long now = currentTime();
+ entry.setExpireTime(entry.getTimeout() + now);
+ }
+
+ protected long currentTime() {
+ return System.currentTimeMillis();
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,55 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+/**
+ * Represents a thread safe map of values which timeout after a period of
+ * inactivity.
+ *
+ * @version $Revision: $
+ */
+public interface TimeoutMap extends Runnable {
+
+ /**
+ * Looks up the value in the map by the given key.
+ *
+ * @param key
+ * the key of the value to search for
+ * @return the value for the given key or null if it is not present (or has
+ * timed out)
+ */
+ Object get(Object key);
+
+ /**
+ * Returns a copy of the keys in the map
+ */
+ Object[] getKeys();
+
+ /**
+ * Adds the key value pair into the map such that some time after the given
+ * timeout the entry will be evicted
+ */
+ void put(Object key, Object value, long timeoutMillis);
+
+ void remove(Object key);
+
+ /**
+ * Purges any old entries from the map
+ */
+ public void purge();
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.util.Map;
+
+/**
+ * Represents an entry in a {@link TimeoutMap}
+ *
+ * @version $Revision: $
+ */
+public class TimeoutMapEntry implements Comparable, Map.Entry {
+ private Object key;
+ private Object value;
+ private long timeout;
+ private long expireTime;
+
+ public TimeoutMapEntry(Object id, Object handler, long timeout) {
+ this.key = id;
+ this.value = handler;
+ this.timeout = timeout;
+ }
+
+ public Object getKey() {
+ return key;
+ }
+
+ public long getExpireTime() {
+ return expireTime;
+ }
+
+ public void setExpireTime(long expireTime) {
+ this.expireTime = expireTime;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public Object setValue(Object value) {
+ Object oldValue = value;
+ this.value = value;
+ return oldValue;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ public int compareTo(Object that) {
+ if (this == that) {
+ return 0;
+ }
+ if (that instanceof TimeoutMapEntry) {
+ return compareTo((TimeoutMapEntry) that);
+ }
+ return 1;
+ }
+
+ public int compareTo(TimeoutMapEntry that) {
+ long diff = this.expireTime - that.expireTime;
+ if (diff > 0) {
+ return 1;
+ }
+ else if (diff < 0) {
+ return -1;
+ }
+ return this.key.hashCode() - that.key.hashCode();
+ }
+
+ public String toString() {
+ return "Entry for key: " + key;
+ }
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.bean;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Method;
+
+import org.apache.camel.TestSupport;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class BeanInvocationSerializeTest extends TestSupport {
+ public void testSerialize() throws Exception {
+ Method method = getClass().getMethod("cheese", String.class, String.class);
+ BeanInvocation invocation = new BeanInvocation(method, new Object[] { "a", "b" });
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(buffer);
+ out.writeObject(invocation);
+ out.close();
+
+ ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+ Object object = in.readObject();
+ BeanInvocation actual = assertIsInstanceOf(BeanInvocation.class, object);
+ System.out.println("Received " + actual);
+ }
+
+ public void cheese(String a, String b) {
+ System.out.println("Called with a: " + a + " b: " + b);
+
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/bean/BeanInvocationSerializeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/components/camel-jms/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/pom.xml?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/pom.xml (original)
+++ activemq/camel/trunk/components/camel-jms/pom.xml Thu Oct 18 11:41:45 2007
@@ -64,6 +64,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-spring</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>apache-activemq</artifactId>
<scope>test</scope>
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java Thu Oct 18 11:41:45 2007
@@ -16,52 +16,72 @@
*/
package org.apache.camel.component.jms;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+import javax.jms.Session;
-import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.MessageCreator;
/**
* A JMS {@link MessageListener} which can be used to delegate processing to a
* Camel endpoint.
- *
+ *
* @version $Revision$
*/
-public class EndpointMessageListener<E extends Exchange> implements MessageListener {
+public class EndpointMessageListener implements MessageListener {
private static final transient Log LOG = LogFactory.getLog(EndpointMessageListener.class);
private JmsEndpoint endpoint;
private Processor processor;
private JmsBinding binding;
private boolean eagerLoadingOfProperties;
+ private Destination replyToDestination;
+ private JmsOperations template;
+ private boolean disableReplyTo;
public EndpointMessageListener(JmsEndpoint endpoint, Processor processor) {
this.endpoint = endpoint;
this.processor = processor;
}
- public void onMessage(Message message) {
+ public void onMessage(final Message message) {
try {
-
if (LOG.isDebugEnabled()) {
LOG.debug(endpoint + " receiving JMS message: " + message);
}
- JmsExchange exchange = createExchange(message);
+ Destination replyDestination = getReplyToDestination(message);
+ final JmsExchange exchange = createExchange(message, replyDestination);
if (eagerLoadingOfProperties) {
exchange.getIn().getHeaders();
}
processor.process(exchange);
- } catch (Exception e) {
+ final JmsMessage out = exchange.getOut(false);
+ if (out != null && !disableReplyTo) {
+ sendReply(replyDestination, message, exchange, out);
+ }
+ }
+ catch (Exception e) {
throw new RuntimeCamelException(e);
}
}
- public JmsExchange createExchange(Message message) {
- return new JmsExchange(endpoint.getContext(), endpoint.getExchangePattern(), getBinding(), message);
+ public JmsExchange createExchange(Message message, Destination replyDestination) {
+ JmsExchange exchange = new JmsExchange(endpoint.getContext(), endpoint.getExchangePattern(), getBinding(), message);
+ // lets set to an InOut if we have some kind of reply-to destination
+ if (replyDestination != null) {
+ exchange.setProperty("org.apache.camel.jms.replyDestination", replyDestination);
+ exchange.setPattern(ExchangePattern.InOut);
+ }
+ return exchange;
}
// Properties
@@ -76,7 +96,7 @@
/**
* Sets the binding used to convert from a Camel message to and from a JMS
* message
- *
+ *
* @param binding the binding to use
*/
public void setBinding(JmsBinding binding) {
@@ -89,5 +109,76 @@
public void setEagerLoadingOfProperties(boolean eagerLoadingOfProperties) {
this.eagerLoadingOfProperties = eagerLoadingOfProperties;
+ }
+
+ public JmsOperations getTemplate() {
+ if (template == null) {
+ template = endpoint.createInOnlyTemplate();
+ }
+ return template;
+ }
+
+ public void setTemplate(JmsOperations template) {
+ this.template = template;
+ }
+
+ public boolean isDisableReplyTo() {
+ return disableReplyTo;
+ }
+
+ /**
+ * Allows the reply-to behaviour to be disabled
+ */
+ public void setDisableReplyTo(boolean disableReplyTo) {
+ this.disableReplyTo = disableReplyTo;
+ }
+
+ public Destination getReplyToDestination() {
+ return replyToDestination;
+ }
+
+ /**
+ * Provides an explicit reply to destination which overrides
+ * any incoming value of {@link Message#getJMSReplyTo()}
+ *
+ * @param replyToDestination the destination that should be used to send replies to
+ */
+ public void setReplyToDestination(Destination replyToDestination) {
+ this.replyToDestination = replyToDestination;
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+
+ protected void sendReply(Destination replyDestination, final Message message, final JmsExchange exchange, final JmsMessage out) {
+ if (replyDestination == null) {
+ LOG.warn("Cannot send reply message as there is no replyDestination for: " + out);
+ return;
+ }
+ getTemplate().send(replyDestination, new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session);
+
+ // lets preserve any correlation ID
+ String correlationID = message.getJMSCorrelationID();
+ if (correlationID != null) {
+ reply.setJMSCorrelationID(correlationID);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(endpoint + " sending reply JMS message: " + reply);
+ }
+ return reply;
+ }
+ });
+ }
+
+ protected Destination getReplyToDestination(Message message) throws JMSException {
+ // lets send a response back if we can
+ Destination destination = replyToDestination;
+ if (destination == null) {
+ destination = message.getJMSReplyTo();
+ }
+ return destination;
}
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Thu Oct 18 11:41:45 2007
@@ -51,7 +51,7 @@
* @param exchange
* @param message
*/
- public Object extractBodyFromJms(JmsExchange exchange, Message message) {
+ public Object extractBodyFromJms(Exchange exchange, Message message) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage)message;
@@ -79,15 +79,15 @@
* @return a newly created JMS Message instance containing the
* @throws JMSException if the message could not be created
*/
- public Message makeJmsMessage(Exchange exchange, Session session) throws JMSException {
+ public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message camelMessage, Session session) throws JMSException {
Message answer = null;
- if( exchange instanceof JmsExchange ) {
- JmsExchange jmsExchange = (JmsExchange)exchange;
- answer = jmsExchange.getIn().getJmsMessage();
+ if( camelMessage instanceof JmsMessage ) {
+ JmsMessage jmsMessage = (JmsMessage)camelMessage;
+ answer = jmsMessage.getJmsMessage();
}
if( answer == null ) {
- answer = createJmsMessage(exchange.getIn().getBody(), session);
- appendJmsProperties(answer, exchange);
+ answer = createJmsMessage(camelMessage.getBody(), session);
+ appendJmsProperties(answer, exchange, camelMessage);
}
return answer;
}
@@ -95,8 +95,7 @@
/**
* Appends the JMS headers from the Camel {@link JmsMessage}
*/
- public void appendJmsProperties(Message jmsMessage, Exchange exchange) throws JMSException {
- org.apache.camel.Message in = exchange.getIn();
+ public void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message in) throws JMSException {
Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
for (Map.Entry<String, Object> entry : entries) {
String headerName = entry.getKey();
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Thu Oct 18 11:41:45 2007
@@ -16,38 +16,39 @@
*/
package org.apache.camel.component.jms;
+import java.util.Map;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.component.jms.requestor.Requestor;
import org.apache.camel.impl.DefaultComponent;
import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
-
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.connection.JmsTransactionManager;
+import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.listener.serversession.ServerSessionFactory;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
-import org.springframework.jms.connection.JmsTransactionManager;
-import org.springframework.jms.core.JmsOperations;
import org.springframework.transaction.PlatformTransactionManager;
-import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.Session;
-import java.util.Map;
-
/**
* A <a href="http://activemq.apache.org/jms.html">JMS Component</a>
- *
+ *
* @version $Revision:520964 $
*/
public class JmsComponent extends DefaultComponent<JmsExchange> implements ApplicationContextAware {
public static final String QUEUE_PREFIX = "queue:";
public static final String TOPIC_PREFIX = "topic:";
-
private JmsConfiguration configuration;
- private ApplicationContext applicationContext;
+ private ApplicationContext applicationContext;
+ private Requestor requestor;
public JmsComponent() {
}
@@ -59,7 +60,7 @@
public JmsComponent(CamelContext context) {
super(context);
}
-
+
/**
* Static builder method
*/
@@ -111,48 +112,24 @@
template.setTransacted(true);
return jmsComponent(template);
}
-
- @Override
- protected Endpoint<JmsExchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
-
- boolean pubSubDomain = false;
- if (remaining.startsWith(QUEUE_PREFIX)) {
- pubSubDomain = false;
- remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
- } else if (remaining.startsWith(TOPIC_PREFIX)) {
- pubSubDomain = true;
- remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
- }
-
- final String subject = convertPathToActualDestination(remaining);
-
- // lets make sure we copy the configuration as each endpoint can
- // customize its own version
- JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy());
-
- String selector = (String)parameters.remove("selector");
- if (selector != null) {
- endpoint.setSelector(selector);
- }
- setProperties(endpoint.getConfiguration(), parameters);
- return endpoint;
- }
+ // Properties
+ //-------------------------------------------------------------------------
public JmsConfiguration getConfiguration() {
if (configuration == null) {
configuration = createConfiguration();
-
+
// If we are being configured with spring...
- if( applicationContext !=null ) {
- Map beansOfType = applicationContext.getBeansOfType(ConnectionFactory.class);
- if( !beansOfType.isEmpty() ) {
- ConnectionFactory cf = (ConnectionFactory) beansOfType.values().iterator().next();
- configuration.setConnectionFactory(cf);
- }
+ if (applicationContext != null) {
+ Map beansOfType = applicationContext.getBeansOfType(ConnectionFactory.class);
+ if (!beansOfType.isEmpty()) {
+ ConnectionFactory cf = (ConnectionFactory) beansOfType.values().iterator().next();
+ configuration.setConnectionFactory(cf);
+ }
beansOfType = applicationContext.getBeansOfType(DestinationResolver.class);
- if( !beansOfType.isEmpty() ) {
+ if (!beansOfType.isEmpty()) {
DestinationResolver destinationResolver = (DestinationResolver) beansOfType.values().iterator().next();
- configuration.setDestinationResolver(destinationResolver);
+ configuration.setDestinationResolver(destinationResolver);
}
}
}
@@ -161,7 +138,7 @@
/**
* Sets the JMS configuration
- *
+ *
* @param configuration the configuration to use by default for endpoints
*/
public void setConfiguration(JmsConfiguration configuration) {
@@ -303,7 +280,7 @@
public void setUseVersion102(boolean useVersion102) {
getConfiguration().setUseVersion102(useVersion102);
}
-
+
public void setJmsOperations(JmsOperations jmsOperations) {
getConfiguration().setJmsOperations(jmsOperations);
}
@@ -312,6 +289,60 @@
getConfiguration().setDestinationResolver(destinationResolver);
}
+ public Requestor getRequestor() throws Exception {
+ if (requestor == null) {
+ requestor = new Requestor(getConfiguration(), getExecutorService());
+ requestor.start();
+ }
+ return requestor;
+ }
+
+ public void setRequestor(Requestor requestor) {
+ this.requestor = requestor;
+ }
+
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+
+ @Override
+ protected void doStop() throws Exception {
+ if (requestor != null) {
+ requestor.stop();
+ }
+ super.doStop();
+ }
+
+ @Override
+ protected Endpoint<JmsExchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
+
+ boolean pubSubDomain = false;
+ if (remaining.startsWith(QUEUE_PREFIX)) {
+ pubSubDomain = false;
+ remaining = removeStartingCharacters(remaining.substring(QUEUE_PREFIX.length()), '/');
+ }
+ else if (remaining.startsWith(TOPIC_PREFIX)) {
+ pubSubDomain = true;
+ remaining = removeStartingCharacters(remaining.substring(TOPIC_PREFIX.length()), '/');
+ }
+
+ final String subject = convertPathToActualDestination(remaining);
+
+ // lets make sure we copy the configuration as each endpoint can
+ // customize its own version
+ JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy());
+
+ String selector = (String) parameters.remove("selector");
+ if (selector != null) {
+ endpoint.setSelector(selector);
+ }
+ setProperties(endpoint.getConfiguration(), parameters);
+ return endpoint;
+ }
+
/**
* A strategy method allowing the URI destination to be translated into the
* actual JMS destination name (say by looking up in JNDI or something)
@@ -322,16 +353,11 @@
/**
* Factory method to create the default configuration instance
- *
+ *
* @return a newly created configuration object which can then be further
* customized
*/
protected JmsConfiguration createConfiguration() {
return new JmsConfiguration();
}
-
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
- }
-
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Thu Oct 18 11:41:45 2007
@@ -51,7 +51,6 @@
protected static final String CLIENT_ACKNOWLEDGE = "CLIENT_ACKNOWLEDGE";
protected static final String AUTO_ACKNOWLEDGE = "AUTO_ACKNOWLEDGE";
protected static final String DUPS_OK_ACKNOWLEDGE = "DUPS_OK_ACKNOWLEDGE";
-
private JmsOperations jmsOperations;
private DestinationResolver destinationResolver;
private ConnectionFactory connectionFactory;
@@ -94,6 +93,7 @@
private String transactionName;
private int transactionTimeout = -1;
private boolean preserveMessageQos;
+ private long requestMapPurgePollTimeMillis = 1000L;
public JmsConfiguration() {
}
@@ -107,18 +107,32 @@
*/
public JmsConfiguration copy() {
try {
- return (JmsConfiguration)clone();
- } catch (CloneNotSupportedException e) {
+ return (JmsConfiguration) clone();
+ }
+ catch (CloneNotSupportedException e) {
throw new RuntimeCamelException(e);
}
}
- public JmsOperations createJmsOperations(boolean pubSubDomain, String destination) {
-
- if ( jmsOperations !=null ) {
+ /**
+ * Creates a JmsOperations object used for request/response using a request timeout value
+ */
+ public JmsOperations createInOutTemplate(boolean pubSubDomain, String destination, long requestTimeout) {
+ JmsOperations answer = createInOnlyTemplate(pubSubDomain, destination);
+ if (answer instanceof JmsTemplate && requestTimeout > 0) {
+ JmsTemplate jmsTemplate = (JmsTemplate) answer;
+ jmsTemplate.setExplicitQosEnabled(true);
+ jmsTemplate.setTimeToLive(requestTimeout);
+ }
+ return answer;
+ }
+
+ public JmsOperations createInOnlyTemplate(boolean pubSubDomain, String destination) {
+
+ if (jmsOperations != null) {
return jmsOperations;
}
-
+
ConnectionFactory factory = getTemplateConnectionFactory();
// I whish the spring templates had built in support for preserving the message
@@ -141,11 +155,13 @@
}
}
if (isPubSubDomain()) {
- ((TopicPublisher)producer).publish(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
- } else {
- ((QueueSender)producer).send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
+ ((TopicPublisher) producer).publish(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
+ }
+ else {
+ ((QueueSender) producer).send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
}
- } else {
+ }
+ else {
super.doSend(producer, message);
}
}
@@ -167,18 +183,19 @@
}
}
producer.send(message, message.getJMSDeliveryMode(), message.getJMSPriority(), ttl);
- } else {
+ }
+ else {
super.doSend(producer, message);
}
}
};
-
+
template.setPubSubDomain(pubSubDomain);
- if( destinationResolver!=null ) {
+ if (destinationResolver != null) {
template.setDestinationResolver(destinationResolver);
}
template.setDefaultDestinationName(destination);
-
+
template.setExplicitQosEnabled(explicitQosEnabled);
template.setDeliveryPersistent(deliveryPersistent);
if (messageConverter != null) {
@@ -206,7 +223,8 @@
// for receiving messages.
if (acknowledgementMode >= 0) {
template.setSessionAcknowledgeMode(acknowledgementMode);
- } else if (acknowledgementModeName != null) {
+ }
+ else if (acknowledgementModeName != null) {
template.setSessionAcknowledgeModeName(acknowledgementModeName);
}
}
@@ -221,7 +239,7 @@
protected void configureMessageListenerContainer(AbstractMessageListenerContainer container) {
container.setConnectionFactory(getListenerConnectionFactory());
- if( destinationResolver!=null ) {
+ if (destinationResolver != null) {
container.setDestinationResolver(destinationResolver);
}
if (autoStartup) {
@@ -255,21 +273,23 @@
else {
if (acknowledgementMode >= 0) {
container.setSessionAcknowledgeMode(acknowledgementMode);
- } else if (acknowledgementModeName != null) {
+ }
+ else if (acknowledgementModeName != null) {
container.setSessionAcknowledgeModeName(acknowledgementModeName);
}
}
if (container instanceof DefaultMessageListenerContainer) {
// this includes DefaultMessageListenerContainer102
- DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer)container;
+ DefaultMessageListenerContainer listenerContainer = (DefaultMessageListenerContainer) container;
if (concurrentConsumers >= 0) {
listenerContainer.setConcurrentConsumers(concurrentConsumers);
}
if (cacheLevel >= 0) {
listenerContainer.setCacheLevel(cacheLevel);
- } else if (cacheLevelName != null) {
+ }
+ else if (cacheLevelName != null) {
listenerContainer.setCacheLevelName(cacheLevelName);
}
@@ -304,18 +324,20 @@
if (transactionTimeout >= 0) {
listenerContainer.setTransactionTimeout(transactionTimeout);
}
- } else if (container instanceof ServerSessionMessageListenerContainer) {
+ }
+ else if (container instanceof ServerSessionMessageListenerContainer) {
// this includes ServerSessionMessageListenerContainer102
- ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer)container;
+ ServerSessionMessageListenerContainer listenerContainer = (ServerSessionMessageListenerContainer) container;
if (maxMessagesPerTask >= 0) {
listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
}
if (serverSessionFactory != null) {
listenerContainer.setServerSessionFactory(serverSessionFactory);
}
- } else if (container instanceof SimpleMessageListenerContainer) {
+ }
+ else if (container instanceof SimpleMessageListenerContainer) {
// this includes SimpleMessageListenerContainer102
- SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer)container;
+ SimpleMessageListenerContainer listenerContainer = (SimpleMessageListenerContainer) container;
if (concurrentConsumers >= 0) {
listenerContainer.setConcurrentConsumers(concurrentConsumers);
}
@@ -340,7 +362,7 @@
* not specified for either
* {@link #setTemplateConnectionFactory(ConnectionFactory)} or
* {@link #setListenerConnectionFactory(ConnectionFactory)}
- *
+ *
* @param connectionFactory the default connection factory to use
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
@@ -357,9 +379,9 @@
/**
* Sets the connection factory to be used for consuming messages via the
* {@link #createMessageListenerContainer()}
- *
+ *
* @param listenerConnectionFactory the connection factory to use for
- * consuming messages
+ * consuming messages
*/
public void setListenerConnectionFactory(ConnectionFactory listenerConnectionFactory) {
this.listenerConnectionFactory = listenerConnectionFactory;
@@ -374,10 +396,10 @@
/**
* Sets the connection factory to be used for sending messages via the
- * {@link JmsTemplate} via {@link #createJmsOperations(boolean, String)}
- *
+ * {@link JmsTemplate} via {@link #createInOnlyTemplate(boolean, String)}
+ *
* @param templateConnectionFactory the connection factory for sending
- * messages
+ * messages
*/
public void setTemplateConnectionFactory(ConnectionFactory templateConnectionFactory) {
this.templateConnectionFactory = templateConnectionFactory;
@@ -654,14 +676,14 @@
protected AbstractMessageListenerContainer chooseMessageListenerContainerImplementation() {
// TODO we could allow a spring container to auto-inject these objects?
switch (consumerType) {
- case Simple:
- return isUseVersion102() ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
- case ServerSessionPool:
- return isUseVersion102() ? new ServerSessionMessageListenerContainer102() : new ServerSessionMessageListenerContainer();
- case Default:
- return isUseVersion102() ? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer();
- default:
- throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
+ case Simple:
+ return isUseVersion102() ? new SimpleMessageListenerContainer102() : new SimpleMessageListenerContainer();
+ case ServerSessionPool:
+ return isUseVersion102() ? new ServerSessionMessageListenerContainer102() : new ServerSessionMessageListenerContainer();
+ case Default:
+ return isUseVersion102() ? new DefaultMessageListenerContainer102() : new DefaultMessageListenerContainer();
+ default:
+ throw new IllegalArgumentException("Unknown consumer type: " + consumerType);
}
}
@@ -695,10 +717,10 @@
}
/**
- * Set to true if you want to send message using the QoS settings specified
+ * Set to true if you want to send message using the QoS settings specified
* on the message. Normally the QoS settings used are the one configured
* on this Object.
- *
+ *
* @param preserveMessageQos
*/
public void setPreserveMessageQos(boolean preserveMessageQos) {
@@ -719,5 +741,19 @@
public void setDestinationResolver(DestinationResolver destinationResolver) {
this.destinationResolver = destinationResolver;
+ }
+
+ public long getRequestMapPurgePollTimeMillis() {
+ return requestMapPurgePollTimeMillis;
+ }
+
+ /**
+ * Sets the frequency that the requestMap for InOut exchanges is purged
+ * for timed out message exchanges
+ *
+ * @param requestMapPurgePollTimeMillis
+ */
+ public void setRequestMapPurgePollTimeMillis(long requestMapPurgePollTimeMillis) {
+ this.requestMapPurgePollTimeMillis = requestMapPurgePollTimeMillis;
}
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Thu Oct 18 11:41:45 2007
@@ -18,9 +18,9 @@
import javax.jms.MessageListener;
+import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
-
import org.springframework.jms.listener.AbstractMessageListenerContainer;
/**
@@ -44,7 +44,7 @@
}
protected MessageListener createMessageListener(JmsEndpoint endpoint, Processor processor) {
- EndpointMessageListener<JmsExchange> messageListener = new EndpointMessageListener<JmsExchange>(endpoint, processor);
+ EndpointMessageListener messageListener = new EndpointMessageListener(endpoint, processor);
messageListener.setBinding(endpoint.getBinding());
return messageListener;
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Thu Oct 18 11:41:45 2007
@@ -18,49 +18,54 @@
import javax.jms.Message;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
-import org.apache.camel.ExchangePattern;
+import org.apache.camel.component.jms.requestor.Requestor;
import org.apache.camel.impl.DefaultEndpoint;
-
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
/**
* A <a href="http://activemq.apache.org/jms.html">JMS Endpoint</a>
- *
+ *
* @version $Revision:520964 $
*/
public class JmsEndpoint extends DefaultEndpoint<JmsExchange> {
+ private final JmsComponent component;
+ private final boolean pubSubDomain;
private JmsBinding binding;
private String destination;
- private final boolean pubSubDomain;
private String selector;
private JmsConfiguration configuration;
+ private Requestor requestor;
+ private long requestTimeout = 20000L;
public JmsEndpoint(String uri, JmsComponent component, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
super(uri, component);
+ this.component = component;
this.configuration = configuration;
this.destination = destination;
this.pubSubDomain = pubSubDomain;
}
public JmsProducer createProducer() throws Exception {
- JmsOperations template = createJmsOperations();
- return createProducer(template);
+ return new JmsProducer(this);
}
/**
- * Creates a producer using the given template
+ * Creates a producer using the given template for InOnly message exchanges
*/
public JmsProducer createProducer(JmsOperations template) throws Exception {
+ JmsProducer answer = createProducer();
if (template instanceof JmsTemplate) {
- JmsTemplate jmsTemplate = (JmsTemplate)template;
+ JmsTemplate jmsTemplate = (JmsTemplate) template;
jmsTemplate.setPubSubDomain(pubSubDomain);
jmsTemplate.setDefaultDestinationName(destination);
}
- return new JmsProducer(this, template);
+ answer.setInOnlyTemplate(template);
+ return answer;
}
public JmsConsumer createConsumer(Processor processor) throws Exception {
@@ -70,8 +75,8 @@
/**
* Creates a consumer using the given processor and listener container
- *
- * @param processor the processor to use to process the messages
+ *
+ * @param processor the processor to use to process the messages
* @param listenerContainer the listener container
* @return a newly created consumer
* @throws Exception if the consumer cannot be created
@@ -87,7 +92,7 @@
@Override
public PollingConsumer<JmsExchange> createPollingConsumer() throws Exception {
- JmsOperations template = createJmsOperations();
+ JmsOperations template = createInOnlyTemplate();
return new JmsPollingConsumer(this, template);
}
@@ -100,6 +105,20 @@
return new JmsExchange(getContext(), getExchangePattern(), getBinding(), message);
}
+ /**
+ * Factory method for creating a new template for InOnly message exchanges
+ */
+ public JmsOperations createInOnlyTemplate() {
+ return configuration.createInOnlyTemplate(pubSubDomain, destination);
+ }
+
+ /**
+ * Factory method for creating a new template for InOut message exchanges
+ */
+ public JmsOperations createInOutTemplate() {
+ return configuration.createInOutTemplate(pubSubDomain, destination, getRequestTimeout());
+ }
+
// Properties
// -------------------------------------------------------------------------
public JmsBinding getBinding() {
@@ -112,7 +131,7 @@
/**
* Sets the binding used to convert from a Camel message to and from a JMS
* message
- *
+ *
* @param binding the binding to use
*/
public void setBinding(JmsBinding binding) {
@@ -142,8 +161,30 @@
return false;
}
- protected JmsOperations createJmsOperations() {
- return configuration.createJmsOperations(pubSubDomain, destination);
+ public Requestor getRequestor() throws Exception {
+ if (requestor == null) {
+ requestor = component.getRequestor();
+ }
+ return requestor;
+ }
+
+ public void setRequestor(Requestor requestor) {
+ this.requestor = requestor;
+ }
+
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ /**
+ * Sets the timeout in milliseconds which requests should timeout after
+ *
+ * @param requestTimeout
+ */
+ public void setRequestTimeout(long requestTimeout) {
+ this.requestTimeout = requestTimeout;
}
+ // Implementation methods
+ //-------------------------------------------------------------------------
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java Thu Oct 18 11:41:45 2007
@@ -26,6 +26,7 @@
import javax.jms.Queue;
import javax.jms.Topic;
+import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,6 +39,7 @@
public class JmsMessage extends DefaultMessage {
private static final transient Log LOG = LogFactory.getLog(JmsMessage.class);
private Message jmsMessage;
+ private JmsBinding binding;
public JmsMessage() {
}
@@ -46,6 +48,11 @@
setJmsMessage(jmsMessage);
}
+ public JmsMessage(Message jmsMessage, JmsBinding binding) {
+ this(jmsMessage);
+ setBinding(binding);
+ }
+
@Override
public String toString() {
if (jmsMessage != null) {
@@ -64,6 +71,24 @@
return jmsMessage;
}
+ public JmsBinding getBinding() {
+ if (binding == null) {
+ Exchange exchange = getExchange();
+ if (exchange instanceof JmsExchange) {
+ JmsExchange jmsExchange = (JmsExchange) exchange;
+ return jmsExchange.getBinding();
+ }
+ else {
+ return new JmsBinding();
+ }
+ }
+ return binding;
+ }
+
+ public void setBinding(JmsBinding binding) {
+ this.binding = binding;
+ }
+
public void setJmsMessage(Message jmsMessage) {
this.jmsMessage = jmsMessage;
try {
@@ -100,9 +125,8 @@
@Override
protected Object createBody() {
- if (jmsMessage != null && getExchange() instanceof JmsExchange) {
- JmsExchange exchange = (JmsExchange)getExchange();
- return exchange.getBinding().extractBodyFromJms(exchange, jmsMessage);
+ if (jmsMessage != null) {
+ return getBinding().extractBodyFromJms(getExchange(), jmsMessage);
}
return null;
}
Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Thu Oct 18 11:41:45 2007
@@ -16,15 +16,22 @@
*/
package org.apache.camel.component.jms;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeTimedOutException;
+import org.apache.camel.RuntimeExchangeException;
+import org.apache.camel.component.jms.requestor.Requestor;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.UuidGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
@@ -34,27 +41,117 @@
public class JmsProducer extends DefaultProducer {
private static final transient Log LOG = LogFactory.getLog(JmsProducer.class);
private final JmsEndpoint endpoint;
- private final JmsOperations template;
+ private JmsOperations inOnlyTemplate;
+ private JmsOperations inOutTemplate;
+ private UuidGenerator uuidGenerator;
- public JmsProducer(JmsEndpoint endpoint, JmsOperations template) {
+ public JmsProducer(JmsEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
- this.template = template;
}
public void process(final Exchange exchange) {
- template.send(endpoint.getDestination(), new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- Message message = endpoint.getBinding().makeJmsMessage(exchange, session);
- if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " sending JMS message: " + message);
+ final org.apache.camel.Message in = exchange.getIn();
+
+ if (exchange.getPattern().isOutCapable()) {
+ // create a temporary queue and consumer for responses...
+ // note due to JMS transaction semantics we cannot use a single transaction
+ // for sending the request and receiving the response
+ Requestor requestor;
+ try {
+ requestor = endpoint.getRequestor();
+ }
+ catch (Exception e) {
+ throw new RuntimeExchangeException(e, exchange);
+ }
+
+ final Destination replyTo = requestor.getReplyTo();
+
+ String correlationId = in.getHeader("JMSCorrelationID", String.class);
+ if (correlationId == null) {
+ correlationId = getUuidGenerator().generateId();
+ in.setHeader("JMSCorrelationID", correlationId);
+ }
+
+ getInOutTemplate().send(endpoint.getDestination(), new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
+ message.setJMSReplyTo(replyTo);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(endpoint + " sending JMS message: " + message);
+ }
+ return message;
+ }
+ });
+
+ // lets wait and return the response
+ long requestTimeout = endpoint.getRequestTimeout();
+ FutureTask future = requestor.getReceiveFuture(correlationId, requestTimeout);
+
+ try {
+ Message message;
+ if (requestTimeout < 0) {
+ message = (Message) future.get();
+ }
+ else {
+ message = (Message) future.get(requestTimeout, TimeUnit.MILLISECONDS);
+ }
+ if (message != null) {
+ exchange.setOut(new JmsMessage(message, endpoint.getBinding()));
}
- return message;
+ else {
+ // lets set a timed out exception
+ exchange.setException(new ExchangeTimedOutException(exchange, requestTimeout));
+ }
+ }
+ catch (Exception e) {
+ exchange.setException(e);
}
- });
+ }
+ else {
+ getInOnlyTemplate().send(endpoint.getDestination(), new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ Message message = endpoint.getBinding().makeJmsMessage(exchange, in, session);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(endpoint + " sending JMS message: " + message);
+ }
+ return message;
+ }
+ });
+ }
+ }
+
+ public JmsOperations getInOnlyTemplate() {
+ if (inOnlyTemplate == null) {
+ inOnlyTemplate = endpoint.createInOnlyTemplate();
+ }
+ return inOnlyTemplate;
+ }
+
+ public void setInOnlyTemplate(JmsOperations inOnlyTemplate) {
+ this.inOnlyTemplate = inOnlyTemplate;
+ }
+
+ public JmsOperations getInOutTemplate() {
+ if (inOutTemplate == null) {
+ inOutTemplate = endpoint.createInOutTemplate();
+ }
+ return inOutTemplate;
+ }
+
+ public void setInOutTemplate(JmsOperations inOutTemplate) {
+ this.inOutTemplate = inOutTemplate;
+ }
+
+ public UuidGenerator getUuidGenerator() {
+ if (uuidGenerator == null) {
+ uuidGenerator = new UuidGenerator();
+ }
+ return uuidGenerator;
}
- public JmsOperations getTemplate() {
- return template;
+ public void setUuidGenerator(UuidGenerator uuidGenerator) {
+ this.uuidGenerator = uuidGenerator;
}
}
Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.camel.RuntimeCamelException;
+
+/**
+ * An exception thrown if a response message from an InOut could not be processed
+ *
+ * @version $Revision: 1.1 $
+ */
+public class FailedToProcessResponse extends RuntimeCamelException {
+ private final Message response;
+
+ public FailedToProcessResponse(Message response, JMSException e) {
+ super("Failed to process response: "+ e + ". Message: " + response, e);
+ this.response = response;
+ }
+
+ /**
+ * The response message which caused the exception
+ *
+ * @return
+ */
+ public Message getResponse() {
+ return response;
+ }
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FailedToProcessResponse.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A {@link FutureTask} which implements {@link ReplyHandler}
+ * so that it can be used as a handler for a correlation ID
+ *
+ * @version $Revision: 1.1 $
+ */
+public class FutureHandler extends FutureTask implements ReplyHandler {
+ private static final Callable EMPTY_CALLABLE = new Callable() {
+ public Object call() throws Exception {
+ return null;
+ }
+ };
+
+ public FutureHandler() {
+ super(EMPTY_CALLABLE);
+ }
+
+ public synchronized void set(Object result) {
+ super.set(result);
+ }
+
+ public boolean handle(Message message) throws JMSException {
+ set(message);
+ return true;
+ }
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/FutureHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,32 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public interface ReplyHandler {
+ /**
+ * Processes the message, returning true if this is the last method of a lifecycle
+ * so that the handler can be discarded
+ */
+ boolean handle(Message message) throws JMSException;
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/ReplyHandler.java
------------------------------------------------------------------------------
svn:eol-style = native