You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/06/29 19:42:38 UTC

svn commit: r551974 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/queue/ main/java/org/apache/camel/component/vm/ main/resources/META-INF/services/org/apache/camel/component/ test/java/org/apache/camel/component/queue/...

Author: jstrachan
Date: Fri Jun 29 10:42:37 2007
New Revision: 551974

URL: http://svn.apache.org/viewvc?view=rev&rev=551974
Log:
added a VM component for communicating across web applications providing the camel-core is on the classpath - for CAMEL-59

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html
      - copied, changed from r551858, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html
    activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm
      - copied, changed from r551858, activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/queue
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java
      - copied, changed from r551858, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java?view=diff&rev=551974&r1=551973&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueComponent.java Fri Jun 29 10:42:37 2007
@@ -28,8 +28,8 @@
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
- * Represents the component that manages {@link QueueEndpoint}.  It holds the 
- * list of named queues that queue endpoints reference.
+ * An implementation of the <a href="http://activemq.apache.org/camel/queue.html">Queue components</a>
+ * for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
  *
  * @org.apache.xbean.XBean
  * @version $Revision: 519973 $

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java?view=diff&rev=551974&r1=551973&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/QueueEndpoint.java Fri Jun 29 10:42:37 2007
@@ -22,13 +22,14 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.Component;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.DefaultProducer;
 
 /**
- * Represents a queue endpoint that uses a {@link BlockingQueue}
- * object to process inbound exchanges.
+ * An implementation of the <a href="http://activemq.apache.org/camel/queue.html">Queue components</a>
+ * for asynchronous SEDA exchanges on a {@link BlockingQueue} within a CamelContext
  *
  * @org.apache.xbean.XBean
  * @version $Revision: 519973 $
@@ -36,9 +37,13 @@
 public class QueueEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
     private BlockingQueue<E> queue;
 
+    public QueueEndpoint(String endpointUri, Component component, BlockingQueue<E> queue) {
+        super(endpointUri, component);
+        this.queue = queue;
+    }
+
     public QueueEndpoint(String uri, QueueComponent<E> component) {
-        super(uri, component);
-        this.queue = component.createQueue();
+        this(uri, component, component.createQueue());
     }
 
     public Producer<E> createProducer() throws Exception {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html?view=diff&rev=551974&r1=551973&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html Fri Jun 29 10:42:37 2007
@@ -19,10 +19,10 @@
 </head>
 <body>
 
-The <a href="http://activemq.apache.org/camel/queue.html">Queue Component</a> provides asynchronous (in-VM) dispatch
-of messages to consumer to implement
-<a href="http://www.eecs.harvard.edu/~mdw/proj/seda/">SEDA</a> based message routing using Java's
-<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/Queue.html">Queue</a> interface.
+The <a href="http://activemq.apache.org/camel/queue.html">Queue Component</a> for asynchronous
+<a href="http://www.eecs.harvard.edu/~mdw/proj/seda/">SEDA</a> exchanges on a
+<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/BlockingQueue.html">BlockingQueue</a>
+within a single CamelContext
 
 </body>
 </html>

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java?view=auto&rev=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java Fri Jun 29 10:42:37 2007
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.queue.QueueComponent;
+import org.apache.camel.component.queue.QueueEndpoint;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * An implementation of the <a href="http://activemq.apache.org/camel/vm.html">VM components</a>
+ * for asynchronous SEDA exchanges on a {@link BlockingQueue} within the classloader tree containing
+ * the camel-core.jar. i.e. to handle communicating across CamelContext instances and possibly across
+ * web application contexts, providing that camel-core.jar is on the system classpath.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class VmComponent<E extends Exchange> extends QueueComponent<E> {
+    protected static Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>();
+
+    @Override
+    protected Endpoint<E> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
+        BlockingQueue<E> blockingQueue = (BlockingQueue<E>) getBlockingQueue(uri);
+        return new QueueEndpoint<E>(uri, this, blockingQueue);
+    }
+
+    protected BlockingQueue<Exchange> getBlockingQueue(String uri) {
+        synchronized (queues) {
+            BlockingQueue<Exchange> answer = queues.get(uri);
+            if (answer == null) {
+                answer = (BlockingQueue<Exchange>) createQueue();
+                queues.put(uri, answer);
+            }
+            return answer;
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html (from r551858, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html?view=diff&rev=551974&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html&r1=551858&p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/queue/package.html (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/vm/package.html Fri Jun 29 10:42:37 2007
@@ -19,10 +19,13 @@
 </head>
 <body>
 
-The <a href="http://activemq.apache.org/camel/queue.html">Queue Component</a> provides asynchronous (in-VM) dispatch
-of messages to consumer to implement
-<a href="http://www.eecs.harvard.edu/~mdw/proj/seda/">SEDA</a> based message routing using Java's
-<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/Queue.html">Queue</a> interface.
+The <a href="http://activemq.apache.org/camel/queue.html">Queue Component</a> for asynchronous
+<a href="http://www.eecs.harvard.edu/~mdw/proj/seda/">SEDA</a> exchanges on a
+<a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/BlockingQueue.html">BlockingQueue</a>
+within the current JVM; so across CamelContext instances. Note that this communication can only take place
+between ClassLoaders which share the same camel-core.jar. So to communicate across web applications you
+need to put camel-core.jar on the system/boot classpath.
+
 
 </body>
 </html>

Copied: activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm (from r551858, activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/queue)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm?view=diff&rev=551974&p1=activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/queue&r1=551858&p2=activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/queue (original)
+++ activemq/camel/trunk/camel-core/src/main/resources/META-INF/services/org/apache/camel/component/vm Fri Jun 29 10:42:37 2007
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-class=org.apache.camel.component.queue.QueueComponent
+class=org.apache.camel.component.vm.VmComponent

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java?view=diff&rev=551974&r1=551973&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java Fri Jun 29 10:42:37 2007
@@ -40,10 +40,10 @@
     public void testSedaQueue() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
 
-        CamelContext container = new DefaultCamelContext();
+        CamelContext context = new DefaultCamelContext();
 
         // lets add some routes
-        container.addRoutes(new RouteBuilder() {
+        context.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("queue:test.a").to("queue:test.b");
                 from("queue:test.b").process(new Processor() {
@@ -56,10 +56,10 @@
         });
 
         
-        container.start();
+        context.start();
         
         // now lets fire in a message
-        Endpoint<Exchange> endpoint = container.getEndpoint("queue:test.a");
+        Endpoint<Exchange> endpoint = context.getEndpoint("queue:test.a");
         Exchange exchange = endpoint.createExchange();
         exchange.getIn().setHeader("cheese", 123);
 
@@ -70,17 +70,17 @@
         boolean received = latch.await(5, TimeUnit.SECONDS);
         assertTrue("Did not receive the message!", received);
 
-        container.stop();
+        context.stop();
     }
     
     
-    public void xtestThatShowsEndpointResolutionIsNotConsistent() throws Exception {
+    public void testThatShowsEndpointResolutionIsNotConsistent() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
 
-        CamelContext container = new DefaultCamelContext();
+        CamelContext context = new DefaultCamelContext();
         
         // lets add some routes
-        container.addRoutes(new RouteBuilder() {
+        context.addRoutes(new RouteBuilder() {
             public void configure() {
                 from("queue:test.a").to("queue:test.b");
                 from("queue:test.b").process(new Processor() {
@@ -93,10 +93,10 @@
         });
 
         
-        container.start();
+        context.start();
         
         // now lets fire in a message
-        Endpoint<Exchange> endpoint = container.getComponent("queue").createEndpoint("queue:test.a");
+        Endpoint<Exchange> endpoint = context.getEndpoint("queue:test.a");
         Exchange exchange = endpoint.createExchange();
         exchange.getIn().setHeader("cheese", 123);
 
@@ -107,7 +107,7 @@
         boolean received = latch.await(5, TimeUnit.SECONDS);
         assertTrue("Did not receive the message!", received);
 
-        container.stop();
+        context.stop();
     }
     
 }

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java (from r551858, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java?view=diff&rev=551974&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java&r1=551858&p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java&r2=551974
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/queue/QueueRouteTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/vm/VmRouteTest.java Fri Jun 29 10:42:37 2007
@@ -15,99 +15,56 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.queue;
+package org.apache.camel.component.vm;
 
-import junit.framework.TestCase;
 import org.apache.camel.CamelContext;
-import org.apache.camel.Component;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
+import org.apache.camel.CamelTemplate;
 import org.apache.camel.TestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.DefaultCamelContext;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import org.apache.camel.util.ServiceHelper;
 
 /**
  * @version $Revision: 520220 $
  */
