You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/10/18 20:41:49 UTC

svn commit: r586066 [2/2] - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/bean/ camel-core/src/main/java/org/apache/camel/util/ camel-core/src/test/java/org/apache/camel/compone...

Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,182 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.requestor;
+
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.DefaultTimeoutMap;
+import org.apache.camel.util.TimeoutMap;
+import org.apache.camel.component.jms.JmsConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
+import org.springframework.jms.support.destination.DestinationResolver;
+
+/**
+ * Correlates JMS replies with pending requests: each incoming reply is handed,
+ * by its JMSCorrelationID, to the handler registered for that ID through
+ * {@link #getReceiveFuture(String, long)}.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class Requestor extends ServiceSupport implements MessageListener {
+    private static final Log LOG = LogFactory.getLog(Requestor.class);
+    private final JmsConfiguration configuration;
+    private final TimeoutMap requestMap;
+    private AbstractMessageListenerContainer listenerContainer;
+    private Destination replyTo;
+
+    /**
+     * @param configuration source of the connection factory, client ID, task executor,
+     *                exception listener and request map purge poll time
+     * @param executorService handed to the timeout map that tracks pending requests
+     */
+    public Requestor(JmsConfiguration configuration, ScheduledExecutorService executorService) {
+        this.configuration = configuration;
+        requestMap = new DefaultTimeoutMap(executorService, configuration.getRequestMapPurgePollTimeMillis());
+    }
+
+    /**
+     * Registers a new future under the given correlation ID and returns it.
+     *
+     * @param correlationID the correlation ID the reply is expected to carry
+     * @param requestTimeout timeout passed to the request map for this entry
+     * @return the newly registered future
+     */
+    public FutureTask getReceiveFuture(String correlationID, long requestTimeout) {
+        // TODO deal with async handlers: reuse an AsyncReplyHandler already registered for this ID
+        FutureHandler futureHandler = new FutureHandler();
+        requestMap.put(correlationID, futureHandler, requestTimeout);
+        return futureHandler;
+    }
+
+    /**
+     * Passes a reply to the handler registered under its correlation ID and
+     * removes the handler once it reports the reply as complete.
+     *
+     * @throws FailedToProcessResponse if a JMSException occurs while processing the message
+     */
+    public void onMessage(Message message) {
+        try {
+            String correlationID = message.getJMSCorrelationID();
+            if (correlationID == null) {
+                LOG.warn("Ignoring message with no correlationID! " + message);
+                return;
+            }
+
+            Object handler = requestMap.get(correlationID);
+            if (handler == null) {
+                LOG.warn("Response received for unknown correlationID: " + correlationID + " request: " + message);
+            }
+            else if (handler instanceof ReplyHandler) {
+                ReplyHandler replyHandler = (ReplyHandler) handler;
+                if (replyHandler.handle(message)) {
+                    requestMap.remove(correlationID);
+                }
+            }
+            else {
+                // this case used to be dropped without any log output
+                LOG.warn("Ignoring response for correlationID: " + correlationID + " as handler is not a ReplyHandler: " + handler);
+            }
+        }
+        catch (JMSException e) {
+            throw new FailedToProcessResponse(message, e);
+        }
+    }
+
+    /** Returns the listener container, creating the default one on first use. */
+    public AbstractMessageListenerContainer getListenerContainer() {
+        if (listenerContainer == null) {
+            listenerContainer = createListenerContainer();
+        }
+        return listenerContainer;
+    }
+
+    public void setListenerContainer(AbstractMessageListenerContainer listenerContainer) {
+        this.listenerContainer = listenerContainer;
+    }
+
+    public Destination getReplyTo() {
+        return replyTo;
+    }
+
+    public void setReplyTo(Destination replyTo) {
+        this.replyTo = replyTo;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+    protected void doStart() throws Exception {
+        getListenerContainer().afterPropertiesSet();
+    }
+
+    protected void doStop() throws Exception {
+        if (listenerContainer != null) {
+            listenerContainer.stop();
+            listenerContainer.destroy();
+        }
+    }
+
+    /**
+     * Creates the default container: a single non-durable queue consumer on a
+     * temporary queue, which is also recorded as the reply destination.
+     */
+    protected AbstractMessageListenerContainer createListenerContainer() {
+        SimpleMessageListenerContainer answer = new SimpleMessageListenerContainer();
+        answer.setDestinationName("temporary");
+        answer.setDestinationResolver(new DestinationResolver() {
+            public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
+                TemporaryQueue queue = session.createTemporaryQueue();
+                replyTo = queue;
+                return queue;
+            }
+        });
+        answer.setAutoStartup(true);
+        answer.setMessageListener(this);
+        answer.setPubSubDomain(false);
+        answer.setSubscriptionDurable(false);
+        answer.setConcurrentConsumers(1);
+        answer.setConnectionFactory(configuration.getConnectionFactory());
+        String clientId = configuration.getClientId();
+        if (clientId != null) {
+            answer.setClientId(clientId + ".Requestor");
+        }
+        TaskExecutor taskExecutor = configuration.getTaskExecutor();
+        if (taskExecutor != null) {
+            answer.setTaskExecutor(taskExecutor);
+        }
+        ExceptionListener exceptionListener = configuration.getExceptionListener();
+        if (exceptionListener != null) {
+            answer.setExceptionListener(exceptionListener);
+        }
+        return answer;
+    }
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/Requestor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html (from r585168, activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/package.html)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html?p2=activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html&p1=activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/package.html&r1=585168&r2=586066&rev=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/package.html (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/requestor/package.html Thu Oct 18 11:41:45 2007
@@ -19,7 +19,9 @@
 </head>
 <body>
 
+Implementation classes for implementing request-response over JMS so that the
-Defines the <a href="http://activemq.apache.org/camel/jms.html">JMS Component</a>
+<a href="http://activemq.apache.org/camel/jms.html">JMS Component</a>
+can support InOut as well as InOnly.
 
 </body>
 </html>

Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/JmsDiscoveryTest.java Thu Oct 18 11:41:45 2007
@@ -52,7 +52,7 @@
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
 
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
         camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
 
         return camelContext;
@@ -76,7 +76,7 @@
                 from("bean:service2?method=status").to("activemq:topic:registry.heartbeats");
                 from("bean:service3?method=status").to("activemq:topic:registry.heartbeats");
 
-                from("activemq:topic:registry.heartbeats").to("bean:registry?method=onEvent");
+                from("activemq:topic:registry.heartbeats?cacheLevelName=CACHE_CONSUMER").to("bean:registry?method=onEvent");
             }
         };
     }

