You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2007/04/03 13:58:34 UTC
svn commit: r525142 - in /activemq/camel/trunk: ./ apache-camel/
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/pojo/
camel-core/src/main/java/org/apache/camel/component/pojo/timer/
camel-core/src/main/ja...
Author: jstrachan
Date: Tue Apr 3 04:58:32 2007
New Revision: 525142
URL: http://svn.apache.org/viewvc?view=rev&rev=525142
Log:
Added a working camel-jpa test case. Added an ExecutionService property to the Endpoint which defaults to the Component. Refactored the DefaultEndpoint constructor to take a Component parameter.
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java (with props)
activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java (with props)
Modified:
activemq/camel/trunk/apache-camel/pom.xml
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
activemq/camel/trunk/camel-cxf/pom.xml
activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java
activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java
activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java
activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java
activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java
activemq/camel/trunk/pom.xml
Modified: activemq/camel/trunk/apache-camel/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/apache-camel/pom.xml?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/apache-camel/pom.xml (original)
+++ activemq/camel/trunk/apache-camel/pom.xml Tue Apr 3 04:58:32 2007
@@ -56,6 +56,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-jpa</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-mina</artifactId>
</dependency>
<dependency>
@@ -156,7 +160,7 @@
<goal>createbundle</goal>
</goals>
<configuration>
- <includes>camel-core,camel-cxf,camel-http,camel-jbi,camel-jms,camel-mina,camel-script,camel-spring,commons-logging</includes>
+ <includes>camel-core,camel-cxf,camel-http,camel-jbi,camel-jms,camel-jpa,camel-mina,camel-script,camel-spring,commons-logging</includes>
</configuration>
</execution>
</executions>
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java Tue Apr 3 04:58:32 2007
@@ -16,17 +16,33 @@
*/
package org.apache.camel;
+import java.util.concurrent.ScheduledExecutorService;
+
/**
* Represents a component which manages a set of {@link Endpoint} objects.
*
* @version $Revision: 519901 $
*/
public interface Component<E> {
+
+ /**
+ * Returns the context
+ *
+ * @return the context of this component
+ */
+ CamelContext getCamelContext();
/**
* The {@link CamelContext} is injected into the component when it is added to it
*/
void setCamelContext(CamelContext context);
+
+ /**
+ * Returns the executor for this endpoint which typically defaults to the components executor
+ *
+ * @return the executor for this endpoint
+ */
+ ScheduledExecutorService getExecutorService();
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Tue Apr 3 04:58:32 2007
@@ -16,6 +16,7 @@
*/
package org.apache.camel;
+import java.util.concurrent.ScheduledExecutorService;
/**
* Implements the <a href="http://activemq.apache.org/camel/message-endpoint.html">Message Endpoint</a>
@@ -42,23 +43,6 @@
E createExchange(E exchange);
/**
- * Called by the container to Activate the endpoint. Once activated,
- * the endpoint will start delivering inbound message exchanges
- * that are received to the specified processor.
- *
- * The processor must be thread safe ( or stateless ) since some endpoints
- * may choose to deliver exchanges concurrently to the processor.
- *
- * @throws IllegalStateException if the Endpoint has already been activated.
- */
- void activate(Processor<E> processor) throws Exception;
-
- /**
- * Called by the container when the endpoint is deactivated
- */
- void deactivate();
-
- /**
* Returns the context which created the endpoint
*
* @return the context which created the endpoint
@@ -78,4 +62,11 @@
* @return a newly created consumer
*/
Consumer<E> createConsumer(Processor<E> processor) throws Exception;
+
+ /**
+ * Returns the executor for this endpoint which typically defaults to the components executor
+ *
+ * @return the executor for this endpoint
+ */
+ ScheduledExecutorService getExecutorService();
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoComponent.java Tue Apr 3 04:58:32 2007
@@ -16,8 +16,7 @@
*/
package org.apache.camel.component.pojo;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultComponent;
import java.util.HashMap;
@@ -27,20 +26,19 @@
*
* @version $Revision: 519973 $
*/
-public class PojoComponent implements Component<PojoExchange> {
-
+public class PojoComponent extends DefaultComponent<PojoExchange> {
protected final HashMap<String, Object> services = new HashMap<String, Object>();
protected final HashMap<String, PojoConsumer> consumers = new HashMap<String, PojoConsumer>();
-
- private CamelContext container;
public void addService(String uri, Object pojo) {
services.put(uri, pojo);
}
+
public void removeService(String uri) {
services.remove(uri);
removeConsumer(uri);
}
+
public Object getService(String uri) {
return services.get(uri);
}
@@ -48,17 +46,12 @@
void addConsumer(String uri, PojoConsumer endpoint) {
consumers.put(uri, endpoint);
}
+
void removeConsumer(String uri) {
consumers.remove(uri);
}
+
public PojoConsumer getConsumer(String uri) {
return consumers.get(uri);
- }
-
- public void setCamelContext(CamelContext container) {
- this.container = container;
- }
- public CamelContext getContainer() {
- return container;
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/PojoEndpoint.java Tue Apr 3 04:58:32 2007
@@ -36,7 +36,7 @@
private final String pojoId;
public PojoEndpoint(String uri, String pojoId, PojoComponent component) {
- super(uri, component.getContainer());
+ super(uri, component);
this.pojoId = pojoId;
this.component = component;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerComponent.java Tue Apr 3 04:58:32 2007
@@ -16,35 +16,25 @@
*/
package org.apache.camel.component.pojo.timer;
-import java.util.ArrayList;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
import org.apache.camel.component.pojo.PojoExchange;
+import org.apache.camel.impl.DefaultComponent;
+
+import java.util.ArrayList;
/**
* Represents the component that manages {@link TimerEndpoint}. It holds the
* list of {@link TimerConsumer} objects that are started.
- *
+ *
* @version $Revision: 519973 $
*/
-public class TimerComponent implements Component<PojoExchange> {
-
+public class TimerComponent extends DefaultComponent<PojoExchange> {
protected final ArrayList<TimerConsumer> timers = new ArrayList<TimerConsumer>();
-
- private CamelContext container;
boolean addConsumer(TimerConsumer consumer) {
return timers.add(consumer);
}
+
boolean removeConsumer(TimerConsumer consumer) {
return timers.remove(consumer);
- }
-
- public void setCamelContext(CamelContext container) {
- this.container = container;
- }
- public CamelContext getContainer() {
- return container;
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/pojo/timer/TimerEndpoint.java Tue Apr 3 04:58:32 2007
@@ -47,7 +47,7 @@
public TimerEndpoint(String fullURI, String timerPartURI, TimerComponent component) throws URISyntaxException {
- super(fullURI, component.getContainer());
+ super(fullURI, component);
this.component = component;
// Use a URI to extract query so they can be set as properties on the endpoint.
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java Tue Apr 3 04:58:32 2007
@@ -18,6 +18,8 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultComponent;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
@@ -30,21 +32,10 @@
* @org.apache.xbean.XBean
* @version $Revision: 519973 $
*/
-public class QueueComponent<E> implements Component<E> {
+public class QueueComponent<E extends Exchange> extends DefaultComponent<E> {
- private CamelContext container;
-
- public void setCamelContext(CamelContext container) {
- this.container = container;
- }
-
public BlockingQueue<E> createQueue() {
return new LinkedBlockingQueue<E>();
}
-
- public CamelContext getContainer() {
- return container;
- }
-
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java Tue Apr 3 04:58:32 2007
@@ -37,8 +37,8 @@
public class QueueEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
private BlockingQueue<E> queue;
- public QueueEndpoint(String uri, CamelContext container, BlockingQueue<E> queue) {
- super(uri, container);
+ public QueueEndpoint(String uri, QueueComponent<E> component, BlockingQueue<E> queue) {
+ super(uri, component);
this.queue = queue;
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpointResolver.java Tue Apr 3 04:58:32 2007
@@ -63,7 +63,7 @@
String id[] = getEndpointId(uri);
QueueComponent<E> component = resolveQueueComponent(container, id[0]);
BlockingQueue<E> queue = component.createQueue();
- return new QueueEndpoint<E>(uri, container, queue);
+ return new QueueEndpoint<E>(uri, component, queue);
}
/**
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Tue Apr 3 04:58:32 2007
@@ -21,24 +21,65 @@
import org.apache.camel.Component;
import org.apache.camel.Exchange;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
/**
* @version $Revision$
*/
-public class DefaultComponent<E extends Exchange> implements Component<E> {
- private CamelContext context;
+public class DefaultComponent<E extends Exchange> extends ServiceSupport implements Component<E> {
+ private int defaultThreadPoolSize = 5;
+ private CamelContext camelContext;
+ private ScheduledExecutorService executorService;
public DefaultComponent() {
}
public DefaultComponent(CamelContext context) {
- this.context = context;
+ this.camelContext = context;
}
- public CamelContext getContext() {
- return context;
+ public CamelContext getCamelContext() {
+ return camelContext;
}
public void setCamelContext(CamelContext context) {
- this.context = context;
+ this.camelContext = context;
+ }
+
+ public ScheduledExecutorService getExecutorService() {
+ if (executorService == null) {
+ executorService = createExecutorService();
+ }
+ return executorService;
+ }
+
+ public void setExecutorService(ScheduledExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ /**
+ * A factory method to create a default thread pool and executor
+ */
+ protected ScheduledExecutorService createExecutorService() {
+ return new ScheduledThreadPoolExecutor(defaultThreadPoolSize, new ThreadFactory() {
+ int counter;
+
+ public synchronized Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable);
+ thread.setName("Thread" + (++counter) + " " + DefaultComponent.this.toString());
+ return thread;
+ }
+ });
+ }
+
+ protected void doStart() throws Exception {
+ }
+
+ protected void doStop() throws Exception {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Tue Apr 3 04:58:32 2007
@@ -20,13 +20,12 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.Consumer;
import org.apache.camel.Service;
+import org.apache.camel.Component;
import org.apache.camel.util.ObjectHelper;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
/**
* A default endpoint useful for implementation inheritence
@@ -35,14 +34,15 @@
*/
public abstract class DefaultEndpoint<E extends Exchange> implements Endpoint<E> {
private String endpointUri;
+ private final Component component;
private CamelContext context;
- private Processor<E> inboundProcessor;
protected AtomicBoolean activated = new AtomicBoolean(false);
protected AtomicBoolean deactivated = new AtomicBoolean(false);
- protected DefaultEndpoint(String endpointUri, CamelContext container) {
+ protected DefaultEndpoint(String endpointUri, Component component) {
this.endpointUri = endpointUri;
- this.context = container;
+ this.component = component;
+ this.context = component.getCamelContext();
}
public int hashCode() {
@@ -71,6 +71,14 @@
return context;
}
+ public Component getComponent() {
+ return component;
+ }
+
+ public ScheduledExecutorService getExecutorService() {
+ return getComponent().getExecutorService();
+ }
+
/**
* Converts the given exchange to the specified exchange type
*/
@@ -82,25 +90,6 @@
return getContext().getExchangeConverter().convertTo(type, exchange);
}
- public void activate(Processor<E> inboundProcessor) throws Exception {
- if (activated.compareAndSet(false, true)) {
- deactivated.set(false);
- this.inboundProcessor = inboundProcessor;
- doActivate();
- }
- else {
- throw new IllegalStateException("Endpoint is already active: " + getEndpointUri());
- }
- }
-
- public void deactivate() {
- if (deactivated.compareAndSet(false, true)) {
- activated.set(false);
- doDeactivate();
- }
- }
-
-
public E createExchange(E exchange) {
E answer = createExchange();
answer.copyFrom(exchange);
@@ -108,28 +97,8 @@
}
/**
- * The processor used to process inbound message exchanges
+ * A helper method to reduce the clutter of implementors of {@link #createProducer()} and {@link #createConsumer(Processor)}
*/
- public Processor<E> getInboundProcessor() {
- return inboundProcessor;
- }
-
- public void setInboundProcessor(Processor<E> inboundProcessor) {
- this.inboundProcessor = inboundProcessor;
- }
-
- /**
- * Called at most once by the container to activate the endpoint
- */
- protected void doActivate() throws Exception {
- }
-
- /**
- * Called at most once by the container to deactivate the endpoint
- */
- protected void doDeactivate() {
- }
-
protected <T extends Service> T startService(T service) throws Exception {
service.start();
return service;
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java?view=auto&rev=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java Tue Apr 3 04:58:32 2007
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A useful base class for any consumer which is polling based
+ *
+ * @version $Revision$
+ */
+public abstract class PollingConsumer<E extends Exchange> extends DefaultConsumer<E> implements Runnable {
+ private long initialDelay = 1000;
+ private long delay = 500;
+ private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
+ private boolean useFixedDelay;
+ private ScheduledFuture<?> future;
+
+ public PollingConsumer(Endpoint<E> endpoint, Processor<E> processor) {
+ super(endpoint, processor);
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+ public long getInitialDelay() {
+ return initialDelay;
+ }
+
+ public void setInitialDelay(long initialDelay) {
+ this.initialDelay = initialDelay;
+ }
+
+ public long getDelay() {
+ return delay;
+ }
+
+ public void setDelay(long delay) {
+ this.delay = delay;
+ }
+
+ public TimeUnit getTimeUnit() {
+ return timeUnit;
+ }
+
+ public void setTimeUnit(TimeUnit timeUnit) {
+ this.timeUnit = timeUnit;
+ }
+
+ public boolean isUseFixedDelay() {
+ return useFixedDelay;
+ }
+
+ public void setUseFixedDelay(boolean useFixedDelay) {
+ this.useFixedDelay = useFixedDelay;
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+
+ @Override
+ protected void doStart() throws Exception {
+ ScheduledExecutorService executor = getEndpoint().getExecutorService();
+ if (isUseFixedDelay()) {
+ future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
+ }
+ else {
+ future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
+ }
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (future != null) {
+ future.cancel(false);
+ }
+ super.doStop();
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/PollingConsumer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java Tue Apr 3 04:58:32 2007
@@ -66,7 +66,7 @@
// now lets sleep for a while
boolean received = latch.await(5, TimeUnit.SECONDS);
- assertTrue("Did not recieve the message!", received);
+ assertTrue("Did not receive the message!", received);
container.deactivateEndpoints();
}
Modified: activemq/camel/trunk/camel-cxf/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/pom.xml?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-cxf/pom.xml (original)
+++ activemq/camel/trunk/camel-cxf/pom.xml Tue Apr 3 04:58:32 2007
@@ -130,7 +130,7 @@
</includes>
<excludes>
<!-- TODO re-enable ASAP! -->
- <exclude>**/Camel*Test.*</exclude>
+ <exclude>**/*Test.*</exclude>
</excludes>
</configuration>
</plugin>
Modified: activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java (original)
+++ activemq/camel/trunk/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java Tue Apr 3 04:58:32 2007
@@ -38,7 +38,7 @@
private boolean inOut = true;
public CxfEndpoint(String uri, CxfComponent component, EndpointInfo endpointInfo) {
- super(uri, component.getContext());
+ super(uri, component);
this.component = component;
this.endpointInfo = endpointInfo;
}
Added: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java?view=auto&rev=525142
==============================================================================
--- activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java (added)
+++ activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java Tue Apr 3 04:58:32 2007
@@ -0,0 +1,26 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.http;
+
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * @version $Revision$
+ */
+public class HttpComponent extends DefaultComponent<HttpExchange> {
+}
Propchange: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java (original)
+++ activemq/camel/trunk/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java Tue Apr 3 04:58:32 2007
@@ -37,8 +37,8 @@
private HttpBinding binding;
- protected HttpEndpoint(String uri, CamelContext camelContext) {
- super(uri, camelContext);
+ protected HttpEndpoint(String uri, HttpComponent component) {
+ super(uri, component);
}
public Producer<HttpExchange> createProducer() throws Exception {
Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/CamelJbiComponent.java Tue Apr 3 04:58:32 2007
@@ -24,6 +24,7 @@
import org.apache.servicemix.jbi.util.IntrospectionSupport;
import org.apache.servicemix.jbi.util.URISupport;
import org.apache.servicemix.jbi.resolver.URIResolver;
+import org.apache.servicemix.executors.Executor;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
@@ -32,6 +33,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* Deploys the camel endpoints within JBI
@@ -41,6 +44,7 @@
public class CamelJbiComponent extends DefaultComponent implements Component<JbiExchange>, EndpointResolver {
private JbiBinding binding;
private CamelContext camelContext;
+ private ScheduledExecutorService executorService;
/**
* @return List of endpoints
@@ -66,6 +70,7 @@
return new Class[]{CamelJbiEndpoint.class};
}
+
/**
* @return the binding
*/
@@ -137,6 +142,13 @@
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
+ }
+
+ public ScheduledExecutorService getExecutorService() {
+ if (executorService == null) {
+ executorService = new ScheduledThreadPoolExecutor(5);
+ }
+ return executorService;
}
/**
Modified: activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java (original)
+++ activemq/camel/trunk/camel-jbi/src/main/java/org/apache/camel/component/jbi/JbiEndpoint.java Tue Apr 3 04:58:32 2007
@@ -36,7 +36,7 @@
private final CamelJbiComponent jbiComponent;
public JbiEndpoint(CamelJbiComponent jbiComponent, String uri) {
- super(uri, jbiComponent.getCamelContext());
+ super(uri, jbiComponent);
this.jbiComponent = jbiComponent;
toJbiProcessor = new ToJbiProcessor(jbiComponent.getBinding(), jbiComponent.getComponentContext(), uri);
}
@@ -72,21 +72,6 @@
});
}
- /*
- public void onExchange(Exchange exchange) {
- if (getInboundProcessor() != null) {
- getInboundProcessor().onExchange(exchange);
- } else {
- toJbiProcessor.onExchange(exchange); }
- }
- */
-
- @Override
- protected void doActivate() throws Exception {
- super.doActivate();
-
- // lets create and activate the endpoint in JBI
- }
public JbiExchange createExchange() {
return new JbiExchange(getContext(), getBinding());
Modified: activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java (original)
+++ activemq/camel/trunk/camel-jbi/src/test/java/org/apache/camel/component/jbi/JbiTestSupport.java Tue Apr 3 04:58:32 2007
@@ -63,7 +63,7 @@
protected Object assertReceivedValidExchange(Class type) throws Exception {
// lets wait on the message being received
boolean received = latch.await(5, TimeUnit.SECONDS);
- assertTrue("Did not recieve the message!", received);
+ assertTrue("Did not receive the message!", received);
assertNotNull(receivedExchange);
Message receivedMessage = receivedExchange.getIn();
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Tue Apr 3 04:58:32 2007
@@ -90,7 +90,7 @@
}
public JmsEndpoint createEndpoint(String uri, String path) throws URISyntaxException {
- ObjectHelper.notNull(getContext(), "container");
+ ObjectHelper.notNull(getCamelContext(), "camelContext");
boolean pubSubDomain = false;
if (path.startsWith(QUEUE_PREFIX)) {
@@ -105,7 +105,7 @@
final String subject = convertPathToActualDestination(path);
// lets make sure we copy the configuration as each endpoint can customize its own version
- JmsEndpoint endpoint = new JmsEndpoint(uri, getContext(), subject, pubSubDomain, getConfiguration().copy());
+ JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy());
URI u = new URI(uri);
Map options = URISupport.parseParamters(u);
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Tue Apr 3 04:58:32 2007
@@ -41,8 +41,8 @@
private String selector;
private JmsConfiguration configuration;
- public JmsEndpoint(String uri, CamelContext context, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
- super(uri, context);
+ public JmsEndpoint(String uri, JmsComponent component, String destination, boolean pubSubDomain, JmsConfiguration configuration) {
+ super(uri, component);
this.configuration = configuration;
this.destination = destination;
this.pubSubDomain = pubSubDomain;
Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java Tue Apr 3 04:58:32 2007
@@ -78,7 +78,7 @@
protected Object assertReceivedValidExchange(Class type) throws Exception {
// lets wait on the message being received
boolean received = latch.await(5, TimeUnit.SECONDS);
- assertTrue("Did not recieve the message!", received);
+ assertTrue("Did not receive the message!", received);
assertNotNull(receivedExchange);
JmsMessage receivedMessage = receivedExchange.getIn();
Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java Tue Apr 3 04:58:32 2007
@@ -19,20 +19,26 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.impl.PollingConsumer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import javax.persistence.EntityManager;
import javax.persistence.Query;
+import javax.persistence.EntityTransaction;
import java.util.List;
/**
* @version $Revision$
*/
-public class JpaConsumer extends DefaultConsumer<Exchange> {
+public class JpaConsumer extends PollingConsumer<Exchange> {
+ private static final transient Log log = LogFactory.getLog(JpaConsumer.class);
+
private final JpaEndpoint endpoint;
private final EntityManager entityManager;
private QueryFactory queryFactory;
private DeleteHandler<Object> deleteHandler;
+ private EntityTransaction transaction;
public JpaConsumer(JpaEndpoint endpoint, Processor<Exchange> processor, EntityManager entityManager) {
super(endpoint, processor);
@@ -43,15 +49,33 @@
/**
* Invoked whenever we should be polled
*/
- public void run() {
- Query query = queryFactory.createQuery(this);
- configureParameters(query);
- List results = query.getResultList();
- for (Object result : results) {
- // lets turn the result into an exchange and fire it into the processor
- Exchange exchange = createExchange(result);
- getProcessor().onExchange(exchange);
- deleteHandler.deleteObject(this, result);
+ public synchronized void run() {
+ log.debug("Starting to poll for new database entities to process");
+ transaction = entityManager.getTransaction();
+ transaction.begin();
+
+ try {
+ Query query = getQueryFactory().createQuery(this);
+ configureParameters(query);
+ List results = query.getResultList();
+ for (Object result : results) {
+ if (log.isDebugEnabled()) {
+ log.debug("Processing new entity: " + result);
+ }
+ // lets turn the result into an exchange and fire it into the processor
+ Exchange exchange = createExchange(result);
+ getProcessor().onExchange(exchange);
+ getDeleteHandler().deleteObject(this, result);
+ }
+
+ transaction.commit();
+ transaction = null;
+ }
+ catch (RuntimeException e) {
+ log.warn("Caught: " + e, e);
+ if (transaction != null) {
+ transaction.rollback();
+ }
}
}
@@ -78,6 +102,9 @@
}
public DeleteHandler getDeleteHandler() {
+ if (deleteHandler == null) {
+ deleteHandler = createDeleteHandler();
+ }
return deleteHandler;
}
@@ -88,7 +115,10 @@
// Implementation methods
//-------------------------------------------------------------------------
@Override
- protected void doStop() throws Exception {
+ protected synchronized void doStop() throws Exception {
+ if (transaction != null) {
+ transaction.rollback();
+ }
entityManager.close();
super.doStop();
}
@@ -101,6 +131,16 @@
else {
return QueryBuilder.query("select x from " + entityType.getName() + " x");
}
+ }
+
+ protected DeleteHandler<Object> createDeleteHandler() {
+ // TODO auto-discover an annotation in the entity bean to indicate the process completed method call?
+
+ return new DeleteHandler<Object>() {
+ public void deleteObject(JpaConsumer consumer, Object entityBean) {
+ consumer.getEntityManager().remove(entityBean);
+ }
+ };
}
protected void configureParameters(Query query) {
Modified: activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java (original)
+++ activemq/camel/trunk/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaEndpoint.java Tue Apr 3 04:58:32 2007
@@ -39,7 +39,7 @@
private Class<?> entityType;
public JpaEndpoint(String uri, JpaComponent component) {
- super(uri, component.getContext());
+ super(uri, component);
this.component = component;
}
@@ -48,11 +48,11 @@
}
public Producer<Exchange> createProducer() throws Exception {
- return new JpaProducer(this, createEntityManager(), getProducerExpression());
+ return startService(new JpaProducer(this, createEntityManager(), getProducerExpression()));
}
public Consumer<Exchange> createConsumer(Processor<Exchange> processor) throws Exception {
- return new JpaConsumer(this, processor, createEntityManager());
+ return startService(new JpaConsumer(this, processor, createEntityManager()));
}
// Properties
Modified: activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java (original)
+++ activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaTest.java Tue Apr 3 04:58:32 2007
@@ -50,15 +50,21 @@
protected Consumer<Exchange> consumer;
protected Exchange receivedExchange;
protected CountDownLatch latch = new CountDownLatch(1);
- protected String queryText = "select o from " + SendEmail.class.getName() + " o";
+ protected String entityName = SendEmail.class.getName();
+ protected String queryText = "select o from " + entityName + " o";
protected EntityTransaction transaction;
public void testProducerInsertsIntoDatabaseThenConsumerFiresMessageExchange() throws Exception {
// lets assert that there are no existing send mail tasks
transaction = entityManager.getTransaction();
transaction.begin();
+
+ // lets delete any exiting records before the test
+ entityManager.createQuery("delete from " + entityName).executeUpdate();
+
List results = entityManager.createQuery(queryText).getResultList();
assertEquals("Should have no results: " + results, 0, results.size());
+ transaction.commit();
// lets produce some objects
client.send(endpoint, new Processor<Exchange>() {
@@ -66,7 +72,6 @@
exchange.getIn().setBody(new SendEmail("foo@bar.com"));
}
});
- transaction.commit();
// now lets assert that there is a result
transaction.begin();
@@ -74,6 +79,8 @@
assertEquals("Should have no results: " + results, 1, results.size());
SendEmail mail = (SendEmail) results.get(0);
assertEquals("address property", "foo@bar.com", mail.getAddress());
+ transaction.commit();
+ transaction = null;
// now lets create a consumer to consume it
consumer = endpoint.createConsumer(new Processor<Exchange>() {
@@ -84,8 +91,8 @@
}
});
- boolean received = latch.await(5, TimeUnit.SECONDS);
- assertTrue("Did not recieve the message!", received);
+ boolean received = latch.await(50, TimeUnit.SECONDS);
+ assertTrue("Did not receive the message!", received);
assertNotNull(receivedExchange);
SendEmail result = receivedExchange.getIn().getBody(SendEmail.class);
Modified: activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java (original)
+++ activemq/camel/trunk/camel-jpa/src/test/java/org/apache/camel/examples/SendEmail.java Tue Apr 3 04:58:32 2007
@@ -32,11 +32,15 @@
private String address;
public SendEmail() {
-
}
public SendEmail(String address) {
setAddress(address);
+ }
+
+ @Override
+ public String toString() {
+ return "SendEmail[id: " + getId() + " address: " + getAddress() + "]";
}
@Id
Modified: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java (original)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaComponent.java Tue Apr 3 04:58:32 2007
@@ -73,7 +73,7 @@
IoAcceptor acceptor = new VmPipeAcceptor();
SocketAddress address = new VmPipeAddress(connectUri.getPort());
IoConnector connector = new VmPipeConnector();
- return new MinaEndpoint(uri, getContext(), address, acceptor, connector, null);
+ return new MinaEndpoint(uri, this, address, acceptor, connector, null);
}
protected MinaEndpoint createSocketEndpoint(String uri, URI connectUri) {
@@ -85,7 +85,7 @@
SocketConnectorConfig config = new SocketConnectorConfig();
config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
- return new MinaEndpoint(uri, getContext(), address, acceptor, connector, config);
+ return new MinaEndpoint(uri, this, address, acceptor, connector, config);
}
protected MinaEndpoint createDatagramEndpoint(String uri, URI connectUri) {
@@ -97,6 +97,6 @@
DatagramConnectorConfig config = new DatagramConnectorConfig();
config.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
- return new MinaEndpoint(uri, getContext(), address, acceptor, connector, config);
+ return new MinaEndpoint(uri, this, address, acceptor, connector, config);
}
}
Modified: activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java (original)
+++ activemq/camel/trunk/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java Tue Apr 3 04:58:32 2007
@@ -36,15 +36,13 @@
* @version $Revision$
*/
public class MinaEndpoint extends DefaultEndpoint<MinaExchange> {
- private static final transient Log log = LogFactory.getLog(MinaEndpoint.class);
-
private final IoAcceptor acceptor;
private final SocketAddress address;
private final IoConnector connector;
private final IoServiceConfig config;
- public MinaEndpoint(String endpointUri, CamelContext container, SocketAddress address, IoAcceptor acceptor, IoConnector connector, IoServiceConfig config) {
- super(endpointUri, container);
+ public MinaEndpoint(String endpointUri, MinaComponent component, SocketAddress address, IoAcceptor acceptor, IoConnector connector, IoServiceConfig config) {
+ super(endpointUri, component);
this.config = config;
this.address = address;
this.acceptor = acceptor;
@@ -86,20 +84,6 @@
public IoServiceConfig getConfig() {
return config;
- }
-
-
- // Implementation methods
- //-------------------------------------------------------------------------
-
- @Override
- protected void doActivate() throws Exception {
- super.doActivate();
- }
-
- @Override
- protected void doDeactivate() {
- acceptor.unbindAll();
}
}
Modified: activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java (original)
+++ activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppComponent.java Tue Apr 3 04:58:32 2007
@@ -54,9 +54,9 @@
}
public XmppEndpoint createEndpoint(String uri, String path) throws URISyntaxException {
- ObjectHelper.notNull(getContext(), "context");
+ ObjectHelper.notNull(getCamelContext(), "context");
- XmppEndpoint endpoint = new XmppEndpoint(uri, getContext());
+ XmppEndpoint endpoint = new XmppEndpoint(uri, this);
URI u = new URI(uri);
endpoint.setHost(u.getHost());
Modified: activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java (original)
+++ activemq/camel/trunk/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java Tue Apr 3 04:58:32 2007
@@ -51,8 +51,8 @@
private String room;
private String participant;
- public XmppEndpoint(String uri, CamelContext context) {
- super(uri, context);
+ public XmppEndpoint(String uri, XmppComponent component) {
+ super(uri, component);
}
public Producer<XmppExchange> createProducer() throws Exception {
Modified: activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java (original)
+++ activemq/camel/trunk/camel-xmpp/src/test/java/org/apache/camel/component/xmpp/XmppRouteTest.java Tue Apr 3 04:58:32 2007
@@ -93,7 +93,7 @@
protected Object assertReceivedValidExchange() throws Exception {
// lets wait on the message being received
boolean received = latch.await(5, TimeUnit.SECONDS);
- assertTrue("Did not recieve the message!", received);
+ assertTrue("Did not receive the message!", received);
assertNotNull(receivedExchange);
XmppMessage receivedMessage = receivedExchange.getIn();
Modified: activemq/camel/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/pom.xml?view=diff&rev=525142&r1=525141&r2=525142
==============================================================================
--- activemq/camel/trunk/pom.xml (original)
+++ activemq/camel/trunk/pom.xml Tue Apr 3 04:58:32 2007
@@ -118,6 +118,7 @@
<module>camel-itest</module>
<module>camel-jbi</module>
<module>camel-jms</module>
+ <module>camel-jpa</module>
<module>camel-mina</module>
<module>camel-script</module>
<module>camel-spring</module>
@@ -161,6 +162,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-jpa</artifactId>
+ <version>${camel-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-mina</artifactId>
<version>${camel-version}</version>
</dependency>
@@ -378,6 +384,23 @@
<groupId>org.easymock</groupId>
<artifactId>easymockclassextension</artifactId>
<version>2.2.1</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <!-- default JPA support -->
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-persistence-jdbc</artifactId>
+ <version>0.9.6-incubating</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- common testing dependencies -->
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.1.3.1</version>
<scope>test</scope>
</dependency>
</dependencies>