You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/04/03 13:58:34 UTC

svn commit: r525142 - in /activemq/camel/trunk: ./ apache-camel/ camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/pojo/ camel-core/src/main/java/org/apache/camel/component/pojo/timer/ camel-core/src/main/ja...

Author: jstrachan
Date: Tue Apr  3 04:58:32 2007
New Revision: 525142

URL: http://svn.apache.org/viewvc?view=rev&rev=525142
Log:
Added a working camel-jpa test case. Added an ExecutorService property to the Endpoint which defaults to the Component's. Refactored the DefaultEndpoint constructor to take a Component parameter.

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java   (with props)
    activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java   (with props)
Modified:
    activemq/camel/trunk/apache-camel/pom.xml
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
    activemq/camel/trunk/camel-cxf/pom.xml
    activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
    activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java
    activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java
    activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
    activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
    activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
    activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
    activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
    activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
    activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java
    activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
    activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
    activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java
    activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
    activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java
    activemq/camel/trunk/pom.xml

Modified: activemq/camel/trunk/apache-camel/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/apache-camel/pom.xml?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/apache-camel/pom.xml (original)
+++ activemq/camel/trunk/apache-camel/pom.xml Tue Apr  3 04:58:32 2007
@@ -56,6 +56,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-jpa</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-mina</artifactId>
     </dependency>
     <dependency>
@@ -156,7 +160,7 @@
               <goal>createbundle</goal>
             </goals>
             <configuration>
