You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/09/02 09:52:48 UTC

svn commit: r810399 - in /camel/trunk/components/camel-jms: ./ src/main/java/org/apache/camel/component/jms/ src/test/java/org/apache/camel/component/jms/

Author: davsclaus
Date: Wed Sep  2 07:52:47 2009
New Revision: 810399

URL: http://svn.apache.org/viewvc?rev=810399&view=rev
Log:
CAMEL-1976: restarting jms consumers will pickup jms endpoint changes such as managed using JMX. CAMEL-1933: Overhaul of JMX. Management of JmsEndpoint now possible. Exposed concurrent consumers but the other options to follow. 

Added:
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java   (with props)
Modified:
    camel/trunk/components/camel-jms/pom.xml
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java

Modified: camel/trunk/components/camel-jms/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/pom.xml?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/pom.xml (original)
+++ camel/trunk/components/camel-jms/pom.xml Wed Sep  2 07:52:47 2009
@@ -49,6 +49,10 @@
       <artifactId>spring-jms</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-context</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
       <artifactId>geronimo-jms_1.1_spec</artifactId>
     </dependency>

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Wed Sep  2 07:52:47 2009
@@ -27,33 +27,53 @@
  * @version $Revision$
  */
 public class JmsConsumer extends DefaultConsumer {
-    private final AbstractMessageListenerContainer listenerContainer;
+    private AbstractMessageListenerContainer listenerContainer;
     private EndpointMessageListener messageListener;
 
     public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) {
         super(endpoint, processor);
         this.listenerContainer = listenerContainer;
+        this.listenerContainer.setMessageListener(getEndpointMessageListener());
+    }
 
-        createMessageListener(endpoint, processor);
-        this.listenerContainer.setMessageListener(messageListener);
+    public JmsEndpoint getEndpoint() {
+        return (JmsEndpoint) super.getEndpoint();
     }
 
     public AbstractMessageListenerContainer getListenerContainer() {
+        if (listenerContainer == null) {
+            createMessageListenerContainer();
+        }
         return listenerContainer;
     }
 
     public EndpointMessageListener getEndpointMessageListener() {
+        if (messageListener == null) {
+            createMessageListener(getEndpoint(), getProcessor());
+        }
         return messageListener;
     }
-    
+
     protected void createMessageListener(JmsEndpoint endpoint, Processor processor) {
         messageListener = new EndpointMessageListener(endpoint, processor);
         messageListener.setBinding(endpoint.getBinding());
     }
 
+    protected void createMessageListenerContainer() {
+        listenerContainer = getEndpoint().createMessageListenerContainer();
+        getEndpoint().configureListenerContainer(listenerContainer);
+        listenerContainer.setMessageListener(getEndpointMessageListener());
+    }
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+
+        // create listener container
+        if (listenerContainer == null) {
+            createMessageListenerContainer();
+        }
+
         listenerContainer.afterPropertiesSet();
         listenerContainer.start();
     }
@@ -62,6 +82,11 @@
     protected void doStop() throws Exception {
         listenerContainer.stop();
         listenerContainer.destroy();
+
+        // null container and listener so they are fully re created if this consumer is restarted
+        // then we will use updated configuration from jms endpoint that may have been managed using JMX
+        listenerContainer = null;
+        messageListener = null;
         super.doStop();
     }
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Sep  2 07:52:47 2009
@@ -36,12 +36,15 @@
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.spi.ManagementAware;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.support.converter.MessageConverter;
 import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedResource;
 import org.springframework.transaction.PlatformTransactionManager;
 
 /**
@@ -49,7 +52,8 @@
  *
  * @version $Revision:520964 $
  */
-public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
+@ManagedResource(description = "Managed JMS Endpoint")
+public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint> {
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
     private JmsBinding binding;
@@ -158,21 +162,16 @@
         return answer;
     }
 
-
     public JmsConsumer createConsumer(Processor processor) throws Exception {
         AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer(this);
         return createConsumer(processor, listenerContainer);
     }
 
-    /**
-     * Creates a consumer using the given processor and listener container
-     *
-     * @param processor         the processor to use to process the messages
-     * @param listenerContainer the listener container
-     * @return a newly created consumer
-     * @throws Exception if the consumer cannot be created
-     */
-    public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
+    public AbstractMessageListenerContainer createMessageListenerContainer() {
+        return configuration.createMessageListenerContainer(this);
+    }
+
+    public void configureListenerContainer(AbstractMessageListenerContainer listenerContainer) {
         if (destinationName != null) {
             listenerContainer.setDestinationName(destinationName);
         } else if (destination != null) {
@@ -186,6 +185,18 @@
             }
         }
         listenerContainer.setPubSubDomain(pubSubDomain);
