You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/04/24 15:17:26 UTC

svn commit: r768300 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/util/ main/resources/ main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/ test/java/org/apache/activemq/broker/util/ ...

Author: dejanb
Date: Fri Apr 24 13:17:25 2009
New Revision: 768300

URL: http://svn.apache.org/viewvc?rev=768300&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2224 and additional fix for https://issues.apache.org/activemq/browse/AMQ-2221

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TraceBrokerPathPlugin.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/util/plugin-broker.xml
Removed:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/LoggingBrokerTest.java
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/util/logging-broker.xml
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
    activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core
    activemq/trunk/activemq-core/src/main/resources/activemq.xsd
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java?rev=768300&r1=768299&r2=768300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java Fri Apr 24 13:17:25 2009
@@ -46,8 +46,6 @@
  */
 public class TimeStampingBrokerPlugin extends BrokerPluginSupport {
 
-    private static final Log LOG = LogFactory.getLog(TimeStampingBrokerPlugin.class);
-
     /** 
     * variable which (when non-zero) is used to override
     * the expiration date for messages that arrive with

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TraceBrokerPathPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TraceBrokerPathPlugin.java?rev=768300&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TraceBrokerPathPlugin.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/util/TraceBrokerPathPlugin.java Fri Apr 24 13:17:25 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The TraceBrokerPathPlugin can be used in a network of Brokers. Each Broker
+ * that has the plugin configured will add its brokerName to the content
+ * of a JMS Property. If all Brokers have this plugin enabled, the path the
+ * message actually took through the network can be seen in the defined property.
+ * 
+ * @org.apache.xbean.XBean element="traceBrokerPathPlugin"
+ * 
+ * @version $Revision$
+ */
+
+public class TraceBrokerPathPlugin extends BrokerPluginSupport {
+    private static final Log LOG = LogFactory.getLog(TraceBrokerPathPlugin.class);
+    /** Name of the message property that accumulates the comma-separated broker names. */
+    private String stampProperty = "BrokerPath";
+
+    public String getStampProperty() {
+        return stampProperty;
+    }
+
+    public void setStampProperty(String stampProperty) {
+        this.stampProperty = stampProperty;
+    }
+
+    /** Adds this broker's name to the stamp property of the dispatched message, then delegates to the superclass. */
+    public void preProcessDispatch(MessageDispatch messageDispatch) {
+        // A dispatch without a message has nothing to stamp; skip it instead of failing with a NullPointerException.
+        if (messageDispatch != null && messageDispatch.getMessage() != null) {
+            try {
+                String previousPath = (String) messageDispatch.getMessage().getProperty(getStampProperty());
+                String brokerStamp = previousPath == null ? getBrokerName() : previousPath + "," + getBrokerName();
+                messageDispatch.getMessage().setProperty(getStampProperty(), brokerStamp);
+            } catch (IOException ioe) {
+                LOG.warn("Setting broker property failed " + ioe, ioe);
+            }
+        }
+        super.preProcessDispatch(messageDispatch);
+    }
+}

Modified: activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core?rev=768300&r1=768299&r2=768300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core (original)
+++ activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/xbean/spring/http/activemq.apache.org/schema/core Fri Apr 24 13:17:25 2009
@@ -245,6 +245,8 @@
 topic = org.apache.activemq.command.ActiveMQTopic
 org.apache.activemq.command.ActiveMQTopic(java.lang.String).parameterNames = name
 
+traceBrokerPathPlugin = org.apache.activemq.broker.util.TraceBrokerPathPlugin
+
 transportConnector = org.apache.activemq.broker.TransportConnector
 org.apache.activemq.broker.TransportConnector(org.apache.activemq.transport.TransportServer).parameterNames = server
 

Modified: activemq/trunk/activemq-core/src/main/resources/activemq.xsd
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/activemq.xsd?rev=768300&r1=768299&r2=768300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/activemq.xsd (original)
+++ activemq/trunk/activemq-core/src/main/resources/activemq.xsd Fri Apr 24 13:17:25 2009
@@ -584,6 +584,7 @@
               <xs:element ref='tns:simpleAuthenticationPlugin'/>
               <xs:element ref='tns:timeStampingBrokerPlugin'/>
               <xs:element ref='tns:udpTraceBrokerPlugin'/>
+              <xs:element ref='tns:traceBrokerPathPlugin'/>
               <xs:any namespace='##other'/>
             </xs:choice>
           </xs:complexType>
@@ -614,6 +615,7 @@
               <xs:element ref='tns:multicastTraceBrokerPlugin'/>
               <xs:element ref='tns:timeStampingBrokerPlugin'/>
               <xs:element ref='tns:udpTraceBrokerPlugin'/>
+              <xs:element ref='tns:traceBrokerPathPlugin'/>
               <xs:any namespace='##other'/>
             </xs:choice>
           </xs:complexType>
@@ -3133,6 +3135,7 @@
               <xs:element ref='tns:multicastTraceBrokerPlugin'/>
               <xs:element ref='tns:timeStampingBrokerPlugin'/>
               <xs:element ref='tns:udpTraceBrokerPlugin'/>
+              <xs:element ref='tns:traceBrokerPathPlugin'/>
               <xs:any namespace='##other'/>
             </xs:choice>
           </xs:complexType>
@@ -3149,6 +3152,43 @@
       <xs:anyAttribute namespace='##other' processContents='lax'/>
     </xs:complexType>
   </xs:element>
+  
+    <!-- element for type: org.apache.activemq.broker.util.TraceBrokerPathPlugin -->
+  <xs:element name='traceBrokerPathPlugin'>
+    <xs:annotation>
+      <xs:documentation><![CDATA[
+The TraceBrokerPathPlugin can be used in a network of Brokers. Each Broker
+that has the plugin configured will add its brokerName to the content
+of a JMS Property. If all Brokers have this plugin enabled, the path the
+message actually took through the network can be seen in the defined property.
+      ]]></xs:documentation>
+    </xs:annotation>
+    <xs:complexType>
+    	<xs:sequence>
+    		<xs:element name='next' minOccurs='0' maxOccurs='1'>
+    			<xs:complexType>
+    				<xs:choice minOccurs='0' maxOccurs='1'>
+    					<xs:element ref='tns:loggingBrokerPlugin' />
+    					<xs:element
+    						ref='tns:multicastTraceBrokerPlugin' />
+    					<xs:element ref='tns:timeStampingBrokerPlugin' />
+    					<xs:element ref='tns:udpTraceBrokerPlugin' />
+    					<xs:element ref='tns:traceBrokerPathPlugin'/>
+    					<xs:any namespace='##other' />
+    				</xs:choice>
+    			</xs:complexType>
+    		</xs:element>
+    		<xs:any namespace='##other' minOccurs='0'
+    			maxOccurs='unbounded' />
+    		<xs:element name="stampProperty" type="xs:string"
+    			maxOccurs="1" minOccurs="0">
+    		</xs:element>
+    	</xs:sequence>
+
+    	<xs:attribute name="stampProperty" type="xs:string"></xs:attribute>
+    	<xs:anyAttribute namespace='##other' processContents='lax' />
+    </xs:complexType>
+  </xs:element>
 
 
   <!-- element for type: org.apache.activemq.broker.jmx.ManagementContext -->
@@ -3540,6 +3580,7 @@
               <xs:element ref='tns:multicastTraceBrokerPlugin'/>
               <xs:element ref='tns:timeStampingBrokerPlugin'/>
               <xs:element ref='tns:udpTraceBrokerPlugin'/>
+              <xs:element ref='tns:traceBrokerPathPlugin'/>
               <xs:any namespace='##other'/>
             </xs:choice>
           </xs:complexType>
@@ -5114,23 +5155,30 @@
     </xs:annotation>
     <xs:complexType>
       <xs:sequence>
-        <xs:element name='adminConnectionContext' minOccurs='0' maxOccurs='1'>
-          <xs:complexType>
-            <xs:sequence minOccurs='0' maxOccurs='1'><xs:any namespace='##other'/></xs:sequence>
-          </xs:complexType>
-        </xs:element>
-        <xs:element name='next' minOccurs='0' maxOccurs='1'>
-          <xs:complexType>
-            <xs:choice minOccurs='0' maxOccurs='1'>
-              <xs:element ref='tns:loggingBrokerPlugin'/>
-              <xs:element ref='tns:multicastTraceBrokerPlugin'/>
-              <xs:element ref='tns:timeStampingBrokerPlugin'/>
-              <xs:element ref='tns:udpTraceBrokerPlugin'/>
-              <xs:any namespace='##other'/>
-            </xs:choice>
-          </xs:complexType>
-        </xs:element>
-        <xs:any namespace='##other' minOccurs='0' maxOccurs='unbounded'/>
+      	<xs:element name='adminConnectionContext' minOccurs='0'
+      		maxOccurs='1'>
+      		<xs:complexType>
+      			<xs:sequence minOccurs='0' maxOccurs='1'>
+      				<xs:any namespace='##other' />
+      			</xs:sequence>
+      		</xs:complexType>
+      	</xs:element>
+      	<xs:element name='next' minOccurs='0' maxOccurs='1'>
+      		<xs:complexType>
+      			<xs:choice minOccurs='0' maxOccurs='1'>
+      				<xs:element ref='tns:loggingBrokerPlugin' />
+      				<xs:element ref='tns:multicastTraceBrokerPlugin' />
+      				<xs:element ref='tns:timeStampingBrokerPlugin' />
+      				<xs:element ref='tns:udpTraceBrokerPlugin' />
+      				<xs:element ref='tns:traceBrokerPathPlugin' />
+      				<xs:any namespace='##other' />
+      			</xs:choice>
+      		</xs:complexType>
+      	</xs:element>
+      	<xs:any namespace='##other' minOccurs='0'
+      		maxOccurs='unbounded' />
+      	<xs:element name="ttlCeiling" type="xs:int" minOccurs="0" maxOccurs="1"></xs:element>
+      	<xs:element name="zeroExpirationOverride" type="xs:int" minOccurs="0" maxOccurs="1"></xs:element>
       </xs:sequence>
       <xs:attribute name='adminConnectionContext' type='xs:string'/>
       <xs:attribute name='next' type='xs:string'/>
@@ -5320,6 +5368,7 @@
               <xs:element ref='tns:multicastTraceBrokerPlugin'/>
               <xs:element ref='tns:timeStampingBrokerPlugin'/>
               <xs:element ref='tns:udpTraceBrokerPlugin'/>
+              <xs:element ref='tns:traceBrokerPathPlugin'/>
               <xs:any namespace='##other'/>
             </xs:choice>
           </xs:complexType>

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java?rev=768300&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/util/PluginBrokerTest.java Fri Apr 24 13:17:25 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.util;
+
+import java.net.URI;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * 
+ * @version $Revision: 564271 $
+ */
+public class PluginBrokerTest extends JmsTopicSendReceiveTest {
+    private static final Log LOG = LogFactory.getLog(PluginBrokerTest.class);
+    private BrokerService broker;
+
+    protected void setUp() throws Exception {
+        broker = createBroker();
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }   
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return createBroker("org/apache/activemq/util/plugin-broker.xml");
+    }
+
+    protected BrokerService createBroker(String uri) throws Exception {
+        LOG.info("Loading broker configuration from the classpath with URI: " + uri);
+        return BrokerFactory.createBroker(new URI("xbean:" + uri));
+    }
+
+	protected void assertMessageValid(int index, Message message)
+			throws JMSException {
+		// check if broker path has been set 
+		assertEquals("localhost", message.getStringProperty("BrokerPath"));
+		ActiveMQMessage amqMsg = (ActiveMQMessage)message;
+		if (index == 7) {
+			// check custom expiration
+			assertEquals(2000, amqMsg.getExpiration() - amqMsg.getTimestamp());
+		} else if (index == 9) {
+			// check ceiling
+			assertEquals(60000, amqMsg.getExpiration() - amqMsg.getTimestamp());
+		} else {
+			// check default expiration
+			assertEquals(1000, amqMsg.getExpiration() - amqMsg.getTimestamp());
+		}
+		super.assertMessageValid(index, message);
+	}
+	
+    protected void sendMessage(int index, Message message) throws Exception {
+    	if (index == 7) {
+    		producer.send(producerDestination, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 2000);
+    	} else if (index == 9) {
+    		producer.send(producerDestination, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 200000);
+    	} else {
+    		super.sendMessage(index, message);
+    	}
+    }
+    
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java?rev=768300&r1=768299&r2=768300&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/JmsSendReceiveTestSupport.java Fri Apr 24 13:17:25 2009
@@ -78,7 +78,6 @@
 
         LOG.info("Message count for test case is: " + messageCount);
         data = new String[messageCount];
-
         for (int i = 0; i < messageCount; i++) {
             data[i] = createMessageText(i);
         }
@@ -116,13 +115,16 @@
             if (verbose) {
                 LOG.info("About to send a message: " + message + " with text: " + data[i]);
             }
-
-            producer.send(producerDestination, message);
+            sendMessage(i, message);
         }
 
         assertMessagesAreReceived();
         LOG.info("" + data.length + " messages(s) received, closing down connections");
     }
+    
+    protected void sendMessage(int index, Message message) throws Exception {
+    	producer.send(producerDestination, message); // index is unused here; subclasses may override to vary the send per message
+    }
 
     protected Message createMessage(int index) throws JMSException {
         Message message = session.createTextMessage(data[index]);

Added: activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/util/plugin-broker.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/util/plugin-broker.xml?rev=768300&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/util/plugin-broker.xml (added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/util/plugin-broker.xml Fri Apr 24 13:17:25 2009
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans>
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker useJmx="false" persistent="false" xmlns="http://activemq.apache.org/schema/core">
+
+    <plugins>
+    
+      <!-- let's enable detailed logging in the broker but ignore ConnectionEvents -->
+      <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>
+      
+      <timeStampingBrokerPlugin zeroExpirationOverride="1000" ttlCeiling="60000"/>
+      
+      <traceBrokerPathPlugin/>
+      
+    </plugins>
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->