You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/10/18 20:41:49 UTC
svn commit: r586066 [2/2] - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/bean/
camel-core/src/main/java/org/apache/camel/util/
camel-core/src/test/java/org/apache/camel/compone...
Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,169 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.DefaultTimeoutMap;
+import org.apache.camel.util.TimeoutMap;
+import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
+import org.springframework.jms.support.destination.DestinationResolver;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class Requestor extends ServiceSupport implements MessageListener {
+ private static final transient Log LOG = LogFactory.getLog(Requestor.class);
+ private final JmsConfiguration configuration;
+ private AbstractMessageListenerContainer listenerContainer;
+ private TimeoutMap requestMap;
+ private Destination replyTo;
+
+ public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService) {
+ this.configuration = configuration;
+ requestMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
+ }
+
+ public FutureTask getReceiveFuture(String correlationID, long requestTimeout) {
+ FutureTask future = null;
+/*
+ // Deal with async handlers...
+
+ Object currentHandler = requestMap.get(correlationID);
+ if (currentHandler instanceof AsyncReplyHandler) {
+ AsyncReplyHandler handler = (AsyncReplyHandler) currentHandler;
+ future = handler.newResultHandler();
+ }
+*/
+
+ if (future == null) {
+ FutureHandler futureHandler = new FutureHandler();
+ future = futureHandler;
+ requestMap.put(correlationID, futureHandler, requestTimeout);
+ }
+ return future;
+ }
+
+ public void onMessage(Message message) {
+ try {
+ String correlationID = message.getJMSCorrelationID();
+ if (correlationID == null) {
+ LOG.warn("Ignoring message with no correlationID! " + message);
+ return;
+ }
+
+ // lets notify the monitor for this response
+ Object handler = requestMap.get(correlationID);
+ if (handler == null) {
+ LOG.warn("Response received for unknown correlationID: " + correlationID + " request: " + message);
+ }
+ else if (handler instanceof ReplyHandler) {
+ ReplyHandler replyHandler = (ReplyHandler) handler;
+ boolean complete = replyHandler.handle(message);
+ if (complete) {
+ requestMap.remove(correlationID);
+ }
+ }
+ }
+ catch (JMSException e) {
+ throw new FailedToProcessResponse(message, e);
+ }
+ }
+
+ public AbstractMessageListenerContainer getListenerContainer() {
+ if (listenerContainer == null) {
+ listenerContainer = createListenerContainer();
+ }
+ return listenerContainer;
+ }
+
+ public void setListenerContainer(AbstractMessageListenerContainer listenerContainer) {
+ this.listenerContainer = listenerContainer;
+ }
+
+ public Destination getReplyTo() {
+ return replyTo;
+ }
+
+ public void setReplyTo(Destination replyTo) {
+ this.replyTo = replyTo;
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+
+ protected void doStart() throws Exception {
+ AbstractMessageListenerContainer container = getListenerContainer();
+ container.afterPropertiesSet();
+ }
+
+ protected void doStop() throws Exception {
+ if (listenerContainer != null) {
+ listenerContainer.stop();
+ listenerContainer.destroy();
+ }
+ }
+
+ protected AbstractMessageListenerContainer createListenerContainer() {
+ SimpleMessageListenerContainer answer = new SimpleMessageListenerContainer();
+ answer.setDestinationName("temporary");
+ answer.setDestinationResolver(new DestinationResolver() {
+
+ public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
+ TemporaryQueue queue = session.createTemporaryQueue();
+ replyTo = queue;
+ return queue;
+ }
+ });
+ answer.setAutoStartup(true);
+ answer.setMessageListener(this);
+ answer.setPubSubDomain(false);
+ answer.setSubscriptionDurable(false);
+ answer.setConcurrentConsumers(1);
+ answer.setConnectionFactory(configuration.getConnectionFactory());
+ String clientId = configuration.getClientId();
+ if (clientId != null) {
+ clientId += ".Requestor";
+ answer.setClientId(clientId);
+ }
+ TaskExecutor taskExecutor = configuration.getTaskExecutor();
+ if (taskExecutor != null) {
+ answer.setTaskExecutor(taskExecutor);
+ }
+ ExceptionListener exceptionListener = configuration.getExceptionListener();
+ if (exceptionListener != null) {
+ answer.setExceptionListener(exceptionListener);
+ }
+ return answer;
+ }
+}
Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html (from r585168, activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/package.html)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html?p2=activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html&p1=activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/package.html&r1=585168&r2=586066&rev=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/package.html (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html Thu Oct 18 11:41:45 2007
@@ -19,7 +19,9 @@
</head>
<body>
+Implementation classes for implementing request-response over JMS so that the
Defines the <a href="http://activemq.apache.org/camel/jms.html">JMS Component</a>
+can support InOut as well as InOnly
</body>
</html>
Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java Thu Oct 18 11:41:45 2007
@@ -52,7 +52,7 @@
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
return camelContext;
@@ -76,7 +76,7 @@
from("bean:service2?method=status").to("activemq:topic:registry.heartbeats");
from("bean:service3?method=status").to("activemq:topic:registry.heartbeats");
- from("activemq:topic:registry.heartbeats").to("bean:registry?method=onEvent");
+ from("activemq:topic:registry.heartbeats?cacheLevelName=CACHE_CONSUMER").to("bean:registry?method=onEvent");
}
};
}
Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java Thu Oct 18 11:41:45 2007
@@ -20,17 +20,22 @@
import java.util.Map;
import java.util.HashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/**
* A simple POJO showing how to create a simple registry
*
* @version $Revision: 1.1 $
*/
public class MyRegistry {
+ private static final transient Log LOG = LogFactory.getLog(MyRegistry.class);
private Map<String,Map> services = new HashMap<String, Map>();
public void onEvent(Map heartbeat) {
String key = (String) heartbeat.get("name");
+ LOG.debug(">>> event for: " + key + " details: " + heartbeat);
services.put(key, heartbeat);
}
Added: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.remoting;
+
+import org.apache.camel.spring.remoting.SpringRemotingRouteTest;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class JmsRemotingTest extends SpringRemotingRouteTest {
+
+ protected ClassPathXmlApplicationContext createApplicationContext() {
+ return new ClassPathXmlApplicationContext("org/apache/camel/component/jms/remoting/spring.xml");
+ }
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties Thu Oct 18 11:41:45 2007
@@ -35,7 +35,7 @@
#
# The logging properties used during tests..
#
-log4j.rootLogger=INFO, out
+log4j.rootLogger=DEBUG, stdout
log4j.logger.org.apache.activemq.spring=WARN
Added: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml Thu Oct 18 11:41:45 2007
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:camel="http://activemq.apache.org/camel/schema/spring"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+ ">
+
+ <bean id="sayService" class="org.apache.camel.spring.remoting.SayService"/>
+
+ <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+ <!-- create the client proxy -->
+ <proxy id="sayProxy" serviceUrl="activemq:test.serviceQueue"
+ serviceInterface="org.apache.camel.spring.remoting.ISay"/>
+
+ <!-- export the service -->
+ <export id="say" uri="activemq:test.serviceQueue" serviceRef="sayService"
+ serviceInterface="org.apache.camel.spring.remoting.ISay"/>
+
+ </camelContext>
+
+ <!-- configure the ActiveMQ component -->
+ <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
+ <property name="connectionFactory">
+ <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
+ <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
+ </bean>
+ </property>
+ </bean>
+</beans>
Propchange: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java (original)
+++ activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java Thu Oct 18 11:41:45 2007
@@ -85,7 +85,7 @@
if (!isStarted()) {
throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
}
- BeanInvocation invocation = new BeanInvocation(proxy, method, args);
+ BeanInvocation invocation = new BeanInvocation(method, args);
BeanExchange exchange = getEndpoint().createExchange();
exchange.setInvocation(invocation);
getProcessor().process(exchange);
Modified: activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java (original)
+++ activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java Thu Oct 18 11:41:45 2007
@@ -81,6 +81,6 @@
@Converter
public static BeanInvocation toBeanInvocation(MethodInvocation invocation) {
- return new BeanInvocation(invocation.getThis(), invocation.getMethod(), invocation.getArguments());
+ return new BeanInvocation(invocation.getMethod(), invocation.getArguments());
}
}