You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/09/02 09:52:48 UTC
svn commit: r810399 - in /camel/trunk/components/camel-jms: ./
src/main/java/org/apache/camel/component/jms/
src/test/java/org/apache/camel/component/jms/
Author: davsclaus
Date: Wed Sep 2 07:52:47 2009
New Revision: 810399
URL: http://svn.apache.org/viewvc?rev=810399&view=rev
Log:
CAMEL-1976: Restarting JMS consumers will pick up JMS endpoint changes, such as those made when the endpoint is managed using JMX. CAMEL-1933: Overhaul of JMX. Management of JmsEndpoint is now possible. Concurrent consumers is exposed; the other options are to follow.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java (with props)
Modified:
camel/trunk/components/camel-jms/pom.xml
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
Modified: camel/trunk/components/camel-jms/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/pom.xml?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/pom.xml (original)
+++ camel/trunk/components/camel-jms/pom.xml Wed Sep 2 07:52:47 2009
@@ -49,6 +49,10 @@
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Wed Sep 2 07:52:47 2009
@@ -27,33 +27,53 @@
* @version $Revision$
*/
public class JmsConsumer extends DefaultConsumer {
- private final AbstractMessageListenerContainer listenerContainer;
+ private AbstractMessageListenerContainer listenerContainer;
private EndpointMessageListener messageListener;
public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) {
super(endpoint, processor);
this.listenerContainer = listenerContainer;
+ this.listenerContainer.setMessageListener(getEndpointMessageListener());
+ }
- createMessageListener(endpoint, processor);
- this.listenerContainer.setMessageListener(messageListener);
+ public JmsEndpoint getEndpoint() {
+ return (JmsEndpoint) super.getEndpoint();
}
public AbstractMessageListenerContainer getListenerContainer() {
+ if (listenerContainer == null) {
+ createMessageListenerContainer();
+ }
return listenerContainer;
}
public EndpointMessageListener getEndpointMessageListener() {
+ if (messageListener == null) {
+ createMessageListener(getEndpoint(), getProcessor());
+ }
return messageListener;
}
-
+
protected void createMessageListener(JmsEndpoint endpoint, Processor processor) {
messageListener = new EndpointMessageListener(endpoint, processor);
messageListener.setBinding(endpoint.getBinding());
}
+ protected void createMessageListenerContainer() {
+ listenerContainer = getEndpoint().createMessageListenerContainer();
+ getEndpoint().configureListenerContainer(listenerContainer);
+ listenerContainer.setMessageListener(getEndpointMessageListener());
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
+
+ // create listener container
+ if (listenerContainer == null) {
+ createMessageListenerContainer();
+ }
+
listenerContainer.afterPropertiesSet();
listenerContainer.start();
}
@@ -62,6 +82,11 @@
protected void doStop() throws Exception {
listenerContainer.stop();
listenerContainer.destroy();
+
+ // null the container and listener so they are fully re-created if this consumer is restarted;
+ // they will then use the updated configuration from the JMS endpoint, which may have been changed via JMX
+ listenerContainer = null;
+ messageListener = null;
super.doStop();
}
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Sep 2 07:52:47 2009
@@ -36,12 +36,15 @@
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
+import org.apache.camel.spi.ManagementAware;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.transaction.PlatformTransactionManager;
/**
@@ -49,7 +52,8 @@
*
* @version $Revision:520964 $
*/
-public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {
+@ManagedResource(description = "Managed JMS Endpoint")
+public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, ManagementAware<JmsEndpoint> {
private HeaderFilterStrategy headerFilterStrategy;
private boolean pubSubDomain;
private JmsBinding binding;
@@ -158,21 +162,16 @@
return answer;
}
-
public JmsConsumer createConsumer(Processor processor) throws Exception {
AbstractMessageListenerContainer listenerContainer = configuration.createMessageListenerContainer(this);
return createConsumer(processor, listenerContainer);
}
- /**
- * Creates a consumer using the given processor and listener container
- *
- * @param processor the processor to use to process the messages
- * @param listenerContainer the listener container
- * @return a newly created consumer
- * @throws Exception if the consumer cannot be created
- */
- public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
+ public AbstractMessageListenerContainer createMessageListenerContainer() {
+ return configuration.createMessageListenerContainer(this);
+ }
+
+ public void configureListenerContainer(AbstractMessageListenerContainer listenerContainer) {
if (destinationName != null) {
listenerContainer.setDestinationName(destinationName);
} else if (destination != null) {
@@ -186,6 +185,18 @@
}
}
listenerContainer.setPubSubDomain(pubSubDomain);
+ }
+
+ /**
+ * Creates a consumer using the given processor and listener container
+ *
+ * @param processor the processor to use to process the messages
+ * @param listenerContainer the listener container
+ * @return a newly created consumer
+ * @throws Exception if the consumer cannot be created
+ */
+ public JmsConsumer createConsumer(Processor processor, AbstractMessageListenerContainer listenerContainer) throws Exception {
+ configureListenerContainer(listenerContainer);
return new JmsConsumer(this, processor, listenerContainer);
}
@@ -222,6 +233,10 @@
return configuration.createInOutTemplate(this, pubSubDomain, destinationName, configuration.getRequestTimeout());
}
+ public Object getManagedObject(JmsEndpoint endpoint) {
+ return this;
+ }
+
// Properties
// -------------------------------------------------------------------------
public HeaderFilterStrategy getHeaderFilterStrategy() {
@@ -384,6 +399,7 @@
return getConfiguration().getClientId();
}
+ @ManagedAttribute
public int getConcurrentConsumers() {
return getConfiguration().getConcurrentConsumers();
}
@@ -596,6 +612,7 @@
getConfiguration().setClientId(consumerClientId);
}
+ @ManagedAttribute
public void setConcurrentConsumers(int concurrentConsumers) {
getConfiguration().setConcurrentConsumers(concurrentConsumers);
}
@@ -616,7 +633,6 @@
getConfiguration().setDestinationResolver(destinationResolver);
}
-
public void setDisableReplyTo(boolean disableReplyTo) {
getConfiguration().setDisableReplyTo(disableReplyTo);
}
@@ -801,6 +817,17 @@
getConfiguration().setTransferException(transferException);
}
+ @ManagedAttribute(description = "Camel id")
+ public String getCamelId() {
+ return getCamelContext().getName();
+ }
+
+ @ManagedAttribute(description = "Endpoint Uri")
+ @Override
+ public String getEndpointUri() {
+ return super.getEndpointUri();
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsQueueEndpoint.java Wed Sep 2 07:52:47 2009
@@ -26,12 +26,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsOperations;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedOperation;
+import org.springframework.jmx.export.annotation.ManagedResource;
/**
* An endpoint for a JMS Queue which is also browsable
*
* @version $Revision$
*/
+@ManagedResource(description = "Managed JMS Queue Endpoint")
public class JmsQueueEndpoint extends JmsEndpoint implements BrowsableEndpoint {
private static final transient Log LOG = LogFactory.getLog(JmsQueueEndpoint.class);
@@ -72,6 +76,7 @@
queueBrowseStrategy = createQueueBrowseStrategy();
}
+ @ManagedAttribute
public int getMaximumBrowseSize() {
return maximumBrowseSize;
}
@@ -80,6 +85,7 @@
* If a number is set > 0 then this limits the number of messages that are
* returned when browsing the queue
*/
+ @ManagedAttribute
public void setMaximumBrowseSize(int maximumBrowseSize) {
this.maximumBrowseSize = maximumBrowseSize;
}
@@ -93,6 +99,17 @@
return queueBrowseStrategy.browse(template, queue, this);
}
+ @ManagedOperation(description = "Current number of Exchanges in Queue")
+ public long qeueSize() {
+ return getExchanges().size();
+ }
+
+ @ManagedOperation(description = "Get Exchange from queue by index")
+ public Exchange browseExchange(Integer index) {
+ return getExchanges().get(index);
+ }
+
+
protected QueueBrowseStrategy createQueueBrowseStrategy() {
QueueBrowseStrategy answer = null;
try {
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java?rev=810399&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java Wed Sep 2 07:52:47 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * @version $Revision$
+ */
+public class JmsConsumerRestartPickupConfigurationChangesTest extends CamelTestSupport {
+
+ @Test
+ public void testRestartJmsConsumerPickupChanges() throws Exception {
+ JmsEndpoint endpoint = context.getEndpoint("activemq:queue:foo", JmsEndpoint.class);
+ JmsConsumer consumer = endpoint.createConsumer(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ template.send("mock:result", exchange);
+ }
+ });
+
+ consumer.start();
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("Hello World");
+ template.sendBody("activemq:queue:foo", "Hello World");
+ assertMockEndpointsSatisfied();
+
+ consumer.stop();
+
+ // change to listen on another queue
+ endpoint.setDestinationName("bar");
+ endpoint.setConcurrentConsumers(2);
+
+ // restart it
+ consumer.start();
+
+ result.reset();
+ result.expectedBodiesReceived("Bye World");
+ template.sendBody("activemq:queue:bar", "Bye World");
+ assertMockEndpointsSatisfied();
+
+ consumer.stop();
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext camelContext = super.createCamelContext();
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+ return camelContext;
+ }
+
+}
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsConsumerRestartPickupConfigurationChangesTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java?rev=810399&r1=810398&r2=810399&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java Wed Sep 2 07:52:47 2009
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.jms;
-
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@@ -35,8 +34,6 @@
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
-
-
/**
* @version $Revision$
*/