Modified: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/discovery/MyRegistry.java Thu Oct 18 11:41:45 2007
@@ -20,17 +20,22 @@
 import java.util.Map;
 import java.util.HashMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * A simple POJO showing how to create a simple registry
  * 
  * @version $Revision: 1.1 $
  */
 public class MyRegistry {
+    private static final transient Log LOG = LogFactory.getLog(MyRegistry.class);
 
     private Map<String,Map> services = new HashMap<String, Map>();
 
     public void onEvent(Map heartbeat) {
         String key = (String) heartbeat.get("name");
+        LOG.debug(">>> event for: " + key + " details: " + heartbeat);
         services.put(key, heartbeat);
     }
 

Added: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java Thu Oct 18 11:41:45 2007
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.remoting;
+
+import org.apache.camel.spring.remoting.SpringRemotingRouteTest;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+/**
+ * Reuses {@link SpringRemotingRouteTest} with the application context loaded
+ * from the <code>spring.xml</code> resource of this package.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class JmsRemotingTest extends SpringRemotingRouteTest {
+    private static final String CONTEXT_LOCATION = "org/apache/camel/component/jms/remoting/spring.xml";
+
+    protected ClassPathXmlApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext(CONTEXT_LOCATION);
+    }
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/remoting/JmsRemotingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/resources/log4j.properties Thu Oct 18 11:41:45 2007
@@ -35,7 +35,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=DEBUG, stdout
 
 log4j.logger.org.apache.activemq.spring=WARN
 

Added: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml?rev=586066&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml (added)
+++ activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml Thu Oct 18 11:41:45 2007
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:camel="http://activemq.apache.org/camel/schema/spring"
+       xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+       http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+    ">
+
+  <bean id="sayService" class="org.apache.camel.spring.remoting.SayService"/>
+
+  <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+    <!-- create the client proxy -->
+    <proxy id="sayProxy" serviceUrl="activemq:test.serviceQueue"
+           serviceInterface="org.apache.camel.spring.remoting.ISay"/>
+
+    <!-- export the service -->
+    <export id="say" uri="activemq:test.serviceQueue" serviceRef="sayService"
+            serviceInterface="org.apache.camel.spring.remoting.ISay"/>
+
+  </camelContext>
+
+  <!-- configure the ActiveMQ component -->
+  <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
+    <property name="connectionFactory">
+      <bean class="org.apache.activemq.spring.ActiveMQConnectionFactory">
+        <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
+      </bean>
+    </property>
+  </bean>
+</beans>

Propchange: activemq/camel/trunk/components/camel-jms/src/test/resources/org/apache/camel/component/jms/remoting/spring.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java (original)
+++ activemq/camel/trunk/components/camel-rmi/src/main/java/org/apache/camel/component/rmi/RmiConsumer.java Thu Oct 18 11:41:45 2007
@@ -85,7 +85,7 @@
         if (!isStarted()) {
             throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
         }
-        BeanInvocation invocation = new BeanInvocation(proxy, method, args);
+        BeanInvocation invocation = new BeanInvocation(method, args);
         BeanExchange exchange = getEndpoint().createExchange();
         exchange.setInvocation(invocation);
         getProcessor().process(exchange);

Modified: activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java?rev=586066&r1=586065&r2=586066&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java (original)
+++ activemq/camel/trunk/components/camel-spring/src/main/java/org/apache/camel/spring/spi/SpringConverters.java Thu Oct 18 11:41:45 2007
@@ -81,6 +81,6 @@
 
     @Converter
     public static BeanInvocation toBeanInvocation(MethodInvocation invocation) {
-        return new BeanInvocation(invocation.getThis(), invocation.getMethod(), invocation.getArguments());
+        return new BeanInvocation(invocation.getMethod(), invocation.getArguments());
     }
 }