You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/19 20:51:01 UTC

svn commit: r520071 - in /activemq/camel/trunk/camel-jms/src: main/java/org/apache/camel/jms/ main/resources/ main/resources/META-INF/ main/resources/META-INF/services/ main/resources/META-INF/services/org/ main/resources/META-INF/services/org/apache/ ...

Author: jstrachan
Date: Mon Mar 19 12:51:00 2007
New Revision: 520071

URL: http://svn.apache.org/viewvc?view=rev&rev=520071
Log:
added a spike of a JMS component and endpoint for Camel

Added:
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java   (with props)
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java   (with props)
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java   (with props)
    activemq/camel/trunk/camel-jms/src/main/resources/
    activemq/camel/trunk/camel-jms/src/main/resources/META-INF/
    activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/
    activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/
    activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/
    activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/
    activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/
    activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms
    activemq/camel/trunk/camel-jms/src/test/
    activemq/camel/trunk/camel-jms/src/test/java/
    activemq/camel/trunk/camel-jms/src/test/java/org/
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java   (with props)
Modified:
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java?view=diff&rev=520071&r1=520070&r2=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java Mon Mar 19 12:51:00 2007
@@ -32,6 +32,13 @@
 public class DefaultJmsExchange extends ExchangeSupport<Message> implements JmsExchange {
     private Map<String, Object> lazyHeaders;
 
+    public DefaultJmsExchange() {
+    }
+
+    public DefaultJmsExchange(Message message) {
+        setRequest(message);
+    }
+
     public <T> T getHeader(String name) {
         Message request = getRequest();
         if (request != null) {

Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java Mon Mar 19 12:51:00 2007
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Component;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.Destination;
+
+import com.sun.jndi.toolkit.url.Uri;
+
+/**
+ * @version $Revision$
+ */
+public class JmsComponent implements Component<JmsExchange> {
+    private JmsTemplate template = new JmsTemplate();
+    private static final String QUEUE_PREFIX = "queue/";
+    private static final String TOPIC_PREFIX = "topic/";
+    private CamelContainer container;
+
+    public JmsComponent(CamelContainer container) {
+        this.container = container;
+    }
+
+    public JmsEndpoint createEndpoint(Uri uri) {
+        // lets figure out from the URI whether its a queue, topic etc
+
+        String path = uri.getPath();
+        return createEndpoint(uri.toString(), path);
+    }
+
+    public JmsEndpoint createEndpoint(String uri, String path) {
+        if (path.startsWith(QUEUE_PREFIX)) {
+            template.setPubSubDomain(false);
+            path = path.substring(QUEUE_PREFIX.length());
+        }
+        else if (path.startsWith(TOPIC_PREFIX)) {
+            template.setPubSubDomain(false);
+            path = path.substring(TOPIC_PREFIX.length());
+        }
+
+        final String subject = convertPathToActualDestination(path);
+        template.setDefaultDestinationName(subject);
+
+        Destination destination = (Destination) template.execute(new SessionCallback() {
+            public Object doInJms(Session session) throws JMSException {
+                return template.getDestinationResolver().resolveDestinationName(session, subject, template.isPubSubDomain());
+            }
+        });
+
+        AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(template);
+        return new JmsEndpoint(uri, container, destination, template ,listenerContainer);
+    }
+
+    public JmsTemplate getTemplate() {
+        return template;
+    }
+
+    public void setTemplate(JmsTemplate template) {
+        this.template = template;
+    }
+
+    protected AbstractMessageListenerContainer createMessageListenerContainer(JmsTemplate template) {
+        // TODO use an enum to auto-switch container types?
+        return new DefaultMessageListenerContainer();
+    }
+
+    /**
+     * A strategy method allowing the URI destination to be translated into the actual JMS destination name
+     * (say by looking up in JNDI or something)
+     */
+    protected String convertPathToActualDestination(String path) {
+        return path;
+    }
+}

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java?view=diff&rev=520071&r1=520070&r2=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java Mon Mar 19 12:51:00 2007
@@ -16,39 +16,53 @@
  */
 package org.apache.camel.jms;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeConverter;
 import org.apache.camel.CamelContainer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
 
-import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
+import javax.jms.MessageListener;
+import javax.jms.Destination;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * @version $Revision$
  */
-public class JmsEndpoint extends DefaultEndpoint<JmsExchange> {
+public class JmsEndpoint extends DefaultEndpoint<JmsExchange> implements MessageListener {
 
     private JmsOperations template;
     private Destination destination;
+    private AbstractMessageListenerContainer listenerContainer;
+    private Processor<Exchange> processor;
+    private AtomicBoolean startedConsuming = new AtomicBoolean(false);
 
-
-    public JmsEndpoint(String uri, CamelContainer container, Destination destination, JmsOperations template) {
-        super(uri, container);
+    public JmsEndpoint(String endpointUri, CamelContainer container, Destination destination, JmsOperations template, AbstractMessageListenerContainer listenerContainer) {
+        super(endpointUri, container);
         this.destination = destination;
         this.template = template;
+        this.listenerContainer = listenerContainer;
+        this.listenerContainer.setMessageListener(this);
+        this.listenerContainer.setDestination(destination);
     }
 
-    public void send(final JmsExchange exchange) {
-        template.send(getDestination(), new MessageCreator() {
-            public Message createMessage(Session session) throws JMSException {
-                return exchange.createMessage(session);
-            }
-        });
+    public void onMessage(Message message) {
+        Exchange exchange = createExchange(message);
+        processor.onExchange(exchange);
+    }
+
+    public void setProcessor(Processor<Exchange> processor) {
+        this.processor = processor;
+        if (startedConsuming.compareAndSet(false, true)) {
+            listenerContainer.afterPropertiesSet();
+            listenerContainer.initialize();
+            listenerContainer.start();
+        }
     }
 
     public void send(Exchange exchange) {
@@ -57,6 +71,14 @@
         send(jmsExchange);
     }
 
+    public void send(final JmsExchange exchange) {
+        template.send(getDestination(), new MessageCreator() {
+            public Message createMessage(Session session) throws JMSException {
+                return exchange.createMessage(session);
+            }
+        });
+    }
+
     /**
      * Returns the JMS destination for this endpoint
      */
@@ -70,5 +92,15 @@
 
     public JmsExchange createExchange() {
         return new DefaultJmsExchange();
+    }
+
+
+    public JmsExchange createExchange(Message message) {
+        return new DefaultJmsExchange(message);
+    }
+
+
+    protected MessageListener createMessageListener(Processor<Exchange> processor) {
+        return new MessageListenerProcessor(processor);
     }
 }

Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java Mon Mar 19 12:51:00 2007
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+import org.apache.camel.EndpointResolver;
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Component;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.queue.QueueEndpoint;
+import org.apache.camel.queue.QueueComponent;
+
+import java.util.Queue;
+import java.util.concurrent.Callable;
+
+/**
+ * An implementation of {@link EndpointResolver} that creates
+ * {@link QueueEndpoint} objects.
+ *
+ * The synatx for a Queue URI looks like:
+ *
+ * <pre><code>queue:[component:]queuename</code></pre>
+ * the component is optional, and if it is not specified, the default component name
+ * is assumed.
+ *
+ * @version $Revision$
+ */
+public class JmsEndpointResolver implements EndpointResolver<JmsExchange> {
+
+	public static final String DEFAULT_COMPONENT_NAME = QueueComponent.class.getName();
+
+	/**
+	 * Finds the {@see JmsComponent} specified by the uri.  If the {@see JmsComponent}
+	 * object do not exist, it will be created.
+	 */
+	public Component resolveComponent(CamelContainer container, String uri) {
+		String splitURI[] = ObjectHelper.splitOnCharacter(uri, ":", 3);
+		return resolveJmsComponent(container, splitURI);
+	}
+
+	/**
+	 * Finds the {@see QueueEndpoint} specified by the uri.  If the {@see QueueEndpoint} or it's associated
+	 * {@see QueueComponent} object do not exist, they will be created.
+	 */
+	public JmsEndpoint resolveEndpoint(CamelContainer container, String uri) {
+		String splitURI[] = ObjectHelper.splitOnCharacter(uri, ":", 3);
+    	JmsComponent component = resolveJmsComponent(container, splitURI);
+
+        return component.createEndpoint(uri, splitURI[2]);
+    }
+
+	private JmsComponent resolveJmsComponent(final CamelContainer container, String[] splitURI) {
+		String componentName = DEFAULT_COMPONENT_NAME;
+    	if( splitURI[2] != null ) {
+    		componentName =  splitURI[1];
+    	}
+    	Component rc = container.getOrCreateComponent(componentName, new Callable<JmsComponent>(){
+			public JmsComponent call() throws Exception {
+				return new JmsComponent(container);
+			}});
+    	return (JmsComponent) rc;
+	}
+
+}

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java Mon Mar 19 12:51:00 2007
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import javax.jms.MessageListener;
+import javax.jms.Message;
+
+/**
+ * @version $Revision$
+ */
+public class MessageListenerProcessor implements MessageListener {
+    private Processor<Exchange> processor;
+    
+    public MessageListenerProcessor(Processor<Exchange> processor) {
+        this.processor = processor;
+    }
+
+    public void onMessage(Message message) {
+        Exchange exchange = createMessageExchange(message);
+        processor.onExchange(exchange);
+    }
+
+    protected DefaultJmsExchange createMessageExchange(Message message) {
+        return new DefaultJmsExchange(message);
+    }
+}

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms (added)
+++ activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms Mon Mar 19 12:51:00 2007
@@ -0,0 +1 @@
+class=org.apache.camel.jms.JmsEndpointResolver

Added: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java (added)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java Mon Mar 19 12:51:00 2007
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+import junit.framework.TestCase;
+
+/**
+ * @version $Revision$
+ */
+public class JmsRouteTest extends TestCase {
+    public void testJmsRoute() throws Exception {
+        // TODO 
+    }
+}

Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain