You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/20 02:15:58 UTC
svn commit: r520200 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/builder/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/queue/ camel-jms/src...
Author: jstrachan
Date: Mon Mar 19 18:15:57 2007
New Revision: 520200
URL: http://svn.apache.org/viewvc?view=rev&rev=520200
Log:
added some helper methods to make it easy to configure the components using Java code
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java Mon Mar 19 18:15:57 2007
@@ -17,25 +17,66 @@
*/
package org.apache.camel;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultEndpointResolver;
+import org.apache.camel.impl.DefaultExchangeConverter;
+
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
-import org.apache.camel.impl.DefaultEndpointResolver;
-import org.apache.camel.impl.DefaultExchangeConverter;
-
/**
* Represents the container used to configure routes and the policies to use.
*
- * @org.apache.xbean.XBean element="container" rootElement="true"
* @version $Revision$
+ * @org.apache.xbean.XBean element="container" rootElement="true"
*/
-public class CamelContainer<E> {
-
+public class CamelContainer<E extends Exchange> {
+
private EndpointResolver<E> endpointResolver;
private ExchangeConverter exchangeConverter;
private Map<String, Component> components = new HashMap<String, Component>();
+ // Builder APIs
+ //-----------------------------------------------------------------------
+ public void routes(RouteBuilder<E> builder) {
+ // lets now add the routes from the builder
+ builder.setContainer(this);
+ Map<Endpoint<E>, Processor<E>> routeMap = builder.getRouteMap();
+ Set<Map.Entry<Endpoint<E>, Processor<E>>> entries = routeMap.entrySet();
+ for (Map.Entry<Endpoint<E>, Processor<E>> entry : entries) {
+ Endpoint<E> endpoint = entry.getKey();
+ Processor<E> processor = entry.getValue();
+ endpoint.setInboundProcessor(processor);
+ }
+ }
+
+ /**
+ * Adds the routes described by the given {@link RouteFactory} to this container.
+ * <p>
+ * The factory is wrapped in an anonymous {@link RouteBuilder} bound to this
+ * container, and that builder is then passed to {@link #routes(RouteBuilder)}
+ * so that the routes it defines are registered on their endpoints.
+ *
+ * @param factory callback invoked with the builder to define the routes
+ */
+ public void routes(final RouteFactory factory) {
+ RouteBuilder<E> builder = new RouteBuilder<E>(this) {
+ public void configure() {
+ factory.build(this);
+ }
+ };
+ // Hand the builder over for registration; without this call the builder is
+ // discarded and the factory's routes are never added to the container.
+ routes(builder);
+ }
+
+
+ /**
+ * Adds a component to the container if there is not currently a component already registered.
+ */
+ public void addComponent(String componentName, final Component<E, ? extends Endpoint<E>> component) {
+ // TODO provide a version of this which barfs if the component is registered multiple times
+
+ getOrCreateComponent(componentName, new Callable<Component<E, ? extends Endpoint<E>>>() {
+ public Component<E, ? extends Endpoint<E>> call() throws Exception {
+ return component;
+ }
+ });
+ }
+
+
+ // Properties
+ //-----------------------------------------------------------------------
public EndpointResolver<E> getEndpointResolver() {
if (endpointResolver == null) {
endpointResolver = createEndpointResolver();
@@ -71,26 +112,30 @@
return new DefaultExchangeConverter();
}
- public Component getOrCreateComponent(String componentName, Callable<Component<E,? extends Endpoint<E>>> factory) {
- synchronized(components) {
- Component component = components.get(componentName);
- if( component == null ) {
- try {
- component = factory.call();
- if( component == null )
- throw new IllegalArgumentException("Factory failed to create the "+componentName+" component, it returned null.");
- } catch (Exception e) {
- throw new IllegalArgumentException("Factory failed to create the "+componentName+" component", e);
- }
- }
- return component;
- }
- }
-
- public Component getComponent(String componentName) {
- synchronized(components) {
- Component component = components.get(componentName);
- return component;
- }
- }
+ public Component getOrCreateComponent(String componentName, Callable<Component<E, ? extends Endpoint<E>>> factory) {
+ synchronized (components) {
+ Component component = components.get(componentName);
+ if (component == null) {
+ try {
+ component = factory.call();
+ if (component == null) {
+ throw new IllegalArgumentException("Factory failed to create the " + componentName + " component, it returned null.");
+ }
+ components.put(componentName, component);
+ component.setContainer(this);
+ }
+ catch (Exception e) {
+ throw new IllegalArgumentException("Factory failed to create the " + componentName + " component", e);
+ }
+ }
+ return component;
+ }
+ }
+
+ public Component getComponent(String componentName) {
+ synchronized (components) {
+ Component component = components.get(componentName);
+ return component;
+ }
+ }
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java Mon Mar 19 18:15:57 2007
@@ -23,7 +23,12 @@
*/
public interface Component<E, EP extends Endpoint<E>> {
- /**
+ /**
+ * The CamelContainer is injected into the component when it is added to it
+ */
+ void setContainer(CamelContainer container);
+
+ /**
* Asks the component to activate the delivery of {@link Exchange} objects
* from the {@link Endpoint} to the {@link Processor}.
*/
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Mon Mar 19 18:15:57 2007
@@ -34,7 +34,23 @@
void send(E exchange);
/**
+ * Sets the processor for inbound messages
+ */
+ void setInboundProcessor(Processor<E> processor);
+
+ /**
* Create a new exchange for communicating with this endpoint
*/
E createExchange();
+
+
+ /**
+ * Called by the container when an endpoint is activated
+ */
+ void activate();
+
+ /**
+ * Called by the container when the endpoint is deactivated
+ */
+ void deactivate();
}
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java?view=auto&rev=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java Mon Mar 19 18:15:57 2007
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * A simple callback that POJOs can implement to be called back by the
+ * {@link CamelContainer} with a properly configured {@link RouteBuilder}
+ * to build routes
+ *
+ * @version $Revision: $
+ */
+public interface RouteFactory {
+ void build(RouteBuilder builder);
+}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java Mon Mar 19 18:15:57 2007
@@ -16,12 +16,6 @@
*/
package org.apache.camel.builder;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.camel.CamelContainer;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointResolver;
@@ -30,6 +24,12 @@
import org.apache.camel.Processor;
import org.apache.camel.util.ObjectHelper;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* A builder of destinationBuilders using a typesafe Java DSL.
*
@@ -41,6 +41,13 @@
private AtomicBoolean initalized = new AtomicBoolean(false);
private Map<Endpoint<E>, Processor<E>> routeMap = new HashMap<Endpoint<E>, Processor<E>>();
+ protected RouteBuilder() {
+ }
+
+ protected RouteBuilder(CamelContainer<E> container) {
+ this.container = container;
+ }
+
/**
* Called on initialisation to build the required destinationBuilders
*/
@@ -134,5 +141,4 @@
protected CamelContainer<E> createContainer() {
return new CamelContainer<E>();
}
-
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Mon Mar 19 18:15:57 2007
@@ -16,18 +16,23 @@
*/
package org.apache.camel.impl;
+import org.apache.camel.CamelContainer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeConverter;
-import org.apache.camel.CamelContainer;
+import org.apache.camel.Processor;
import org.apache.camel.util.ObjectHelper;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* @version $Revision$
*/
public abstract class DefaultEndpoint<E> implements Endpoint<E> {
private String endpointUri;
private CamelContainer container;
+ private Processor<E> inboundProcessor;
+ private AtomicBoolean activated = new AtomicBoolean(false);
+ private AtomicBoolean deactivated = new AtomicBoolean(false);
protected DefaultEndpoint(String endpointUri, CamelContainer container) {
this.endpointUri = endpointUri;
@@ -69,6 +74,42 @@
return type.cast(exchange);
}
return getContainer().getExchangeConverter().convertTo(type, exchange);
+ }
+
+
+ public void activate() {
+ if (activated.compareAndSet(false, true)) {
+ doActivate();
+ }
+ }
+ public void deactivate() {
+ if (deactivated.compareAndSet(false, true)) {
+ doDeactivate();
+ }
+ }
+
+ /**
+ * The processor used to process inbound message exchanges
+ */
+ public Processor<E> getInboundProcessor() {
+ return inboundProcessor;
+ }
+
+ public void setInboundProcessor(Processor<E> inboundProcessor) {
+ this.inboundProcessor = inboundProcessor;
+ activate();
+ }
+
+ /**
+ * Called at most once by the container to activate the endpoint
+ */
+ protected void doActivate() {
+ }
+
+ /**
+ * Called at most once by the container to deactivate the endpoint
+ */
+ protected void doDeactivate() {
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java Mon Mar 19 18:15:57 2007
@@ -16,14 +16,15 @@
*/
package org.apache.camel.queue;
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Component;
+import org.apache.camel.Processor;
+
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.camel.Component;
-import org.apache.camel.Processor;
-
/**
* Represents the component that manages {@link QueueEndpoint}. It holds the
* list of named queues that queue endpoints reference.
@@ -35,7 +36,12 @@
private HashMap<String, Queue<E>> registry = new HashMap<String, Queue<E>>();
private HashMap<QueueEndpoint<E>, Activation> activations = new HashMap<QueueEndpoint<E>, Activation>();
-
+ private CamelContainer container;
+
+ public void setContainer(CamelContainer container) {
+ this.container = container;
+ }
+
class Activation implements Runnable {
private final QueueEndpoint<E> endpoint;
AtomicBoolean stop = new AtomicBoolean();
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java Mon Mar 19 18:15:57 2007
@@ -16,12 +16,13 @@
*/
package org.apache.camel.queue;
-import java.util.Queue;
-
import org.apache.camel.CamelContainer;
+import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
+import java.util.Queue;
+
/**
* Represents a queue endpoint that uses a {@link Queue}
* object to process inbound exchanges.
@@ -39,6 +40,11 @@
public void send(E exchange) {
queue.add(exchange);
+ }
+
+ public void setInboundProcessor(Processor<E> processor) {
+ // TODO lets start a thread to process inbound requests
+ // if we don't already have one
}
public E createExchange() {
Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java?view=auto&rev=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java Mon Mar 19 18:15:57 2007
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+/**
+ * Some helper builder methods for the easy configuration of the {@link org.apache.camel.CamelContainer} via Java code.
+ *
+ * @version $Revision: $
+ */
+public class Builder {
+ public static JmsComponent jmsComponent() {
+ return new JmsComponent();
+ }
+}
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java Mon Mar 19 18:15:57 2007
@@ -17,30 +17,59 @@
*/
package org.apache.camel.jms;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
+import com.sun.jndi.toolkit.url.Uri;
import org.apache.camel.CamelContainer;
import org.apache.camel.Component;
import org.apache.camel.Processor;
+import org.apache.camel.util.ObjectHelper;
import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import com.sun.jndi.toolkit.url.Uri;
+import javax.jms.ConnectionFactory;
/**
* @version $Revision$
*/
public class JmsComponent implements Component<JmsExchange, JmsEndpoint> {
- private JmsTemplate template = new JmsTemplate();
- private static final String QUEUE_PREFIX = "queue/";
- private static final String TOPIC_PREFIX = "topic/";
+ public static final String QUEUE_PREFIX = "queue/";
+ public static final String TOPIC_PREFIX = "topic/";
+
private CamelContainer container;
+ private JmsTemplate template;
+
+ /**
+ * Static builder method
+ */
+ public static JmsComponent jmsComponent() {
+ return new JmsComponent();
+ }
+
+ /**
+ * Static builder method
+ */
+ public static JmsComponent jmsComponent(JmsTemplate template) {
+ return new JmsComponent(template);
+ }
+
+ /**
+ * Static builder method
+ */
+ public static JmsComponent jmsComponent(ConnectionFactory connectionFactory) {
+ return jmsComponent(new JmsTemplate(connectionFactory));
+ }
+
+
+ protected JmsComponent() {
+ this.template = new JmsTemplate();
+ }
+
+ protected JmsComponent(JmsTemplate template) {
+ this.template = template;
+ }
public JmsComponent(CamelContainer container) {
+ this();
this.container = container;
}
@@ -52,6 +81,8 @@
}
public JmsEndpoint createEndpoint(String uri, String path) {
+ ObjectHelper.notNull(container, "container");
+
if (path.startsWith(QUEUE_PREFIX)) {
template.setPubSubDomain(false);
path = path.substring(QUEUE_PREFIX.length());
@@ -64,14 +95,25 @@
final String subject = convertPathToActualDestination(path);
template.setDefaultDestinationName(subject);
+ /*
Destination destination = (Destination) template.execute(new SessionCallback() {
public Object doInJms(Session session) throws JMSException {
return template.getDestinationResolver().resolveDestinationName(session, subject, template.isPubSubDomain());
}
});
+ */
AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(template);
- return new JmsEndpoint(uri, container, destination, template ,listenerContainer);
+ listenerContainer.setDestinationName(subject);
+ listenerContainer.setPubSubDomain(template.isPubSubDomain());
+ listenerContainer.setConnectionFactory(template.getConnectionFactory());
+
+ // TODO support optional parameters
+ // selector
+ // messageConverter
+ // durableSubscriberName
+
+ return new JmsEndpoint(uri, container, template, listenerContainer);
}
public JmsTemplate getTemplate() {
@@ -82,8 +124,19 @@
this.template = template;
}
+
+ public CamelContainer getContainer() {
+ return container;
+ }
+
+ public void setContainer(CamelContainer container) {
+ this.container = container;
+ }
+
protected AbstractMessageListenerContainer createMessageListenerContainer(JmsTemplate template) {
// TODO use an enum to auto-switch container types?
+
+ //return new SimpleMessageListenerContainer();
return new DefaultMessageListenerContainer();
}
@@ -95,10 +148,11 @@
return path;
}
- public void activate(JmsEndpoint endpoint, Processor<JmsExchange> processor) {
- // TODO Auto-generated method stub
- }
- public void deactivate(JmsEndpoint endpoint) {
- // TODO Auto-generated method stub
- }
+ public void activate(JmsEndpoint endpoint, Processor<JmsExchange> processor) {
+ // TODO Auto-generated method stub
+ }
+
+ public void deactivate(JmsEndpoint endpoint) {
+ // TODO Auto-generated method stub
+ }
}
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java Mon Mar 19 18:15:57 2007
@@ -26,10 +26,8 @@
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.Session;
import javax.jms.MessageListener;
-import javax.jms.Destination;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Session;
/**
* @version $Revision$
@@ -37,33 +35,20 @@
public class JmsEndpoint extends DefaultEndpoint<JmsExchange> implements MessageListener {
private JmsOperations template;
- private Destination destination;
private AbstractMessageListenerContainer listenerContainer;
- private Processor<Exchange> processor;
- private AtomicBoolean startedConsuming = new AtomicBoolean(false);
- public JmsEndpoint(String endpointUri, CamelContainer container, Destination destination, JmsOperations template, AbstractMessageListenerContainer listenerContainer) {
+ public JmsEndpoint(String endpointUri, CamelContainer container, JmsOperations template, AbstractMessageListenerContainer listenerContainer) {
super(endpointUri, container);
- this.destination = destination;
this.template = template;
this.listenerContainer = listenerContainer;
this.listenerContainer.setMessageListener(this);
- this.listenerContainer.setDestination(destination);
}
public void onMessage(Message message) {
- Exchange exchange = createExchange(message);
- processor.onExchange(exchange);
+ JmsExchange exchange = createExchange(message);
+ getInboundProcessor().onExchange(exchange);
}
- public void setProcessor(Processor<Exchange> processor) {
- this.processor = processor;
- if (startedConsuming.compareAndSet(false, true)) {
- listenerContainer.afterPropertiesSet();
- listenerContainer.initialize();
- listenerContainer.start();
- }
- }
public void send(Exchange exchange) {
// lets convert to the type of an exchange
@@ -72,20 +57,13 @@
}
public void send(final JmsExchange exchange) {
- template.send(getDestination(), new MessageCreator() {
+ template.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return exchange.createMessage(session);
}
});
}
- /**
- * Returns the JMS destination for this endpoint
- */
- public Destination getDestination() {
- return destination;
- }
-
public JmsOperations getTemplate() {
return template;
}
@@ -102,5 +80,19 @@
protected MessageListener createMessageListener(Processor<Exchange> processor) {
return new MessageListenerProcessor(processor);
+ }
+
+
+ protected void doActivate() {
+ super.doActivate();
+ listenerContainer.afterPropertiesSet();
+ listenerContainer.initialize();
+ listenerContainer.start();
+ }
+
+ protected void doDeactivate() {
+ listenerContainer.stop();
+ listenerContainer.destroy();
+ super.doDeactivate();
}
}
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java Mon Mar 19 18:15:57 2007
@@ -17,8 +17,6 @@
*/
package org.apache.camel.jms;
-import java.util.concurrent.Callable;
-
import org.apache.axis.transport.jms.JMSEndpoint;
import org.apache.camel.CamelContainer;
import org.apache.camel.Component;
@@ -26,6 +24,8 @@
import org.apache.camel.queue.QueueComponent;
import org.apache.camel.util.ObjectHelper;
+import java.util.concurrent.Callable;
+
/**
* An implementation of {@link EndpointResolver} that creates
* {@link JMSEndpoint} objects.
@@ -69,18 +69,18 @@
String splitURI[] = ObjectHelper.splitOnCharacter(uri, ":", 3);
if( splitURI[2] != null ) {
rc[0] = splitURI[1];
- rc[0] = splitURI[2];
+ rc[1] = splitURI[2];
} else {
- rc[0] = splitURI[1];
+ rc[1] = splitURI[1];
}
return rc;
}
@SuppressWarnings("unchecked")
- private JmsComponent resolveJmsComponent(final CamelContainer container, String componentName) {
+ private JmsComponent resolveJmsComponent(final CamelContainer container, final String componentName) {
Component rc = container.getOrCreateComponent(componentName, new Callable<JmsComponent>(){
public JmsComponent call() throws Exception {
- return new JmsComponent(container);
+ return new JmsComponent(container);
}});
return (JmsComponent) rc;
}
Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java Mon Mar 19 18:15:57 2007
@@ -18,12 +18,32 @@
package org.apache.camel.jms;
import junit.framework.TestCase;
+import org.apache.camel.CamelContainer;
+import org.apache.camel.builder.RouteBuilder;
+
+import static org.apache.camel.jms.JmsComponent.*;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.ConnectionFactory;
/**
* @version $Revision$
*/
public class JmsRouteTest extends TestCase {
public void testJmsRoute() throws Exception {
- // TODO
+ CamelContainer container = new CamelContainer();
+
+ System.out.println("Created container: " + container);
+
+ // lets configure some components
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ container.addComponent("activemq", jmsComponent(connectionFactory));
+
+ // lets add some routes
+ container.routes(new RouteBuilder() {
+ public void configure() {
+ from("jms:activemq:FOO.BAR").to("jms:activemq:FOO.BAR");
+ }
+ });
}
}