-public class QueueRouteTest extends TestSupport {
+public class VmRouteTest extends TestSupport {
+    private CamelContext context1 = new DefaultCamelContext();
+    private CamelContext context2 = new DefaultCamelContext();
+    private CamelTemplate template = new CamelTemplate(context1);
+    private Object expectedBody = "<hello>world!</hello>";
 
-	
     public void testSedaQueue() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
+        MockEndpoint result = context2.getEndpoint("mock:result", MockEndpoint.class);
+        result.expectedBodiesReceived(expectedBody);
+
+        template.sendBody("vm:test.a", expectedBody);
+
+        result.assertIsSatisfied();
+    }
 
-        CamelContext container = new DefaultCamelContext();
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
 
-        // lets add some routes
-        container.addRoutes(new RouteBuilder() {
+        context1.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("queue:test.a").to("queue:test.b");
-                from("queue:test.b").process(new Processor() {
-                    public void process(Exchange e) {
-                        log.debug("Received exchange: " + e.getIn());
-                        latch.countDown();
-                    }
-                });
+                from("vm:test.a").to("vm:test.b");
             }
         });
 
-        
-        container.start();
-        
-        // now lets fire in a message
-        Endpoint<Exchange> endpoint = container.getEndpoint("queue:test.a");
-        Exchange exchange = endpoint.createExchange();
-        exchange.getIn().setHeader("cheese", 123);
-
-        Producer<Exchange> producer = endpoint.createProducer();
-        producer.process(exchange);
-
-        // now lets sleep for a while
-        boolean received = latch.await(5, TimeUnit.SECONDS);
-        assertTrue("Did not receive the message!", received);
-
-        container.stop();
-    }
-    
-    
-    public void xtestThatShowsEndpointResolutionIsNotConsistent() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        CamelContext container = new DefaultCamelContext();
-        
-        // lets add some routes
-        container.addRoutes(new RouteBuilder() {
+        context2.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("queue:test.a").to("queue:test.b");
-                from("queue:test.b").process(new Processor() {
-                    public void process(Exchange e) {
-                        log.debug("Received exchange: " + e.getIn());
-                        latch.countDown();
-                    }
-                });
+                from("vm:test.b").to("mock:result");
             }
         });
 
-        
-        container.start();
-        
-        // now lets fire in a message
-        Endpoint<Exchange> endpoint = container.getComponent("queue").createEndpoint("queue:test.a");
-        Exchange exchange = endpoint.createExchange();
-        exchange.getIn().setHeader("cheese", 123);
-
-        Producer<Exchange> producer = endpoint.createProducer();
-        producer.process(exchange);
-
-        // now lets sleep for a while
-        boolean received = latch.await(5, TimeUnit.SECONDS);
-        assertTrue("Did not receive the message!", received);
+        ServiceHelper.startServices(context1, context2);
+    }
 
-        container.stop();
+    @Override
+    protected void tearDown() throws Exception {
+        ServiceHelper.stopServices(context2, context1);
+        super.tearDown();
     }
-    
-}
+}
\ No newline at end of file