You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2008/03/06 20:52:11 UTC

svn commit: r634396 - in /activemq/camel/trunk: camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/test/java/org/apache/camel/ components/camel-jms/src/main/java/org/apache/camel/component/jms/ components/camel-jms/src/test/java/org/apache/c...

Author: jstrachan
Date: Thu Mar  6 11:52:07 2008
New Revision: 634396

URL: http://svn.apache.org/viewvc?rev=634396&view=rev
Log:
added support for https://issues.apache.org/activemq/browse/CAMEL-369 so that JMS queue endpoints are browseable

Added:
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java   (with props)
    activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java
      - copied, changed from r634228, activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
    activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java?rev=634396&r1=634395&r2=634396&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/BrowsableEndpoint.java Thu Mar  6 11:52:07 2008
@@ -28,7 +28,7 @@
  *
  * @version $Revision$
  */
-public interface BrowsableEndpoint extends Endpoint<Exchange> {
+public interface BrowsableEndpoint {
     /**
      * Return the exchanges available on this endpoint
      *

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java?rev=634396&r1=634395&r2=634396&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java Thu Mar  6 11:52:07 2008
@@ -17,6 +17,7 @@
 package org.apache.camel;
 
 import java.io.File;
+import java.util.List;
 
 import javax.naming.Context;
 
@@ -27,6 +28,7 @@
 import org.apache.camel.processor.CreateRouteWithNonExistingEndpointTest;
 import org.apache.camel.spi.Language;
 import org.apache.camel.util.jndi.JndiTest;
+import org.apache.camel.util.CamelContextHelper;
 
 /**
  * A useful base class which creates a {@link CamelContext} with some routes
@@ -260,5 +262,21 @@
 
     protected void assertValidContext(CamelContext context) {
         assertNotNull("No context found!", context);
+    }
+
+    protected <T> List<T> getSingletonEndpoints(Class<T> type) {
+        return CamelContextHelper.getSingletonEndpoints(context, type);
+    }
+
+    protected <T extends Endpoint> T getMandatoryEndpoint(String uri, Class<T> type) {
+        T endpoint = context.getEndpoint(uri, type);
+        assertNotNull("No endpoint found for uri: " + uri, endpoint);
+        return endpoint;
+    }
+
+    protected Endpoint getMandatoryEndpoint(String uri) {
+        Endpoint endpoint = context.getEndpoint(uri);
+        assertNotNull("No endpoint found for uri: " + uri, endpoint);
+        return endpoint;
     }
 }

Modified: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=634396&r1=634395&r2=634396&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Thu Mar  6 11:52:07 2008
@@ -333,7 +333,14 @@
 
         // lets make sure we copy the configuration as each endpoint can
         // customize its own version
-        JmsEndpoint endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, getConfiguration().copy());
+        JmsConfiguration newConfiguration = getConfiguration().copy();
+        JmsEndpoint endpoint;
+        if (pubSubDomain) {
+            endpoint = new JmsEndpoint(uri, this, subject, pubSubDomain, newConfiguration);
+        }
+        else {
+            endpoint = new JmsQueueEndpoint(uri, this, subject, newConfiguration);
+        }
 
         String selector = (String) parameters.remove("selector");
         if (selector != null) {

Added: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=634396&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (added)
+++ activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Thu Mar  6 11:52:07 2008
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.BrowsableEndpoint;
+import org.springframework.jms.core.BrowserCallback;
+import org.springframework.jms.core.JmsOperations;
+
+/**
+ * An endpoint for a JMS Queue which is also browsable
+ *
+ * @version $Revision: 1.1 $
+ */
+public class JmsQueueEndpoint extends JmsEndpoint implements BrowsableEndpoint {
+    private int maximumBrowseSize = -1;
+
+    public JmsQueueEndpoint(String uri, JmsComponent component, String destination, JmsConfiguration configuration) {
+        super(uri, component, destination, false, configuration);
+    }
+
+    public int getMaximumBrowseSize() {
+        return maximumBrowseSize;
+    }
+
+    /**
+     * If a number is set > 0 then this limits the number of messages that are returned when browsing the queue
+     */
+    public void setMaximumBrowseSize(int maximumBrowseSize) {
+        this.maximumBrowseSize = maximumBrowseSize;
+    }
+
+    public List<Exchange> getExchanges() {
+        String queue = getDestination();
+        JmsOperations template = getConfiguration().createInOnlyTemplate(false, queue);
+
+        // TODO not the best implementation in the world as we have to browse the entire queue, which could be massive
+        final List<Exchange> answer = new ArrayList<Exchange>();
+        template.browse(queue, new BrowserCallback() {
+            public Object doInJms(Session session, QueueBrowser browser) throws JMSException {
+                Enumeration iter = browser.getEnumeration();
+                while (iter.hasMoreElements()) {
+                    Message message = (Message) iter.nextElement();
+                    JmsExchange exchange = createExchange(message);
+                    answer.add(exchange);
+                }
+                return answer;
+            }
+        });
+        return answer;
+    }
+}

Propchange: activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java (from r634228, activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java?p2=activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java&p1=activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java&r1=634228&r2=634396&rev=634396&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRouteTest.java (original)
+++ activemq/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/BrowsableQueueTest.java Thu Mar  6 11:52:07 2008
@@ -16,60 +16,61 @@
  */
 package org.apache.camel.component.jms;
 
+import java.util.List;
+
 import javax.jms.ConnectionFactory;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision$
  */
-public class JmsRouteTest extends ContextTestSupport {
+public class BrowsableQueueTest extends ContextTestSupport {
+    private static final transient Log LOG = LogFactory.getLog(BrowsableQueueTest.class);
+
     protected MockEndpoint resultEndpoint;
     protected String componentName = "activemq";
     protected String startEndpointUri;
+    protected int counter;
+    protected Object[] expectedBodies = { "body1", "body2" };
 
-    public void testJmsRouteWithTextMessage() throws Exception {
-        String expectedBody = "Hello there!";
-
-        resultEndpoint.expectedBodiesReceived(expectedBody);
-        resultEndpoint.message(0).header("cheese").isEqualTo(123);
-
-        sendExchange(expectedBody);
-
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    public void testJmsRouteWithObjectMessage() throws Exception {
-        PurchaseOrder expectedBody = new PurchaseOrder("Beer", 10);
-
-        resultEndpoint.expectedBodiesReceived(expectedBody);
-        resultEndpoint.message(0).header("cheese").isEqualTo(123);
-
-        sendExchange(expectedBody);
-
-        resultEndpoint.assertIsSatisfied();
+    public void testSendMessagesThenBrowseQueue() throws Exception {
+        // send some messages
+        for (int i = 0; i < expectedBodies.length; i++) {
+            Object expectedBody = expectedBodies[i];
+            template.sendBodyAndHeader("activemq:test.b", expectedBody, "counter", i);
+        }
+
+        // now lets browse the queue
+        JmsQueueEndpoint endpoint = getMandatoryEndpoint("activemq:test.b", JmsQueueEndpoint.class);
+        List<Exchange> list = endpoint.getExchanges();
+        LOG.debug("Received: " + list);
+        assertEquals("Size of list", 2, list.size());
+        int index = -1;
+        for (Exchange exchange : list) {
+            String actual = exchange.getIn().getBody(String.class);
+            LOG.debug("Received body: "+ actual);
+            
+            Object expected = expectedBodies[++index];
+            assertEquals("Body: " + index, expected, actual);
+        }
     }
 
     protected void sendExchange(final Object expectedBody) {
-        template.sendBodyAndHeader(startEndpointUri, expectedBody, "cheese", 123);
+        template.sendBodyAndHeader(startEndpointUri, expectedBody, "counter", ++counter);
     }
 
 
-    @Override
-    protected void setUp() throws Exception {
-        startEndpointUri = componentName + ":queue:test.a";
-
-        super.setUp();
-
-        resultEndpoint = (MockEndpoint) context.getEndpoint("mock:result");
-    }
-
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
 
@@ -82,9 +83,8 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from(startEndpointUri).to(componentName + ":queue:test.b");
-                from(componentName + ":queue:test.b").to("mock:result");
+                from("activemq:test.a").to("activemq:test.b");
             }
         };
     }
-}
+}
\ No newline at end of file