You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/06/29 19:42:38 UTC
svn commit: r551974 - in /activemq/camel/trunk/camel-core/src:
main/java/org/apache/camel/component/queue/
main/java/org/apache/camel/component/vm/
main/resources/META-INF/services/org/apache/camel/component/
test/java/org/apache/camel/component/queue/...
Author: jstrachan
Date: Fri Jun 29 10:42:37 2007
New Revision: 551974
URL: http://svn.apache.org/viewvc?view=rev&rev=551974
Log:
added a VM component for communicating across web applications providing the camel-core is on the classpath - for CAMEL-59
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java (with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html
- copied, changed from r551858, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html
activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm
- copied, changed from r551858, activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/queue
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java
- copied, changed from r551858, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java?view=diff&rev=551974&r1=551973&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java Fri Jun 29 10:42:37 2007
@@ -28,8 +28,8 @@
import java.util.concurrent.LinkedBlockingQueue;
/**
- * Represents the component that manages {@link QueueEndpoint}. It holds the
- * list of named queues that queue endpoints reference.
+ * An implementation of the <a href="http://activemq.apache.org/camel/queue.html">Queue components</a>
+ * for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
*
* @org.apache.xbean.XBean
* @version $Revision: 519973 $
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java?view=diff&rev=551974&r1=551973&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java Fri Jun 29 10:42:37 2007
@@ -22,13 +22,14 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
+import org.apache.camel.Component;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
/**
- * Represents a queue endpoint that uses a {@link BlockingQueue}
- * object to process inbound exchanges.
+ * An implementation of the <a href="http://activemq.apache.org/camel/queue.html">Queue components</a>
+ * for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
*
* @org.apache.xbean.XBean
* @version $Revision: 519973 $
@@ -36,9 +37,13 @@
public class QueueEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
private BlockingQueue<E> queue;
+ public QueueEndpoint(String endpointUri, Component component, BlockingQueue<E> queue) {
+ super(endpointUri, component);
+ this.queue = queue;
+ }
+
public QueueEndpoint(String uri, QueueComponent<E> component) {
- super(uri, component);
- this.queue = component.createQueue();
+ this(uri, component, component.createQueue());
}
public Producer<E> createProducer() throws Exception {
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html?view=diff&rev=551974&r1=551973&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html Fri Jun 29 10:42:37 2007
@@ -19,10 +19,10 @@
</head>
<body>
-The <a href="http://activemq.apache.org/camel/queue.html">Queue Component</a> provides asynchronous (in-VM) dispatch
-of messages to consumer to implement
-<a href="http://www.eecs.harvard.edu/~mdw/proj/seda/">SEDA</a> based message routing using Java's
-<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/Queue.html">Queue</a> interface.
+The <a href="http://activemq.apache.org/camel/queue.html">Queue Component</a> for asynchronous
+<a href="http://www.eecs.harvard.edu/~mdw/proj/seda/">SEDA</a> exchanges on a
+<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/BlockingQueue.html">BlockingQueue</a>
+within a single CamelContext
</body>
</html>
Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java?view=auto&rev=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java Fri Jun 29 10:42:37 2007
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.queue.QueueComponent;
+import org.apache.camel.component.queue.QueueEndpoint;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * An implementation of the <a href="http://activemq.apache.org/camel/vm.html">VM components</a>
+ * for asynchronous SEDA exchanges on a {@link BlockingQueue} within the classloader tree containing
+ * the camel-core.jar. i.e. to handle communicating across CamelContext instances and possibly across
+ * web application contexts, providing that camel-core.jar is on the system classpath.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class VmComponent<E extends Exchange> extends QueueComponent<E> {
+ protected static Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>();
+
+ @Override
+ protected Endpoint<E> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
+ BlockingQueue<E> blockingQueue = (BlockingQueue<E>) getBlockingQueue(uri);
+ return new QueueEndpoint<E>(uri, this, blockingQueue);
+ }
+
+ protected BlockingQueue<Exchange> getBlockingQueue(String uri) {
+ synchronized (queues) {
+ BlockingQueue<Exchange> answer = queues.get(uri);
+ if (answer == null) {
+ answer = (BlockingQueue<Exchange>) createQueue();
+ queues.put(uri, answer);
+ }
+ return answer;
+ }
+ }
+}
Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html (from r551858, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html?view=diff&rev=551974&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html&r1=551858&p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html Fri Jun 29 10:42:37 2007
@@ -19,10 +19,13 @@
</head>
<body>
-The <a href="http://activemq.apache.org/camel/queue.html">Queue Component</a> provides asynchronous (in-VM) dispatch
-of messages to consumer to implement
-<a href="http://www.eecs.harvard.edu/~mdw/proj/seda/">SEDA</a> based message routing using Java's
-<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/Queue.html">Queue</a> interface.
+The <a href="http://activemq.apache.org/camel/queue.html">Queue Component</a> for asynchronous
+<a href="http://www.eecs.harvard.edu/~mdw/proj/seda/">SEDA</a> exchanges on a
+<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/BlockingQueue.html">BlockingQueue</a>
+within the current JVM; so across CamelContext instances. Note that this communication can only take place
+between ClassLoaders which share the same camel-core.jar. So to communicate across web applications you
+need to put camel-core.jar on the system/boot classpath.
+
</body>
</html>
Copied: activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm (from r551858, activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/queue)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm?view=diff&rev=551974&p1=activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/queue&r1=551858&p2=activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/queue (original)
+++ activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm Fri Jun 29 10:42:37 2007
@@ -15,4 +15,4 @@
# limitations under the License.
#
-class=org.apache.camel.component.queue.QueueComponent
+class=org.apache.camel.component.vm.VmComponent
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java?view=diff&rev=551974&r1=551973&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java Fri Jun 29 10:42:37 2007
@@ -40,10 +40,10 @@
public void testSedaQueue() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
- CamelContext container = new DefaultCamelContext();
+ CamelContext context = new DefaultCamelContext();
// lets add some routes
- container.addRoutes(new RouteBuilder() {
+ context.addRoutes(new RouteBuilder() {
public void configure() {
from("queue:test.a").to("queue:test.b");
from("queue:test.b").process(new Processor() {
@@ -56,10 +56,10 @@
});
- container.start();
+ context.start();
// now lets fire in a message
- Endpoint<Exchange> endpoint = container.getEndpoint("queue:test.a");
+ Endpoint<Exchange> endpoint = context.getEndpoint("queue:test.a");
Exchange exchange = endpoint.createExchange();
exchange.getIn().setHeader("cheese", 123);
@@ -70,17 +70,17 @@
boolean received = latch.await(5, TimeUnit.SECONDS);
assertTrue("Did not receive the message!", received);
- container.stop();
+ context.stop();
}
- public void xtestThatShowsEndpointResolutionIsNotConsistent() throws Exception {
+ public void testThatShowsEndpointResolutionIsNotConsistent() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
- CamelContext container = new DefaultCamelContext();
+ CamelContext context = new DefaultCamelContext();
// lets add some routes
- container.addRoutes(new RouteBuilder() {
+ context.addRoutes(new RouteBuilder() {
public void configure() {
from("queue:test.a").to("queue:test.b");
from("queue:test.b").process(new Processor() {
@@ -93,10 +93,10 @@
});
- container.start();
+ context.start();
// now lets fire in a message
- Endpoint<Exchange> endpoint = container.getComponent("queue").createEndpoint("queue:test.a");
+ Endpoint<Exchange> endpoint = context.getEndpoint("queue:test.a");
Exchange exchange = endpoint.createExchange();
exchange.getIn().setHeader("cheese", 123);
@@ -107,7 +107,7 @@
boolean received = latch.await(5, TimeUnit.SECONDS);
assertTrue("Did not receive the message!", received);
- container.stop();
+ context.stop();
}
}
Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java (from r551858, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java?view=diff&rev=551974&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java&r1=551858&p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java Fri Jun 29 10:42:37 2007
@@ -15,99 +15,56 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.queue;
+package org.apache.camel.component.vm;
-import junit.framework.TestCase;
import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
+import org.apache.camel.CamelTemplate;
import org.apache.camel.TestSupport;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import org.apache.camel.util.ServiceHelper;
/**
* @version $Revision: 520220 $
*/
-public class QueueRouteTest extends TestSupport {
+public class VmRouteTest extends TestSupport {
+ private CamelContext context1 = new DefaultCamelContext();
+ private CamelContext context2 = new DefaultCamelContext();
+ private CamelTemplate template = new CamelTemplate(context1);
+ private Object expectedBody = "<hello>world!</hello>";
-
public void testSedaQueue() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
+ MockEndpoint result = context2.getEndpoint("mock:result", MockEndpoint.class);
+ result.expectedBodiesReceived(expectedBody);
+
+ template.sendBody("vm:test.a", expectedBody);
+
+ result.assertIsSatisfied();
+ }
- CamelContext container = new DefaultCamelContext();
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
- // lets add some routes
- container.addRoutes(new RouteBuilder() {
+ context1.addRoutes(new RouteBuilder() {
public void configure() {
- from("queue:test.a").to("queue:test.b");
- from("queue:test.b").process(new Processor() {
- public void process(Exchange e) {
- log.debug("Received exchange: " + e.getIn());
- latch.countDown();
- }
- });
+ from("vm:test.a").to("vm:test.b");
}
});
-
- container.start();
-
- // now lets fire in a message
- Endpoint<Exchange> endpoint = container.getEndpoint("queue:test.a");
- Exchange exchange = endpoint.createExchange();
- exchange.getIn().setHeader("cheese", 123);
-
- Producer<Exchange> producer = endpoint.createProducer();
- producer.process(exchange);
-
- // now lets sleep for a while
- boolean received = latch.await(5, TimeUnit.SECONDS);
- assertTrue("Did not receive the message!", received);
-
- container.stop();
- }
-
-
- public void xtestThatShowsEndpointResolutionIsNotConsistent() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
-
- CamelContext container = new DefaultCamelContext();
-
- // lets add some routes
- container.addRoutes(new RouteBuilder() {
+ context2.addRoutes(new RouteBuilder() {
public void configure() {
- from("queue:test.a").to("queue:test.b");
- from("queue:test.b").process(new Processor() {
- public void process(Exchange e) {
- log.debug("Received exchange: " + e.getIn());
- latch.countDown();
- }
- });
+ from("vm:test.b").to("mock:result");
}
});
-
- container.start();
-
- // now lets fire in a message
- Endpoint<Exchange> endpoint = container.getComponent("queue").createEndpoint("queue:test.a");
- Exchange exchange = endpoint.createExchange();
- exchange.getIn().setHeader("cheese", 123);
-
- Producer<Exchange> producer = endpoint.createProducer();
- producer.process(exchange);
-
- // now lets sleep for a while
- boolean received = latch.await(5, TimeUnit.SECONDS);
- assertTrue("Did not receive the message!", received);
+ ServiceHelper.startServices(context1, context2);
+ }
- container.stop();
+ @Override
+ protected void tearDown() throws Exception {
+ ServiceHelper.stopServices(context2, context1);
+ super.tearDown();
}
-
-}
+}
\ No newline at end of file