+    }
+
+    /**
+     * Creates a consumer using the given processor and listener container
+     *
+     * @param processor         the processor to use to process the messages
+     * @param listenerContainer the listener container
+     * @return a newly created consumer
+     * @throws Exception if the consumer cannot be created
+     */
+    public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
+        configureListenerContainer(listenerContainer);
         return new JmsConsumer(this, processor, listenerContainer);
     }
 
@@ -222,6 +233,10 @@
         return configuration.createInOutTemplate(this, pubSubDomain, destinationName, configuration.getRequestTimeout());
     }
 
+    public Object getManagedObject(JmsEndpoint endpoint) {
+        return this;
+    }
+
     // Properties
     // -------------------------------------------------------------------------
     public HeaderFilterStrategy getHeaderFilterStrategy() {
@@ -384,6 +399,7 @@
         return getConfiguration().getClientId();
     }
 
+    @ManagedAttribute
     public int getConcurrentConsumers() {
         return getConfiguration().getConcurrentConsumers();
     }
@@ -596,6 +612,7 @@
         getConfiguration().setClientId(consumerClientId);
     }
 
+    @ManagedAttribute
     public void setConcurrentConsumers(int concurrentConsumers) {
         getConfiguration().setConcurrentConsumers(concurrentConsumers);
     }
@@ -616,7 +633,6 @@
         getConfiguration().setDestinationResolver(destinationResolver);
     }
 
-
     public void setDisableReplyTo(boolean disableReplyTo) {
         getConfiguration().setDisableReplyTo(disableReplyTo);
     }
@@ -801,6 +817,17 @@
         getConfiguration().setTransferException(transferException);
     }
 
+    @ManagedAttribute(description = "Camel id")
+    public String getCamelId() {
+        return getCamelContext().getName();
+    }
+
+    @ManagedAttribute(description = "Endpoint Uri")
+    @Override
+    public String getEndpointUri() {
+        return super.getEndpointUri();
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Wed Sep  2 07:52:47 2009
@@ -26,12 +26,16 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.jms.core.JmsOperations;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
 
 /**
  * An endpoint for a JMS Queue which is also browsable
  *
  * @version $Revision$
  */
+@ManagedResource(description = "Managed JMS Queue Endpoint")
 public class JmsQueueEndpoint extends JmsEndpoint implements BrowsableEndpoint {
     private static final transient Log LOG = LogFactory.getLog(JmsQueueEndpoint.class);
 
@@ -72,6 +76,7 @@
         queueBrowseStrategy = createQueueBrowseStrategy();
     }
 
+    @ManagedAttribute
     public int getMaximumBrowseSize() {
         return maximumBrowseSize;
     }
@@ -80,6 +85,7 @@
      * If a number is set > 0 then this limits the number of messages that are
      * returned when browsing the queue
      */
+    @ManagedAttribute
     public void setMaximumBrowseSize(int maximumBrowseSize) {
         this.maximumBrowseSize = maximumBrowseSize;
     }
@@ -93,6 +99,17 @@
         return queueBrowseStrategy.browse(template, queue, this);
     }
 
+    @ManagedOperation(description = "Current number of Exchanges in Queue")
+    public long qeueSize() {
+        return getExchanges().size();
+    }
+
+    @ManagedOperation(description = "Get Exchange from queue by index")
+    public Exchange browseExchange(Integer index) {
+        return getExchanges().get(index);
+    }
+
+
     protected QueueBrowseStrategy createQueueBrowseStrategy() {
         QueueBrowseStrategy answer = null;
         try {

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java?rev=810399&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java Wed Sep  2 07:52:47 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * @version $Revision$
+ */
+public class JmsConsumerRestartPickupConfigurationChangesTest extends CamelTestSupport {
+
+    @Test
+    public void testRestartJmsConsumerPickupChanges() throws Exception {
+        JmsEndpoint endpoint = context.getEndpoint("activemq:queue:foo", JmsEndpoint.class);
+        JmsConsumer consumer = endpoint.createConsumer(new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                template.send("mock:result", exchange);
+            }
+        });
+
+        consumer.start();
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived("Hello World");
+        template.sendBody("activemq:queue:foo", "Hello World");
+        assertMockEndpointsSatisfied();
+
+        consumer.stop();
+
+        // change to listen on another queue
+        endpoint.setDestinationName("bar");
+        endpoint.setConcurrentConsumers(2);
+
+        // restart it
+        consumer.start();
+
+        result.reset();
+        result.expectedBodiesReceived("Bye World");
+        template.sendBody("activemq:queue:bar", "Bye World");
+        assertMockEndpointsSatisfied();
+
+        consumer.stop();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+}

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Wed Sep  2 07:52:47 2009
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.jms;
 
-
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 
@@ -35,8 +34,6 @@
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
 
-
-
 /**
  * @version $Revision$
  */