You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/19 20:51:01 UTC
svn commit: r520071 - in /activemq/camel/trunk/camel-jms/src:
main/java/org/apache/camel/jms/ main/resources/ main/resources/META-INF/
main/resources/META-INF/services/ main/resources/META-INF/services/org/
main/resources/META-INF/services/org/apache/ ...
Author: jstrachan
Date: Mon Mar 19 12:51:00 2007
New Revision: 520071
URL: http://svn.apache.org/viewvc?view=rev&rev=520071
Log:
added a spike of a JMS component and endpoint for Camel
Added:
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java (with props)
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java (with props)
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java (with props)
activemq/camel/trunk/camel-jms/src/main/resources/
activemq/camel/trunk/camel-jms/src/main/resources/META-INF/
activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/
activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/
activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/
activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/
activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/
activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms
activemq/camel/trunk/camel-jms/src/test/
activemq/camel/trunk/camel-jms/src/test/java/
activemq/camel/trunk/camel-jms/src/test/java/org/
activemq/camel/trunk/camel-jms/src/test/java/org/apache/
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java (with props)
Modified:
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java?view=diff&rev=520071&r1=520070&r2=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/DefaultJmsExchange.java Mon Mar 19 12:51:00 2007
@@ -32,6 +32,13 @@
public class DefaultJmsExchange extends ExchangeSupport<Message> implements JmsExchange {
private Map<String, Object> lazyHeaders;
+ public DefaultJmsExchange() {
+ }
+
+ public DefaultJmsExchange(Message message) {
+ setRequest(message);
+ }
+
public <T> T getHeader(String name) {
Message request = getRequest();
if (request != null) {
Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java Mon Mar 19 12:51:00 2007
@@ -0,0 +1,98 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+import org.springframework.jms.core.JmsOperations;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Component;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.Destination;
+
+import com.sun.jndi.toolkit.url.Uri;
+
+/**
+ * @version $Revision$
+ */
+public class JmsComponent implements Component<JmsExchange> {
+ private JmsTemplate template = new JmsTemplate();
+ private static final String QUEUE_PREFIX = "queue/";
+ private static final String TOPIC_PREFIX = "topic/";
+ private CamelContainer container;
+
+ public JmsComponent(CamelContainer container) {
+ this.container = container;
+ }
+
+ public JmsEndpoint createEndpoint(Uri uri) {
+ // lets figure out from the URI whether its a queue, topic etc
+
+ String path = uri.getPath();
+ return createEndpoint(uri.toString(), path);
+ }
+
+ public JmsEndpoint createEndpoint(String uri, String path) {
+ if (path.startsWith(QUEUE_PREFIX)) {
+ template.setPubSubDomain(false);
+ path = path.substring(QUEUE_PREFIX.length());
+ }
+ else if (path.startsWith(TOPIC_PREFIX)) {
+ template.setPubSubDomain(false);
+ path = path.substring(TOPIC_PREFIX.length());
+ }
+
+ final String subject = convertPathToActualDestination(path);
+ template.setDefaultDestinationName(subject);
+
+ Destination destination = (Destination) template.execute(new SessionCallback() {
+ public Object doInJms(Session session) throws JMSException {
+ return template.getDestinationResolver().resolveDestinationName(session, subject, template.isPubSubDomain());
+ }
+ });
+
+ AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(template);
+ return new JmsEndpoint(uri, container, destination, template ,listenerContainer);
+ }
+
+ public JmsTemplate getTemplate() {
+ return template;
+ }
+
+ public void setTemplate(JmsTemplate template) {
+ this.template = template;
+ }
+
+ protected AbstractMessageListenerContainer createMessageListenerContainer(JmsTemplate template) {
+ // TODO use an enum to auto-switch container types?
+ return new DefaultMessageListenerContainer();
+ }
+
+ /**
+ * A strategy method allowing the URI destination to be translated into the actual JMS destination name
+ * (say by looking up in JNDI or something)
+ */
+ protected String convertPathToActualDestination(String path) {
+ return path;
+ }
+}
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java?view=diff&rev=520071&r1=520070&r2=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java Mon Mar 19 12:51:00 2007
@@ -16,39 +16,53 @@
*/
package org.apache.camel.jms;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeConverter;
import org.apache.camel.CamelContainer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultEndpoint;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
-import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
+import javax.jms.MessageListener;
+import javax.jms.Destination;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* @version $Revision$
*/
-public class JmsEndpoint extends DefaultEndpoint<JmsExchange> {
+public class JmsEndpoint extends DefaultEndpoint<JmsExchange> implements MessageListener {
private JmsOperations template;
private Destination destination;
+ private AbstractMessageListenerContainer listenerContainer;
+ private Processor<Exchange> processor;
+ private AtomicBoolean startedConsuming = new AtomicBoolean(false);
-
- public JmsEndpoint(String uri, CamelContainer container, Destination destination, JmsOperations template) {
- super(uri, container);
+ public JmsEndpoint(String endpointUri, CamelContainer container, Destination destination, JmsOperations template, AbstractMessageListenerContainer listenerContainer) {
+ super(endpointUri, container);
this.destination = destination;
this.template = template;
+ this.listenerContainer = listenerContainer;
+ this.listenerContainer.setMessageListener(this);
+ this.listenerContainer.setDestination(destination);
}
- public void send(final JmsExchange exchange) {
- template.send(getDestination(), new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return exchange.createMessage(session);
- }
- });
+ public void onMessage(Message message) {
+ Exchange exchange = createExchange(message);
+ processor.onExchange(exchange);
+ }
+
+ public void setProcessor(Processor<Exchange> processor) {
+ this.processor = processor;
+ if (startedConsuming.compareAndSet(false, true)) {
+ listenerContainer.afterPropertiesSet();
+ listenerContainer.initialize();
+ listenerContainer.start();
+ }
}
public void send(Exchange exchange) {
@@ -57,6 +71,14 @@
send(jmsExchange);
}
+ public void send(final JmsExchange exchange) {
+ template.send(getDestination(), new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ return exchange.createMessage(session);
+ }
+ });
+ }
+
/**
* Returns the JMS destination for this endpoint
*/
@@ -70,5 +92,15 @@
public JmsExchange createExchange() {
return new DefaultJmsExchange();
+ }
+
+
+ public JmsExchange createExchange(Message message) {
+ return new DefaultJmsExchange(message);
+ }
+
+
+ protected MessageListener createMessageListener(Processor<Exchange> processor) {
+ return new MessageListenerProcessor(processor);
}
}
Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java Mon Mar 19 12:51:00 2007
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+import org.apache.camel.EndpointResolver;
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Component;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.queue.QueueEndpoint;
+import org.apache.camel.queue.QueueComponent;
+
+import java.util.Queue;
+import java.util.concurrent.Callable;
+
+/**
+ * An implementation of {@link EndpointResolver} that creates
+ * {@link QueueEndpoint} objects.
+ *
+ * The synatx for a Queue URI looks like:
+ *
+ * <pre><code>queue:[component:]queuename</code></pre>
+ * the component is optional, and if it is not specified, the default component name
+ * is assumed.
+ *
+ * @version $Revision$
+ */
+public class JmsEndpointResolver implements EndpointResolver<JmsExchange> {
+
+ public static final String DEFAULT_COMPONENT_NAME = QueueComponent.class.getName();
+
+ /**
+ * Finds the {@see JmsComponent} specified by the uri. If the {@see JmsComponent}
+ * object do not exist, it will be created.
+ */
+ public Component resolveComponent(CamelContainer container, String uri) {
+ String splitURI[] = ObjectHelper.splitOnCharacter(uri, ":", 3);
+ return resolveJmsComponent(container, splitURI);
+ }
+
+ /**
+ * Finds the {@see QueueEndpoint} specified by the uri. If the {@see QueueEndpoint} or it's associated
+ * {@see QueueComponent} object do not exist, they will be created.
+ */
+ public JmsEndpoint resolveEndpoint(CamelContainer container, String uri) {
+ String splitURI[] = ObjectHelper.splitOnCharacter(uri, ":", 3);
+ JmsComponent component = resolveJmsComponent(container, splitURI);
+
+ return component.createEndpoint(uri, splitURI[2]);
+ }
+
+ private JmsComponent resolveJmsComponent(final CamelContainer container, String[] splitURI) {
+ String componentName = DEFAULT_COMPONENT_NAME;
+ if( splitURI[2] != null ) {
+ componentName = splitURI[1];
+ }
+ Component rc = container.getOrCreateComponent(componentName, new Callable<JmsComponent>(){
+ public JmsComponent call() throws Exception {
+ return new JmsComponent(container);
+ }});
+ return (JmsComponent) rc;
+ }
+
+}
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java Mon Mar 19 12:51:00 2007
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import javax.jms.MessageListener;
+import javax.jms.Message;
+
+/**
+ * @version $Revision$
+ */
+public class MessageListenerProcessor implements MessageListener {
+ private Processor<Exchange> processor;
+
+ public MessageListenerProcessor(Processor<Exchange> processor) {
+ this.processor = processor;
+ }
+
+ public void onMessage(Message message) {
+ Exchange exchange = createMessageExchange(message);
+ processor.onExchange(exchange);
+ }
+
+ protected DefaultJmsExchange createMessageExchange(Message message) {
+ return new DefaultJmsExchange(message);
+ }
+}
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/MessageListenerProcessor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms (added)
+++ activemq/camel/trunk/camel-jms/src/main/resources/META-INF/services/org/apache/camel/EndpointResolver/jms Mon Mar 19 12:51:00 2007
@@ -0,0 +1 @@
+class=org.apache.camel.jms.JmsEndpointResolver
Added: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java?view=auto&rev=520071
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java (added)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java Mon Mar 19 12:51:00 2007
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+import junit.framework.TestCase;
+
+/**
+ * @version $Revision$
+ */
+public class JmsRouteTest extends TestCase {
+ public void testJmsRoute() throws Exception {
+ // TODO
+ }
+}
Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain