You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/03/20 02:15:58 UTC

svn commit: r520200 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/queue/ camel-jms/src...

Author: jstrachan
Date: Mon Mar 19 18:15:57 2007
New Revision: 520200

URL: http://svn.apache.org/viewvc?view=rev&rev=520200
Log:
added some helper methods to make it easy to configure the components using Java code

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java Mon Mar 19 18:15:57 2007
@@ -17,25 +17,66 @@
  */
 package org.apache.camel;
 
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultEndpointResolver;
+import org.apache.camel.impl.DefaultExchangeConverter;
+
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 
-import org.apache.camel.impl.DefaultEndpointResolver;
-import org.apache.camel.impl.DefaultExchangeConverter;
-
 /**
  * Represents the container used to configure routes and the policies to use.
  *
- * @org.apache.xbean.XBean element="container" rootElement="true"
  * @version $Revision$
+ * @org.apache.xbean.XBean element="container" rootElement="true"
  */
-public class CamelContainer<E> {
-	
+public class CamelContainer<E extends Exchange> {
+
     private EndpointResolver<E> endpointResolver;
     private ExchangeConverter exchangeConverter;
     private Map<String, Component> components = new HashMap<String, Component>();
 
+    // Builder APIs
+    //-----------------------------------------------------------------------
+    public void routes(RouteBuilder<E> builder) {
+        // lets now add the routes from the builder
+        builder.setContainer(this);
+        Map<Endpoint<E>, Processor<E>> routeMap = builder.getRouteMap();
+        Set<Map.Entry<Endpoint<E>, Processor<E>>> entries = routeMap.entrySet();
+        for (Map.Entry<Endpoint<E>, Processor<E>> entry : entries) {
+            Endpoint<E> endpoint = entry.getKey();
+            Processor<E> processor = entry.getValue();
+            endpoint.setInboundProcessor(processor);
+        }
+    }
+
+    public void routes(final RouteFactory factory) {
+        RouteBuilder<E> builder = new RouteBuilder<E>(this) {
+            public void configure() {
+                factory.build(this);
+            }
+        };
+    }
+
+
+    /**
+     * Adds a component to the container if there is not currently a component already registered.
+     */
+    public void addComponent(String componentName, final Component<E, ? extends Endpoint<E>> component) {
+        // TODO provide a version of this which barfs if the component is registered multiple times
+
+        getOrCreateComponent(componentName, new Callable<Component<E, ? extends Endpoint<E>>>() {
+            public Component<E, ? extends Endpoint<E>> call() throws Exception {
+                return component;
+            }
+        });
+    }
+
+
+    // Properties
+    //-----------------------------------------------------------------------
     public EndpointResolver<E> getEndpointResolver() {
         if (endpointResolver == null) {
             endpointResolver = createEndpointResolver();
@@ -71,26 +112,30 @@
         return new DefaultExchangeConverter();
     }
 
-	public Component getOrCreateComponent(String componentName, Callable<Component<E,? extends  Endpoint<E>>> factory) {
-		synchronized(components) { 
-			Component component = components.get(componentName);
-			if( component == null ) {
-				try {
-					component = factory.call();
-					if( component == null )
-						throw new IllegalArgumentException("Factory failed to create the "+componentName+" component, it returned null.");
-				} catch (Exception e) {
-					throw new IllegalArgumentException("Factory failed to create the "+componentName+" component", e);
-				}
-			}
-			return component;
-		}
-	}
-	
-	public Component getComponent(String componentName) {
-		synchronized(components) { 
-			Component component = components.get(componentName);
-			return component;
-		}
-	}
+    public Component getOrCreateComponent(String componentName, Callable<Component<E, ? extends Endpoint<E>>> factory) {
+        synchronized (components) {
+            Component component = components.get(componentName);
+            if (component == null) {
+                try {
+                    component = factory.call();
+                    if (component == null) {
+                        throw new IllegalArgumentException("Factory failed to create the " + componentName + " component, it returned null.");
+                    }
+                    components.put(componentName, component);
+                    component.setContainer(this);
+                }
+                catch (Exception e) {
+                    throw new IllegalArgumentException("Factory failed to create the " + componentName + " component", e);
+                }
+            }
+            return component;
+        }
+    }
+
+    public Component getComponent(String componentName) {
+        synchronized (components) {
+            Component component = components.get(componentName);
+            return component;
+        }
+    }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java Mon Mar 19 18:15:57 2007
@@ -23,7 +23,12 @@
  */
 public interface Component<E, EP extends Endpoint<E>>  {
 
-	/**
+    /**
+     * The CamelContainer is injected into the component when it is added to it
+     */
+    void setContainer(CamelContainer container);
+
+    /**
 	 * Asks the component to activate the delivery of {@link Exchange} objects
 	 * from the {@link Endpoint} to the {@link Processor}.
 	 */

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Mon Mar 19 18:15:57 2007
@@ -34,7 +34,23 @@
     void send(E exchange);
 
     /**
+     * Sets the processor for inbound messages
+     */
+    void setInboundProcessor(Processor<E> processor);
+    
+    /**
      * Create a new exchange for communicating with this endpoint
      */
     E createExchange();
+
+
+    /**
+     * Called by the container when an endpoint is activiated
+     */
+    void activate();
+
+    /**
+     * Called by the container when the endpoint is deactivated
+     */
+    void deactivate();
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java?view=auto&rev=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/RouteFactory.java Mon Mar 19 18:15:57 2007
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * A simple callback that POJOs can implement to be called back by the
+ * {@link CamelContainer} with a properly configured {@link RouteBuilder}
+ * to build routes
+ *
+ * @version $Revision: $
+ */
+public interface RouteFactory {
+    void build(RouteBuilder builder);
+}

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java Mon Mar 19 18:15:57 2007
@@ -16,12 +16,6 @@
  */
 package org.apache.camel.builder;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.camel.CamelContainer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointResolver;
@@ -30,6 +24,12 @@
 import org.apache.camel.Processor;
 import org.apache.camel.util.ObjectHelper;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * A builder of destinationBuilders using a typesafe Java DLS.
  *
@@ -41,6 +41,13 @@
     private AtomicBoolean initalized = new AtomicBoolean(false);
     private Map<Endpoint<E>, Processor<E>> routeMap = new HashMap<Endpoint<E>, Processor<E>>();
 
+    protected RouteBuilder() {
+    }
+
+    protected RouteBuilder(CamelContainer<E> container) {
+        this.container = container;
+    }
+
     /**
      * Called on initialisation to to build the required destinationBuilders
      */
@@ -134,5 +141,4 @@
     protected CamelContainer<E> createContainer() {
         return new CamelContainer<E>();
     }
-
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Mon Mar 19 18:15:57 2007
@@ -16,18 +16,23 @@
  */
 package org.apache.camel.impl;
 
+import org.apache.camel.CamelContainer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeConverter;
-import org.apache.camel.CamelContainer;
+import org.apache.camel.Processor;
 import org.apache.camel.util.ObjectHelper;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * @version $Revision$
  */
 public abstract class DefaultEndpoint<E> implements Endpoint<E> {
     private String endpointUri;
     private CamelContainer container;
+    private Processor<E> inboundProcessor;
+    private AtomicBoolean activated = new AtomicBoolean(false);
+    private AtomicBoolean deactivated = new AtomicBoolean(false);
 
     protected DefaultEndpoint(String endpointUri, CamelContainer container) {
         this.endpointUri = endpointUri;
@@ -69,6 +74,42 @@
             return type.cast(exchange);
         }
         return getContainer().getExchangeConverter().convertTo(type, exchange);
+    }
+
+
+    public void activate() {
+        if (activated.compareAndSet(false, true)) {
+            doActivate();
+        }
+    }
+    public void deactivate() {
+        if (deactivated.compareAndSet(false, true)) {
+            doDeactivate();
+        }
+    }
+
+    /**
+     * The processor used to process inbound message exchanges
+     */
+    public Processor<E> getInboundProcessor() {
+        return inboundProcessor;
+    }
+
+    public void setInboundProcessor(Processor<E> inboundProcessor) {
+        this.inboundProcessor = inboundProcessor;
+        activate();
+    }
+
+    /**
+     * Called at most once by the container to activate the endpoint
+     */
+    protected void doActivate() {
+    }
+
+    /**
+     * Called at most once by the container to deactivate the endpoint
+     */
+    protected void doDeactivate() {
     }
 
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java Mon Mar 19 18:15:57 2007
@@ -16,14 +16,15 @@
  */
 package org.apache.camel.queue;
 
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Component;
+import org.apache.camel.Processor;
+
 import java.util.HashMap;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.camel.Component;
-import org.apache.camel.Processor;
-
 /**
  * Represents the component that manages {@link QueueEndpoint}.  It holds the 
  * list of named queues that queue endpoints reference.
@@ -35,7 +36,12 @@
 	
     private HashMap<String, Queue<E>> registry = new HashMap<String, Queue<E>>();
     private HashMap<QueueEndpoint<E>, Activation> activations = new HashMap<QueueEndpoint<E>, Activation>();
-    
+    private CamelContainer container;
+
+    public void setContainer(CamelContainer container) {
+        this.container = container;
+    }
+
     class Activation implements Runnable {
 		private final QueueEndpoint<E> endpoint;
 		AtomicBoolean stop = new AtomicBoolean();

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java Mon Mar 19 18:15:57 2007
@@ -16,12 +16,13 @@
  */
 package org.apache.camel.queue;
 
-import java.util.Queue;
-
 import org.apache.camel.CamelContainer;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 
+import java.util.Queue;
+
 /**
  * Represents a queue endpoint that uses a {@link Queue}
  * object to process inbound exchanges.
@@ -39,6 +40,11 @@
 
     public void send(E exchange) {
         queue.add(exchange);
+    }
+
+    public void setInboundProcessor(Processor<E> processor) {
+        // TODO lets start a thread to process inbound requests
+        // if we don't already have one
     }
 
     public E createExchange() {

Added: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java?view=auto&rev=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java (added)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/Builder.java Mon Mar 19 18:15:57 2007
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.jms;
+
+/**
+ * Some helper builder methods for the easy configuration of the {@link org.apache.camel.CamelContainer} via Java code.
+ *
+ * @version $Revision: $
+ */
+public class Builder {
+    public static JmsComponent jmsComponent() {
+        return new JmsComponent();
+    }
+}

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java Mon Mar 19 18:15:57 2007
@@ -17,30 +17,59 @@
  */
 package org.apache.camel.jms;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
+import com.sun.jndi.toolkit.url.Uri;
 import org.apache.camel.CamelContainer;
 import org.apache.camel.Component;
 import org.apache.camel.Processor;
+import org.apache.camel.util.ObjectHelper;
 import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 
-import com.sun.jndi.toolkit.url.Uri;
+import javax.jms.ConnectionFactory;
 
 /**
  * @version $Revision$
  */
 public class JmsComponent implements Component<JmsExchange, JmsEndpoint> {
-    private JmsTemplate template = new JmsTemplate();
-    private static final String QUEUE_PREFIX = "queue/";
-    private static final String TOPIC_PREFIX = "topic/";
+    public static final String QUEUE_PREFIX = "queue/";
+    public static final String TOPIC_PREFIX = "topic/";
+
     private CamelContainer container;
+    private JmsTemplate template;
+
+    /**
+     * Static builder method
+     */
+    public static JmsComponent jmsComponent() {
+        return new JmsComponent();
+    }
+
+    /**
+     * Static builder method
+     */
+    public static JmsComponent jmsComponent(JmsTemplate template) {
+        return new JmsComponent(template);
+    }
+
+    /**
+     * Static builder method
+     */
+    public static JmsComponent jmsComponent(ConnectionFactory connectionFactory) {
+        return jmsComponent(new JmsTemplate(connectionFactory));
+    }
+
+
+    protected JmsComponent() {
+        this.template = new JmsTemplate();
+    }
+
+    protected JmsComponent(JmsTemplate template) {
+        this.template = template;
+    }
 
     public JmsComponent(CamelContainer container) {
+        this();
         this.container = container;
     }
 
@@ -52,6 +81,8 @@
     }
 
     public JmsEndpoint createEndpoint(String uri, String path) {
+        ObjectHelper.notNull(container, "container");
+
         if (path.startsWith(QUEUE_PREFIX)) {
             template.setPubSubDomain(false);
             path = path.substring(QUEUE_PREFIX.length());
@@ -64,14 +95,25 @@
         final String subject = convertPathToActualDestination(path);
         template.setDefaultDestinationName(subject);
 
+        /*
         Destination destination = (Destination) template.execute(new SessionCallback() {
             public Object doInJms(Session session) throws JMSException {
                 return template.getDestinationResolver().resolveDestinationName(session, subject, template.isPubSubDomain());
             }
         });
+        */
 
         AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(template);
-        return new JmsEndpoint(uri, container, destination, template ,listenerContainer);
+        listenerContainer.setDestinationName(subject);
+        listenerContainer.setPubSubDomain(template.isPubSubDomain());
+        listenerContainer.setConnectionFactory(template.getConnectionFactory());
+
+        // TODO support optional parameters
+        // selector
+        // messageConverter
+        // durableSubscriberName 
+
+        return new JmsEndpoint(uri, container, template, listenerContainer);
     }
 
     public JmsTemplate getTemplate() {
@@ -82,8 +124,19 @@
         this.template = template;
     }
 
+
+    public CamelContainer getContainer() {
+        return container;
+    }
+
+    public void setContainer(CamelContainer container) {
+        this.container = container;
+    }
+
     protected AbstractMessageListenerContainer createMessageListenerContainer(JmsTemplate template) {
         // TODO use an enum to auto-switch container types?
+
+        //return new SimpleMessageListenerContainer();
         return new DefaultMessageListenerContainer();
     }
 
@@ -95,10 +148,11 @@
         return path;
     }
 
-	public void activate(JmsEndpoint endpoint, Processor<JmsExchange> processor) {
-		// TODO Auto-generated method stub
-	}
-	public void deactivate(JmsEndpoint endpoint) {
-		// TODO Auto-generated method stub
-	}
+    public void activate(JmsEndpoint endpoint, Processor<JmsExchange> processor) {
+        // TODO Auto-generated method stub
+    }
+
+    public void deactivate(JmsEndpoint endpoint) {
+        // TODO Auto-generated method stub
+    }
 }

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpoint.java Mon Mar 19 18:15:57 2007
@@ -26,10 +26,8 @@
 
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.Session;
 import javax.jms.MessageListener;
-import javax.jms.Destination;
-import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Session;
 
 /**
  * @version $Revision$
@@ -37,33 +35,20 @@
 public class JmsEndpoint extends DefaultEndpoint<JmsExchange> implements MessageListener {
 
     private JmsOperations template;
-    private Destination destination;
     private AbstractMessageListenerContainer listenerContainer;
-    private Processor<Exchange> processor;
-    private AtomicBoolean startedConsuming = new AtomicBoolean(false);
 
-    public JmsEndpoint(String endpointUri, CamelContainer container, Destination destination, JmsOperations template, AbstractMessageListenerContainer listenerContainer) {
+    public JmsEndpoint(String endpointUri, CamelContainer container, JmsOperations template, AbstractMessageListenerContainer listenerContainer) {
         super(endpointUri, container);
-        this.destination = destination;
         this.template = template;
         this.listenerContainer = listenerContainer;
         this.listenerContainer.setMessageListener(this);
-        this.listenerContainer.setDestination(destination);
     }
 
     public void onMessage(Message message) {
-        Exchange exchange = createExchange(message);
-        processor.onExchange(exchange);
+        JmsExchange exchange = createExchange(message);
+        getInboundProcessor().onExchange(exchange);
     }
 
-    public void setProcessor(Processor<Exchange> processor) {
-        this.processor = processor;
-        if (startedConsuming.compareAndSet(false, true)) {
-            listenerContainer.afterPropertiesSet();
-            listenerContainer.initialize();
-            listenerContainer.start();
-        }
-    }
 
     public void send(Exchange exchange) {
         // lets convert to the type of an exchange
@@ -72,20 +57,13 @@
     }
 
     public void send(final JmsExchange exchange) {
-        template.send(getDestination(), new MessageCreator() {
+        template.send(new MessageCreator() {
             public Message createMessage(Session session) throws JMSException {
                 return exchange.createMessage(session);
             }
         });
     }
 
-    /**
-     * Returns the JMS destination for this endpoint
-     */
-    public Destination getDestination() {
-        return destination;
-    }
-
     public JmsOperations getTemplate() {
         return template;
     }
@@ -102,5 +80,19 @@
 
     protected MessageListener createMessageListener(Processor<Exchange> processor) {
         return new MessageListenerProcessor(processor);
+    }
+
+
+    protected void doActivate() {
+        super.doActivate();
+        listenerContainer.afterPropertiesSet();
+        listenerContainer.initialize();
+        listenerContainer.start();
+    }
+
+    protected void doDeactivate() {
+        listenerContainer.stop();
+        listenerContainer.destroy();
+        super.doDeactivate();
     }
 }

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsEndpointResolver.java Mon Mar 19 18:15:57 2007
@@ -17,8 +17,6 @@
  */
 package org.apache.camel.jms;
 
-import java.util.concurrent.Callable;
-
 import org.apache.axis.transport.jms.JMSEndpoint;
 import org.apache.camel.CamelContainer;
 import org.apache.camel.Component;
@@ -26,6 +24,8 @@
 import org.apache.camel.queue.QueueComponent;
 import org.apache.camel.util.ObjectHelper;
 
+import java.util.concurrent.Callable;
+
 /**
  * An implementation of {@link EndpointResolver} that creates
  * {@link JMSEndpoint} objects.
@@ -69,18 +69,18 @@
 		String splitURI[] = ObjectHelper.splitOnCharacter(uri, ":", 3);        
     	if( splitURI[2] != null ) {
     		rc[0] =  splitURI[1];
-    		rc[0] =  splitURI[2];
+    		rc[1] =  splitURI[2];
     	} else {
-    		rc[0] =  splitURI[1];
+    		rc[1] =  splitURI[1];
     	}
 		return rc;
 	}
 	
 	@SuppressWarnings("unchecked")
-	private JmsComponent resolveJmsComponent(final CamelContainer container, String componentName) {
+	private JmsComponent resolveJmsComponent(final CamelContainer container, final String componentName) {
     	Component rc = container.getOrCreateComponent(componentName, new Callable<JmsComponent>(){
 			public JmsComponent call() throws Exception {
-				return new JmsComponent(container);
+                return new JmsComponent(container);
 			}});
     	return (JmsComponent) rc;
 	}

Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java?view=diff&rev=520200&r1=520199&r2=520200
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java Mon Mar 19 18:15:57 2007
@@ -18,12 +18,32 @@
 package org.apache.camel.jms;
 
 import junit.framework.TestCase;
+import org.apache.camel.CamelContainer;
+import org.apache.camel.builder.RouteBuilder;
+
+import static org.apache.camel.jms.JmsComponent.*;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.ConnectionFactory;
 
 /**
  * @version $Revision$
  */
 public class JmsRouteTest extends TestCase {
     public void testJmsRoute() throws Exception {
-        // TODO 
+        CamelContainer container = new CamelContainer();
+
+        System.out.println("Created container: " + container);
+        
+        // lets configure some componnets
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        container.addComponent("activemq", jmsComponent(connectionFactory));
+
+        // lets add some routes
+        container.routes(new RouteBuilder() {
+            public void configure() {
+                from("jms:activemq:FOO.BAR").to("jms:activemq:FOO.BAR");
+            }
+        });
     }
 }