You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/20 06:44:25 UTC
svn commit: r520287 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/impl/
camel-core/src/main/java/org/apache/camel/queue/
camel-core/src/test/java/org/apache/camel/queue/ camel-jms/src/m...
Author: chirino
Date: Mon Mar 19 22:44:24 2007
New Revision: 520287
URL: http://svn.apache.org/viewvc?view=rev&rev=520287
Log:
Got rid of the activate/deactivate methods on Component since they look better on Endpoint,
but I did add similar methods to the Container.
Also replicated the JmsRouteTest as a QueueRouteTest.
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java
activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContainer.java Mon Mar 19 22:44:24 2007
@@ -17,15 +17,15 @@
*/
package org.apache.camel;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultEndpointResolver;
-import org.apache.camel.impl.DefaultExchangeConverter;
-
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultEndpointResolver;
+import org.apache.camel.impl.DefaultExchangeConverter;
+
/**
* Represents the container used to configure routes and the policies to use.
*
@@ -37,19 +37,34 @@
private EndpointResolver<E> endpointResolver;
private ExchangeConverter exchangeConverter;
private Map<String, Component> components = new HashMap<String, Component>();
+ private Map<Endpoint<E>, Processor<E>> routes;
+
+ /**
+ * Activates all the starting endpoints that were added as routes.
+ */
+ public void activateEndpoints() {
+ for (Map.Entry<Endpoint<E>, Processor<E>> entry : routes.entrySet()) {
+ Endpoint<E> endpoint = entry.getKey();
+ Processor<E> processor = entry.getValue();
+ endpoint.activate(processor);
+ }
+ }
+
+ /**
+ * Deactivates all the starting endpoints that were added as routes.
+ */
+ public void deactivateEndpoints() {
+ for (Endpoint<E> endpoint : routes.keySet()) {
+ endpoint.deactivate();
+ }
+ }
// Builder APIs
//-----------------------------------------------------------------------
public void routes(RouteBuilder<E> builder) {
// lets now add the routes from the builder
builder.setContainer(this);
- Map<Endpoint<E>, Processor<E>> routeMap = builder.getRouteMap();
- Set<Map.Entry<Endpoint<E>, Processor<E>>> entries = routeMap.entrySet();
- for (Map.Entry<Endpoint<E>, Processor<E>> entry : entries) {
- Endpoint<E> endpoint = entry.getKey();
- Processor<E> processor = entry.getValue();
- endpoint.setInboundProcessor(processor);
- }
+ routes = builder.getRouteMap();
}
public void routes(final RouteFactory factory) {
@@ -64,18 +79,18 @@
/**
* Adds a component to the container if there is not currently a component already registered.
*/
- public void addComponent(String componentName, final Component<E, ? extends Endpoint<E>> component) {
+ public void addComponent(String componentName, final Component<E> component) {
// TODO provide a version of this which barfs if the component is registered multiple times
- getOrCreateComponent(componentName, new Callable<Component<E, ? extends Endpoint<E>>>() {
- public Component<E, ? extends Endpoint<E>> call() throws Exception {
+ getOrCreateComponent(componentName, new Callable<Component<E>>() {
+ public Component<E> call() throws Exception {
return component;
}
});
}
- /**
+ /**
* Resolves the given URI to an endpoint
*/
public Endpoint<E> endpoint(String uri) {
@@ -121,7 +136,7 @@
return new DefaultExchangeConverter();
}
- public Component getOrCreateComponent(String componentName, Callable<Component<E, ? extends Endpoint<E>>> factory) {
+ public Component getOrCreateComponent(String componentName, Callable<Component<E>> factory) {
synchronized (components) {
Component component = components.get(componentName);
if (component == null) {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Component.java Mon Mar 19 22:44:24 2007
@@ -21,23 +21,12 @@
*
* @version $Revision: 519901 $
*/
-public interface Component<E, EP extends Endpoint<E>> {
+public interface Component<E> {
/**
* The CamelContainer is injected into the component when it is added to it
*/
void setContainer(CamelContainer container);
- /**
- * Asks the component to activate the delivery of {@link Exchange} objects
- * from the {@link Endpoint} to the {@link Processor}.
- */
- void activate(EP endpoint, Processor<E> processor);
-
- /**
- * Stops the delivery of messages from a previously activated
- * {@link Endpoint}.
- */
- void deactivate(EP endpoint);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Mon Mar 19 22:44:24 2007
@@ -16,6 +16,7 @@
*/
package org.apache.camel;
+
/**
* Represents an endpoint on which messages can be exchanged
*
@@ -32,11 +33,6 @@
* Sends the message exchange to this endpoint
*/
void send(E exchange);
-
- /**
- * Sets the processor for inbound messages
- */
- void setInboundProcessor(Processor<E> processor);
/**
* Create a new exchange for communicating with this endpoint
@@ -45,9 +41,13 @@
/**
- * Called by the container when an endpoint is activiated
+ * Called by the container to activate the endpoint. Once activated,
+ * the endpoint will start delivering the inbound exchanges
+ * it receives to the specified processor.
+ *
+ * @throws IllegalStateException if the Endpoint has already been activated.
*/
- void activate();
+ public void activate(Processor<E> processor) throws IllegalStateException;
/**
* Called by the container when the endpoint is deactivated
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Mon Mar 19 22:44:24 2007
@@ -77,9 +77,12 @@
}
- public void activate() {
+ public void activate(Processor<E> inboundProcessor) {
if (activated.compareAndSet(false, true)) {
+ this.inboundProcessor = inboundProcessor;
doActivate();
+ } else {
+ throw new IllegalStateException("Endpoint is already active: "+getEndpointUri());
}
}
public void deactivate() {
@@ -97,7 +100,6 @@
public void setInboundProcessor(Processor<E> inboundProcessor) {
this.inboundProcessor = inboundProcessor;
- activate();
}
/**
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpointResolver.java Mon Mar 19 22:44:24 2007
@@ -42,7 +42,6 @@
return resolver.resolveEndpoint(container, uri);
}
-
public Component resolveComponent(CamelContainer container, String uri) {
EndpointResolver resolver = getDelegate(uri);
return resolver.resolveComponent(container, uri);
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueComponent.java Mon Mar 19 22:44:24 2007
@@ -16,14 +16,12 @@
*/
package org.apache.camel.queue;
-import org.apache.camel.CamelContainer;
-import org.apache.camel.Component;
-import org.apache.camel.Processor;
-
import java.util.HashMap;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Component;
/**
* Represents the component that manages {@link QueueEndpoint}. It holds the
@@ -32,45 +30,17 @@
* @org.apache.xbean.XBean
* @version $Revision: 519973 $
*/
-public class QueueComponent<E> implements Component<E, QueueEndpoint<E>> {
+public class QueueComponent<E> implements Component<E> {
- private HashMap<String, Queue<E>> registry = new HashMap<String, Queue<E>>();
- private HashMap<QueueEndpoint<E>, Activation> activations = new HashMap<QueueEndpoint<E>, Activation>();
+ private HashMap<String, BlockingQueue<E>> registry = new HashMap<String, BlockingQueue<E>>();
private CamelContainer container;
public void setContainer(CamelContainer container) {
this.container = container;
}
- class Activation implements Runnable {
- private final QueueEndpoint<E> endpoint;
- AtomicBoolean stop = new AtomicBoolean();
- private Thread thread;
-
- public Activation(QueueEndpoint<E> endpoint) {
- this.endpoint = endpoint;
- }
-
- public void run() {
- while(!stop.get()) {
-
- }
- }
-
- public void start() {
- thread = new Thread(this, endpoint.getEndpointUri());
- thread.setDaemon(true);
- thread.start();
- }
-
- public void stop() throws InterruptedException {
- stop.set(true);
- thread.join();
- }
- }
-
- synchronized public Queue<E> getOrCreateQueue(String uri) {
- Queue<E> queue = registry.get(uri);
+ synchronized public BlockingQueue<E> getOrCreateQueue(String uri) {
+ BlockingQueue<E> queue = registry.get(uri);
if( queue == null ) {
queue = createQueue();
registry.put(uri, queue);
@@ -78,30 +48,13 @@
return queue;
}
- private Queue<E> createQueue() {
+ protected BlockingQueue<E> createQueue() {
return new LinkedBlockingQueue<E>();
}
- public void activate(QueueEndpoint<E> endpoint, Processor<E> processor) {
- Activation activation = activations.get(endpoint);
- if( activation!=null ) {
- throw new IllegalArgumentException("Endpoint "+endpoint.getEndpointUri()+" has already been activated.");
- }
-
- activation = new Activation(endpoint);
- activation.start();
+ public CamelContainer getContainer() {
+ return container;
}
- public void deactivate(QueueEndpoint<E> endpoint) {
- Activation activation = activations.remove(endpoint);
- if( activation==null ) {
- throw new IllegalArgumentException("Endpoint "+endpoint.getEndpointUri()+" is not activate.");
- }
- try {
- activation.stop();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpoint.java Mon Mar 19 22:44:24 2007
@@ -16,24 +16,28 @@
*/
package org.apache.camel.queue;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.camel.CamelContainer;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
-import java.util.Queue;
-
/**
- * Represents a queue endpoint that uses a {@link Queue}
+ * Represents a queue endpoint that uses a {@link BlockingQueue}
* object to process inbound exchanges.
*
* @org.apache.xbean.XBean
* @version $Revision: 519973 $
*/
public class QueueEndpoint<E> extends DefaultEndpoint<E> {
- private Queue<E> queue;
+ private BlockingQueue<E> queue;
+ private org.apache.camel.queue.QueueEndpoint.Activation activation;
- public QueueEndpoint(String uri, CamelContainer container, Queue<E> queue) {
+ public QueueEndpoint(String uri, CamelContainer container, BlockingQueue<E> queue) {
super(uri, container);
this.queue = queue;
}
@@ -55,5 +59,60 @@
public Queue<E> getQueue() {
return queue;
+ }
+
+ class Activation implements Runnable {
+ AtomicBoolean stop = new AtomicBoolean();
+ private Thread thread;
+
+ public void run() {
+ while(!stop.get()) {
+ E exchange=null;
+ try {
+ exchange = queue.poll(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ break;
+ }
+ if( exchange !=null ) {
+ try {
+ getInboundProcessor().onExchange(exchange);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ public void start() {
+ thread = new Thread(this, getEndpointUri());
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public void stop() throws InterruptedException {
+ stop.set(true);
+ thread.join();
+ }
+
+ @Override
+ public String toString() {
+ return "Activation: "+getEndpointUri();
+ }
+ }
+
+ @Override
+ protected void doActivate() {
+ activation = new Activation();
+ activation.start();
+ }
+
+ @Override
+ protected void doDeactivate() {
+ try {
+ activation.stop();
+ activation=null;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/queue/QueueEndpointResolver.java Mon Mar 19 22:44:24 2007
@@ -16,7 +16,7 @@
*/
package org.apache.camel.queue;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import org.apache.camel.CamelContainer;
@@ -61,7 +61,7 @@
public Endpoint<E> resolveEndpoint(CamelContainer container, String uri) {
String id[] = getEndpointId(uri);
QueueComponent<E> component = resolveQueueComponent(container, id[0]);
- Queue<E> queue = component.getOrCreateQueue(id[1]);
+ BlockingQueue<E> queue = component.getOrCreateQueue(id[1]);
return new QueueEndpoint<E>(uri, container, queue);
}
@@ -82,8 +82,8 @@
@SuppressWarnings("unchecked")
private QueueComponent<E> resolveQueueComponent(CamelContainer container, String componentName) {
- Component rc = container.getOrCreateComponent(componentName, new Callable<Component<E,? extends Endpoint<E>>>(){
- public Component<E, ? extends Endpoint<E>> call() throws Exception {
+ Component rc = container.getOrCreateComponent(componentName, new Callable<Component<E>>(){
+ public Component<E> call() throws Exception {
return new QueueComponent<E>();
}});
return (QueueComponent<E>) rc;
Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java?view=auto&rev=520287
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/queue/QueueRouteTest.java Mon Mar 19 22:44:24 2007
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.queue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.camel.CamelContainer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultExchange;
+
+/**
+ * @version $Revision: 520220 $
+ */
+public class QueueRouteTest extends TestCase {
+
+ static class StringExchange extends DefaultExchange<String, String, String> {
+ }
+
+ public void testJmsRoute() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ CamelContainer container = new CamelContainer();
+
+ // lets add some routes
+ container.routes(new RouteBuilder() {
+ public void configure() {
+ from("queue:test.a").to("queue:test.b");
+ from("queue:test.b").process(new Processor<StringExchange>() {
+ public void onExchange(StringExchange exchange) {
+ System.out.println("Received exchange: " + exchange.getRequest());
+ latch.countDown();
+ }
+ });
+ }
+ });
+
+
+ container.activateEndpoints();
+
+ // now lets fire in a message
+ Endpoint<StringExchange> endpoint = container.endpoint("queue:test.a");
+ StringExchange exchange = new StringExchange();
+ exchange.setHeader("cheese", 123);
+ endpoint.send(exchange);
+
+ // now wait up to 5 seconds for the message to be received
+ boolean received = latch.await(5, TimeUnit.SECONDS);
+ assertTrue("Did not recieve the message!", received);
+
+ container.deactivateEndpoints();
+ }
+}
Modified: activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/camel-jms/src/main/java/org/apache/camel/jms/JmsComponent.java Mon Mar 19 22:44:24 2007
@@ -32,7 +32,7 @@
/**
* @version $Revision$
*/
-public class JmsComponent implements Component<JmsExchange, JmsEndpoint> {
+public class JmsComponent implements Component<JmsExchange> {
public static final String QUEUE_PREFIX = "queue/";
public static final String TOPIC_PREFIX = "topic/";
Modified: activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java?view=diff&rev=520287&r1=520286&r2=520287
==============================================================================
--- activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/camel-jms/src/test/java/org/apache/camel/jms/JmsRouteTest.java Mon Mar 19 22:44:24 2007
@@ -55,6 +55,9 @@
}
});
+
+ container.activateEndpoints();
+
// now lets fire in a message
Endpoint<JmsExchange> endpoint = container.endpoint("jms:activemq:test.a");
JmsExchange exchange2 = endpoint.createExchange();
@@ -66,7 +69,6 @@
boolean received = latch.await(5, TimeUnit.SECONDS);
assertTrue("Did not recieve the message!", received);
- // TODO
- //container.stop();
+ container.deactivateEndpoints();
}
}