-              <includes>camel-core,camel-cxf,camel-http,camel-jbi,camel-jms,camel-mina,camel-script,camel-spring,commons-logging</includes>
+              <includes>camel-core,camel-cxf,camel-http,camel-jbi,camel-jms,camel-jpa,camel-mina,camel-script,camel-spring,commons-logging</includes>
             </configuration>
           </execution>
         </executions>

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java Tue Apr  3 04:58:32 2007
@@ -16,17 +16,33 @@
  */
 package org.apache.camel;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 /**
  * Represents a component which manages a set of {@link Endpoint} objects.
  *
  * @version $Revision: 519901 $
  */
 public interface Component<E>  {
+    
+    /**
+     * Returns the context
+     *
+     * @return the context of this component
+     */
+    CamelContext getCamelContext();
 
     /**
      * The {@link CamelContext} is injected into the component when it is added to it
      */
     void setCamelContext(CamelContext context);
 
+
+    /**
+     * Returns the executor for this component, which its endpoints typically use by default
+     *
+     * @return the executor for this component
+     */
+    ScheduledExecutorService getExecutorService();
 	
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Tue Apr  3 04:58:32 2007
@@ -16,6 +16,7 @@
  */
 package org.apache.camel;
 
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Implements the <a href="http://activemq.apache.org/camel/message-endpoint.html">Message Endpoint</a>
@@ -42,23 +43,6 @@
     E createExchange(E exchange);
 
     /**
-     * Called by the container to Activate the endpoint.  Once activated,
-     * the endpoint will start delivering inbound message exchanges
-     * that are received to the specified processor.
-     *
-     * The processor must be thread safe ( or stateless ) since some endpoints 
-     * may choose to deliver exchanges concurrently to the processor.
-     * 
-     * @throws IllegalStateException if the Endpoint has already been activated.
-     */
-	void activate(Processor<E> processor) throws Exception;
-
-    /**
-     * Called by the container when the endpoint is deactivated
-     */
-    void deactivate();
-
-    /**
      * Returns the context which created the endpoint
      *
      * @return the context which created the endpoint
@@ -78,4 +62,11 @@
      * @return a newly created consumer
      */
     Consumer<E> createConsumer(Processor<E> processor) throws Exception;
+
+    /**
+     * Returns the executor for this endpoint, which typically defaults to the component's executor
+     *
+     * @return the executor for this endpoint
+     */
+    ScheduledExecutorService getExecutorService();
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java Tue Apr  3 04:58:32 2007
@@ -16,8 +16,7 @@
  */
 package org.apache.camel.component.pojo;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultComponent;
 
 import java.util.HashMap;
 
@@ -27,20 +26,19 @@
  *
  * @version $Revision: 519973 $
  */
-public class PojoComponent implements Component<PojoExchange> {
-	
+public class PojoComponent extends DefaultComponent<PojoExchange> {
     protected final HashMap<String, Object> services = new HashMap<String, Object>();
     protected final HashMap<String, PojoConsumer> consumers = new HashMap<String, PojoConsumer>();
-    
-    private CamelContext container;
 
     public void addService(String uri, Object pojo) {
         services.put(uri, pojo);
     }
+
     public void removeService(String uri) {
         services.remove(uri);
         removeConsumer(uri);
     }
+
     public Object getService(String uri) {
         return services.get(uri);
     }
@@ -48,17 +46,12 @@
     void addConsumer(String uri, PojoConsumer endpoint) {
         consumers.put(uri, endpoint);
     }
+
     void removeConsumer(String uri) {
         consumers.remove(uri);
     }
+
     public PojoConsumer getConsumer(String uri) {
         return consumers.get(uri);
-    }
-
-    public void setCamelContext(CamelContext container) {
-        this.container = container;
-    }
-    public CamelContext getContainer() {
-        return container;
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java Tue Apr  3 04:58:32 2007
@@ -36,7 +36,7 @@
 	private final String pojoId;
 
     public PojoEndpoint(String uri, String pojoId, PojoComponent component) {
-        super(uri, component.getContainer());
+        super(uri, component);
 		this.pojoId = pojoId;
         this.component = component;
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java Tue Apr  3 04:58:32 2007
@@ -16,35 +16,25 @@
  */
 package org.apache.camel.component.pojo.timer;
 
-import java.util.ArrayList;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
 import org.apache.camel.component.pojo.PojoExchange;
+import org.apache.camel.impl.DefaultComponent;
+
+import java.util.ArrayList;
 
 /**
  * Represents the component that manages {@link TimerEndpoint}.  It holds the
  * list of {@link TimerConsumer} objects that are started.
- * 
+ *
  * @version $Revision: 519973 $
  */
-public class TimerComponent implements Component<PojoExchange> {
-	
+public class TimerComponent extends DefaultComponent<PojoExchange> {
     protected final ArrayList<TimerConsumer> timers = new ArrayList<TimerConsumer>();
-    
-    private CamelContext container;
 
     boolean addConsumer(TimerConsumer consumer) {
         return timers.add(consumer);
     }
+
     boolean removeConsumer(TimerConsumer consumer) {
         return timers.remove(consumer);
-    }
-
-    public void setCamelContext(CamelContext container) {
-        this.container = container;
-    }
-    public CamelContext getContainer() {
-        return container;
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java Tue Apr  3 04:58:32 2007
@@ -47,7 +47,7 @@
 
 
     public TimerEndpoint(String fullURI, String timerPartURI, TimerComponent component) throws URISyntaxException {
-        super(fullURI, component.getContainer());
+        super(fullURI, component);
         this.component = component;
 		
         // Use a URI to extract query so they can be set as properties on the endpoint.

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java Tue Apr  3 04:58:32 2007
@@ -18,6 +18,8 @@
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Component;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultComponent;
 
 import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
@@ -30,21 +32,10 @@
  * @org.apache.xbean.XBean
  * @version $Revision: 519973 $
  */
-public class QueueComponent<E> implements Component<E> {
+public class QueueComponent<E extends Exchange> extends DefaultComponent<E> {
 	
-    private CamelContext container;
-
-    public void setCamelContext(CamelContext container) {
-        this.container = container;
-    }
-
 	public BlockingQueue<E> createQueue() {
 		return new LinkedBlockingQueue<E>();
 	}
-
-	public CamelContext getContainer() {
-		return container;
-	}
-
 
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java Tue Apr  3 04:58:32 2007
@@ -37,8 +37,8 @@
 public class QueueEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
     private BlockingQueue<E> queue;
 
-    public QueueEndpoint(String uri, CamelContext container, BlockingQueue<E> queue) {
-        super(uri, container);
+    public QueueEndpoint(String uri, QueueComponent<E> component, BlockingQueue<E> queue) {
+        super(uri, component);
         this.queue = queue;
     }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java Tue Apr  3 04:58:32 2007
@@ -63,7 +63,7 @@
 		String id[] = getEndpointId(uri);        
     	QueueComponent<E> component = resolveQueueComponent(container, id[0]);  
     	BlockingQueue<E> queue = component.createQueue();
-		return new QueueEndpoint<E>(uri, container, queue);
+		return new QueueEndpoint<E>(uri, component, queue);
     }
 
 	/**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Tue Apr  3 04:58:32 2007
@@ -21,24 +21,65 @@
 import org.apache.camel.Component;
 import org.apache.camel.Exchange;
 
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
 /**
  * @version $Revision$
  */
-public class DefaultComponent<E extends Exchange> implements Component<E> {
-    private CamelContext context;
+public class DefaultComponent<E extends Exchange> extends ServiceSupport implements Component<E> {
+    private int defaultThreadPoolSize = 5;
+    private CamelContext camelContext;
+    private ScheduledExecutorService executorService;
 
     public DefaultComponent() {
     }
 
     public DefaultComponent(CamelContext context) {
-        this.context = context;
+        this.camelContext = context;
     }
 
-    public CamelContext getContext() {
-        return context;
+    public CamelContext getCamelContext() {
+        return camelContext;
     }
 
     public void setCamelContext(CamelContext context) {
-        this.context = context;
+        this.camelContext = context;
+    }
+
+    public ScheduledExecutorService getExecutorService() {
+        if (executorService == null) {
+            executorService = createExecutorService();
+        }
+        return executorService;
+    }
+
+    public void setExecutorService(ScheduledExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    /**
+     * A factory method to create a default thread pool and executor
+     */
+    protected ScheduledExecutorService createExecutorService() {
+        return new ScheduledThreadPoolExecutor(defaultThreadPoolSize, new ThreadFactory() {
+            int counter;
+
+            public synchronized Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable);
+                thread.setName("Thread" + (++counter) + " " + DefaultComponent.this.toString());
+                return thread;
+            }
+        });
+    }
+
+    protected void doStart() throws Exception {
+    }
+
+    protected void doStop() throws Exception {
+        if (executorService != null) {
+            executorService.shutdown();
+        }
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Tue Apr  3 04:58:32 2007
@@ -20,13 +20,12 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.Consumer;
 import org.apache.camel.Service;
+import org.apache.camel.Component;
 import org.apache.camel.util.ObjectHelper;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * A default endpoint useful for implementation inheritence
@@ -35,14 +34,15 @@
  */
 public abstract class DefaultEndpoint<E extends Exchange> implements Endpoint<E> {
     private String endpointUri;
+    private final Component component;
     private CamelContext context;
-    private Processor<E> inboundProcessor;
     protected AtomicBoolean activated = new AtomicBoolean(false);
     protected AtomicBoolean deactivated = new AtomicBoolean(false);
 
-    protected DefaultEndpoint(String endpointUri, CamelContext container) {
+    protected DefaultEndpoint(String endpointUri, Component component) {
         this.endpointUri = endpointUri;
-        this.context = container;
+        this.component = component;
+        this.context = component.getCamelContext();
     }
 
     public int hashCode() {
@@ -71,6 +71,14 @@
         return context;
     }
 
+    public Component getComponent() {
+        return component;
+    }
+
+    public ScheduledExecutorService getExecutorService() {
+        return getComponent().getExecutorService();
+    }
+
     /**
      * Converts the given exchange to the specified exchange type
      */
@@ -82,25 +90,6 @@
         return getContext().getExchangeConverter().convertTo(type, exchange);
     }
 
-    public void activate(Processor<E> inboundProcessor) throws Exception {
-        if (activated.compareAndSet(false, true)) {
-            deactivated.set(false);
-            this.inboundProcessor = inboundProcessor;
-            doActivate();
-        }
-        else {
-            throw new IllegalStateException("Endpoint is already active: " + getEndpointUri());
-        }
-    }
-
-    public void deactivate() {
-        if (deactivated.compareAndSet(false, true)) {
-            activated.set(false);
-            doDeactivate();
-        }
-    }
-
-
     public E createExchange(E exchange) {
         E answer = createExchange();
         answer.copyFrom(exchange);
@@ -108,28 +97,8 @@
     }
 
     /**
-     * The processor used to process inbound message exchanges
+     * A helper method to reduce the clutter of implementors of {@link #createProducer()} and {@link #createConsumer(Processor)}
      */
-    public Processor<E> getInboundProcessor() {
-        return inboundProcessor;
-    }
-
-    public void setInboundProcessor(Processor<E> inboundProcessor) {
-        this.inboundProcessor = inboundProcessor;
-    }
-
-    /**
-     * Called at most once by the container to activate the endpoint
-     */
-    protected void doActivate() throws Exception {
-    }
-
-    /**
-     * Called at most once by the container to deactivate the endpoint
-     */
-    protected void doDeactivate() {
-    }
-
     protected <T extends Service> T startService(T service) throws Exception {
         service.start();
         return service;

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java?view=auto&rev=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java Tue Apr  3 04:58:32 2007
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A useful base class for any consumer which is polling based
+ *
+ * @version $Revision$
+ */
+public abstract class PollingConsumer<E extends Exchange> extends DefaultConsumer<E> implements Runnable {
+    private long initialDelay = 1000;
+    private long delay = 500;
+    private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
+    private boolean useFixedDelay;
+    private ScheduledFuture<?> future;
+
+    public PollingConsumer(Endpoint<E> endpoint, Processor<E> processor) {
+        super(endpoint, processor);
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public long getInitialDelay() {
+        return initialDelay;
+    }
+
+    public void setInitialDelay(long initialDelay) {
+        this.initialDelay = initialDelay;
+    }
+
+    public long getDelay() {
+        return delay;
+    }
+
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+    public void setTimeUnit(TimeUnit timeUnit) {
+        this.timeUnit = timeUnit;
+    }
+
+    public boolean isUseFixedDelay() {
+        return useFixedDelay;
+    }
+
+    public void setUseFixedDelay(boolean useFixedDelay) {
+        this.useFixedDelay = useFixedDelay;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+    @Override
+    protected void doStart() throws Exception {
+        ScheduledExecutorService executor = getEndpoint().getExecutorService();
+        if (isUseFixedDelay()) {
+            future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
+        }
+        else {
+            future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
+        }
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (future != null) {
+            future.cancel(false);
+        }
+        super.doStop();
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java Tue Apr  3 04:58:32 2007
@@ -66,7 +66,7 @@
 
         // now lets sleep for a while
         boolean received = latch.await(5, TimeUnit.SECONDS);
-        assertTrue("Did not recieve the message!", received);
+        assertTrue("Did not receive the message!", received);
 
         container.deactivateEndpoints();
     }

Modified: activemq/camel/trunk/camel-cxf/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/pom.xml?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-cxf/pom.xml (original)
+++ activemq/camel/trunk/camel-cxf/pom.xml Tue Apr  3 04:58:32 2007
@@ -130,7 +130,7 @@
           </includes>
           <excludes>
             <!-- TODO re-enable ASAP! -->
-            <exclude>**/Camel*Test.*</exclude>
+            <exclude>**/*Test.*</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Tue Apr  3 04:58:32 2007
@@ -38,7 +38,7 @@
     private boolean inOut = true;
 
     public CxfEndpoint(String uri, CxfComponent component, EndpointInfo endpointInfo) {
-        super(uri, component.getContext());
+        super(uri, component);
         this.component = component;
         this.endpointInfo = endpointInfo;
     }

Added: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java?view=auto&rev=525142
==============================================================================
--- activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java (added)
+++ activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java Tue Apr  3 04:58:32 2007
@@ -0,0 +1,26 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.http;
+
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * @version $Revision$
+ */
+public class HttpComponent extends DefaultComponent<HttpExchange> {
+}

Propchange: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java (original)
+++ activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java Tue Apr  3 04:58:32 2007
@@ -37,8 +37,8 @@
 
     private HttpBinding binding;
 
-    protected HttpEndpoint(String uri, CamelContext camelContext) {
-        super(uri, camelContext);
+    protected HttpEndpoint(String uri, HttpComponent component) {
+        super(uri, component);
     }
 
     public Producer<HttpExchange> createProducer() throws Exception {

Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java Tue Apr  3 04:58:32 2007
@@ -24,6 +24,7 @@
 import org.apache.servicemix.jbi.util.IntrospectionSupport;
 import org.apache.servicemix.jbi.util.URISupport;
 import org.apache.servicemix.jbi.resolver.URIResolver;
+import org.apache.servicemix.executors.Executor;
 
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
@@ -32,6 +33,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
  * Deploys the camel endpoints within JBI
@@ -41,6 +44,7 @@
 public class CamelJbiComponent extends DefaultComponent implements Component<JbiExchange>, EndpointResolver {
     private JbiBinding binding;
     private CamelContext camelContext;
+    private ScheduledExecutorService executorService;
 
     /**
      * @return List of endpoints
@@ -66,6 +70,7 @@
         return new Class[]{CamelJbiEndpoint.class};
     }
 
+
     /**
      * @return the binding
      */
@@ -137,6 +142,13 @@
 
     public void setCamelContext(CamelContext camelContext) {
         this.camelContext = camelContext;
+    }
+
+    public ScheduledExecutorService getExecutorService() {
+        if (executorService == null) {
+            executorService = new ScheduledThreadPoolExecutor(5);
+        }
+        return executorService;
     }
 
     /**

Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java Tue Apr  3 04:58:32 2007
@@ -36,7 +36,7 @@
     private final CamelJbiComponent jbiComponent;
 
     public JbiEndpoint(CamelJbiComponent jbiComponent, String uri) {
-        super(uri, jbiComponent.getCamelContext());
+        super(uri, jbiComponent);
         this.jbiComponent = jbiComponent;
         toJbiProcessor = new ToJbiProcessor(jbiComponent.getBinding(), jbiComponent.getComponentContext(), uri);
     }
@@ -72,21 +72,6 @@
         });
     }
 
-    /*
-    public void onExchange(Exchange exchange) {
-        if (getInboundProcessor() != null) {
-            getInboundProcessor().onExchange(exchange);
-        } else {
-            toJbiProcessor.onExchange(exchange);        }
-    }
-    */
-
-    @Override
-    protected void doActivate() throws Exception {
-        super.doActivate();
-
-        // lets create and activate the endpoint in JBI
-    }
 
     public JbiExchange createExchange() {
         return new JbiExchange(getContext(), getBinding());

Modified: activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java (original)
+++ activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java Tue Apr  3 04:58:32 2007
@@ -63,7 +63,7 @@
     protected Object assertReceivedValidExchange(Class type) throws Exception {
         // lets wait on the message being received
         boolean received = latch.await(5, TimeUnit.SECONDS);
-        assertTrue("Did not recieve the message!", received);
+        assertTrue("Did not receive the message!", received);
 
         assertNotNull(receivedExchange);
         Message receivedMessage = receivedExchange.getIn();

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Tue Apr  3 04:58:32 2007
@@ -90,7 +90,7 @@
     }
 
     public JmsEndpoint createEndpoint(String uri, String path) throws URISyntaxException {
-        ObjectHelper.notNull(getContext(), "container");
+        ObjectHelper.notNull(getCamelContext(), "camelContext");
 
         boolean pubSubDomain = false;
         if (path.startsWith(QUEUE_PREFIX)) {
@@ -105,7 +105,7 @@
         final String subject = convertPathToActualDestination(path);
 
         // lets make sure we copy the configuration as each endpoint can customize its own version
-        JmsEndpoint endpoint = new JmsEndpoint(uri, getContext(), subject, pubSubDomain, getConfiguration().copy());
+        JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy());
 
         URI u = new URI(uri);
         Map options = URISupport.parseParamters(u);

Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Tue Apr  3 04:58:32 2007
@@ -41,8 +41,8 @@
     private String selector;
     private JmsConfiguration configuration;
 
-    public JmsEndpoint(String uri, CamelContext context, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
-        super(uri, context);
+    public JmsEndpoint(String uri, JmsComponent component, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
+        super(uri, component);
         this.configuration = configuration;
         this.destination = destination;
         this.pubSubDomain = pubSubDomain;

Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java Tue Apr  3 04:58:32 2007
@@ -78,7 +78,7 @@
     protected Object assertReceivedValidExchange(Class type) throws Exception {
         // lets wait on the message being received
         boolean received = latch.await(5, TimeUnit.SECONDS);
-        assertTrue("Did not recieve the message!", received);
+        assertTrue("Did not receive the message!", received);
 
         assertNotNull(receivedExchange);
         JmsMessage receivedMessage = receivedExchange.getIn();

Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue Apr  3 04:58:32 2007
@@ -19,20 +19,26 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.impl.PollingConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import javax.persistence.EntityManager;
 import javax.persistence.Query;
+import javax.persistence.EntityTransaction;
 import java.util.List;
 
 /**
  * @version $Revision$
  */
-public class JpaConsumer extends DefaultConsumer<Exchange> {
+public class JpaConsumer extends PollingConsumer<Exchange> {
+    private static final transient Log log = LogFactory.getLog(JpaConsumer.class);
+
     private final JpaEndpoint endpoint;
     private final EntityManager entityManager;
     private QueryFactory queryFactory;
     private DeleteHandler<Object> deleteHandler;
+    private EntityTransaction transaction;
 
     public JpaConsumer(JpaEndpoint endpoint, Processor<Exchange> processor, EntityManager entityManager) {
         super(endpoint, processor);
@@ -43,15 +49,33 @@
     /**
      * Invoked whenever we should be polled
      */
-    public void run() {
-        Query query = queryFactory.createQuery(this);
-        configureParameters(query);
-        List results = query.getResultList();
-        for (Object result : results) {
-            // lets turn the result into an exchange and fire it into the processor
-            Exchange exchange = createExchange(result);
-            getProcessor().onExchange(exchange);
-            deleteHandler.deleteObject(this, result);
+    public synchronized void run() {
+        log.debug("Starting to poll for new database entities to process");
+        transaction = entityManager.getTransaction();
+        transaction.begin();
+
+        try {
+            Query query = getQueryFactory().createQuery(this);
+            configureParameters(query);
+            List results = query.getResultList();
+            for (Object result : results) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Processing new entity: " + result);
+                }
+                // lets turn the result into an exchange and fire it into the processor
+                Exchange exchange = createExchange(result);
+                getProcessor().onExchange(exchange);
+                getDeleteHandler().deleteObject(this, result);
+            }
+
+            transaction.commit();
+            transaction = null;
+        }
+        catch (RuntimeException e) {
+            log.warn("Caught: " + e, e);
+            if (transaction != null) {
+                transaction.rollback();
+            }
         }
     }
 
@@ -78,6 +102,9 @@
     }
 
     public DeleteHandler getDeleteHandler() {
+        if (deleteHandler == null) {
+            deleteHandler = createDeleteHandler();
+        }
         return deleteHandler;
     }
 
@@ -88,7 +115,10 @@
     // Implementation methods
     //-------------------------------------------------------------------------
     @Override
-    protected void doStop() throws Exception {
+    protected synchronized void doStop() throws Exception {
+        if (transaction != null) {
+            transaction.rollback();
+        }
         entityManager.close();
         super.doStop();
     }
@@ -101,6 +131,16 @@
         else {
             return QueryBuilder.query("select x from " + entityType.getName() + " x");
         }
+    }
+
+    protected DeleteHandler<Object> createDeleteHandler() {
+        // TODO auto-discover an annotation in the entity bean to indicate the process completed method call?
+
+        return new DeleteHandler<Object>() {
+            public void deleteObject(JpaConsumer consumer, Object entityBean) {
+                consumer.getEntityManager().remove(entityBean);
+            }
+        };
     }
 
     protected void configureParameters(Query query) {

Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java Tue Apr  3 04:58:32 2007
@@ -39,7 +39,7 @@
     private Class<?> entityType;
 
     public JpaEndpoint(String uri, JpaComponent component) {
-        super(uri, component.getContext());
+        super(uri, component);
         this.component = component;
     }
 
@@ -48,11 +48,11 @@
     }
 
     public Producer<Exchange> createProducer() throws Exception {
-        return new JpaProducer(this, createEntityManager(), getProducerExpression());
+        return startService(new JpaProducer(this, createEntityManager(), getProducerExpression()));
     }
 
     public Consumer<Exchange> createConsumer(Processor<Exchange> processor) throws Exception {
-        return new JpaConsumer(this, processor, createEntityManager());
+        return startService(new JpaConsumer(this, processor, createEntityManager()));
     }
 
     // Properties

Modified: activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java (original)
+++ activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java Tue Apr  3 04:58:32 2007
@@ -50,15 +50,21 @@
     protected Consumer<Exchange> consumer;
     protected Exchange receivedExchange;
     protected CountDownLatch latch = new CountDownLatch(1);
-    protected String queryText = "select o from " + SendEmail.class.getName() + " o";
+    protected String entityName = SendEmail.class.getName();
+    protected String queryText = "select o from " + entityName + " o";
     protected EntityTransaction transaction;
 
     public void testProducerInsertsIntoDatabaseThenConsumerFiresMessageExchange() throws Exception {
         // lets assert that there are no existing send mail tasks
         transaction = entityManager.getTransaction();
         transaction.begin();
+
+        // lets delete any existing records before the test
+        entityManager.createQuery("delete from " + entityName).executeUpdate();
+
         List results = entityManager.createQuery(queryText).getResultList();
         assertEquals("Should have no results: " + results, 0, results.size());
+        transaction.commit();
 
         // lets produce some objects
         client.send(endpoint, new Processor<Exchange>() {
@@ -66,7 +72,6 @@
                 exchange.getIn().setBody(new SendEmail("foo@bar.com"));
             }
         });
-        transaction.commit();
 
         // now lets assert that there is a result
         transaction.begin();
@@ -74,6 +79,8 @@
         assertEquals("Should have no results: " + results, 1, results.size());
         SendEmail mail = (SendEmail) results.get(0);
         assertEquals("address property", "foo@bar.com", mail.getAddress());
+        transaction.commit();
+        transaction = null;
 
         // now lets create a consumer to consume it
         consumer = endpoint.createConsumer(new Processor<Exchange>() {
@@ -84,8 +91,8 @@
             }
         });
 
-        boolean received = latch.await(5, TimeUnit.SECONDS);
-        assertTrue("Did not recieve the message!", received);
+        boolean received = latch.await(50, TimeUnit.SECONDS);
+        assertTrue("Did not receive the message!", received);
 
         assertNotNull(receivedExchange);
         SendEmail result = receivedExchange.getIn().getBody(SendEmail.class);

Modified: activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java (original)
+++ activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java Tue Apr  3 04:58:32 2007
@@ -32,11 +32,15 @@
     private String address;
 
     public SendEmail() {
-
     }
 
     public SendEmail(String address) {
         setAddress(address);
+    }
+
+    @Override
+    public String toString() {
+        return "SendEmail[id: " + getId() + " address: " + getAddress() + "]";
     }
 
     @Id

Modified: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Tue Apr  3 04:58:32 2007
@@ -73,7 +73,7 @@
         IoAcceptor acceptor = new VmPipeAcceptor();
         SocketAddress address = new VmPipeAddress(connectUri.getPort());
         IoConnector connector = new VmPipeConnector();
-        return new MinaEndpoint(uri, getContext(), address, acceptor, connector, null);
+        return new MinaEndpoint(uri, this, address, acceptor, connector, null);
     }
 
     protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri) {
@@ -85,7 +85,7 @@
         SocketConnectorConfig config = new SocketConnectorConfig();
         config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
 
-        return new MinaEndpoint(uri, getContext(), address, acceptor, connector, config);
+        return new MinaEndpoint(uri, this, address, acceptor, connector, config);
     }
 
     protected MinaEndpoint createDatagramEndpoint(String uri, URI connectUri) {
@@ -97,6 +97,6 @@
         DatagramConnectorConfig config = new DatagramConnectorConfig();
         config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
 
-        return new MinaEndpoint(uri, getContext(), address, acceptor, connector, config);
+        return new MinaEndpoint(uri, this, address, acceptor, connector, config);
     }
 }

Modified: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Tue Apr  3 04:58:32 2007
@@ -36,15 +36,13 @@
  * @version $Revision$
  */
 public class MinaEndpoint extends DefaultEndpoint<MinaExchange> {
-    private static final transient Log log = LogFactory.getLog(MinaEndpoint.class);
-
     private final IoAcceptor acceptor;
     private final SocketAddress address;
     private final IoConnector connector;
     private final IoServiceConfig config;
 
-    public MinaEndpoint(String endpointUri, CamelContext container, SocketAddress address, IoAcceptor acceptor, IoConnector connector, IoServiceConfig config) {
-        super(endpointUri, container);
+    public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoConnector connector, IoServiceConfig config) {
+        super(endpointUri, component);
         this.config = config;
         this.address = address;
         this.acceptor = acceptor;
@@ -86,20 +84,6 @@
 
     public IoServiceConfig getConfig() {
         return config;
-    }
-
-    
-    // Implementation methods
-    //-------------------------------------------------------------------------
-
-    @Override
-    protected void doActivate() throws Exception {
-        super.doActivate();
-    }
-
-    @Override
-    protected void doDeactivate() {
-        acceptor.unbindAll();
     }
 
 }

Modified: activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java (original)
+++ activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java Tue Apr  3 04:58:32 2007
@@ -54,9 +54,9 @@
     }
 
     public XmppEndpoint createEndpoint(String uri, String path) throws URISyntaxException {
-        ObjectHelper.notNull(getContext(), "context");
+        ObjectHelper.notNull(getCamelContext(), "context");
 
-        XmppEndpoint endpoint = new XmppEndpoint(uri, getContext());
+        XmppEndpoint endpoint = new XmppEndpoint(uri, this);
 
         URI u = new URI(uri);
         endpoint.setHost(u.getHost());

Modified: activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java (original)
+++ activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java Tue Apr  3 04:58:32 2007
@@ -51,8 +51,8 @@
     private String room;
     private String participant;
 
-    public XmppEndpoint(String uri, CamelContext context) {
-        super(uri, context);
+    public XmppEndpoint(String uri, XmppComponent component) {
+        super(uri, component);
     }
 
     public Producer<XmppExchange> createProducer() throws Exception {

Modified: activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java (original)
+++ activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java Tue Apr  3 04:58:32 2007
@@ -93,7 +93,7 @@
     protected Object assertReceivedValidExchange() throws Exception {
         // lets wait on the message being received
         boolean received = latch.await(5, TimeUnit.SECONDS);
-        assertTrue("Did not recieve the message!", received);
+        assertTrue("Did not receive the message!", received);
 
         assertNotNull(receivedExchange);
         XmppMessage receivedMessage = receivedExchange.getIn();

Modified: activemq/camel/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/pom.xml?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/pom.xml (original)
+++ activemq/camel/trunk/pom.xml Tue Apr  3 04:58:32 2007
@@ -118,6 +118,7 @@
     <module>camel-itest</module>
     <module>camel-jbi</module>
     <module>camel-jms</module>
+    <module>camel-jpa</module>
     <module>camel-mina</module>
     <module>camel-script</module>
     <module>camel-spring</module>
@@ -161,6 +162,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-jpa</artifactId>
+        <version>${camel-version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-mina</artifactId>
         <version>${camel-version}</version>
       </dependency>
@@ -378,6 +384,23 @@
         <groupId>org.easymock</groupId>
         <artifactId>easymockclassextension</artifactId>
         <version>2.2.1</version>
+        <scope>test</scope>
+      </dependency>
+
+
+      <!-- default JPA support -->
+      <dependency>
+        <groupId>org.apache.openjpa</groupId>
+        <artifactId>openjpa-persistence-jdbc</artifactId>
+        <version>0.9.6-incubating</version>
+        <scope>test</scope>
+      </dependency>
+
+      <!-- common testing dependencies -->
+      <dependency>
+        <groupId>org.apache.derby</groupId>
+        <artifactId>derby</artifactId>
+        <version>10.1.3.1</version>
         <scope>test</scope>
       </dependency>
     </dependencies>