You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/22 23:53:05 UTC
svn commit: r1353042 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/command/ main/java/org/apache/activemq/network/
test/java/org/apache/activemq/command/ test/java/org/apache/activemq/network/
Author: tabish
Date: Fri Jun 22 21:53:01 2012
New Revision: 1353042
URL: http://svn.apache.org/viewvc?rev=1353042&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3787
Adds support for setting useCompression on NetworkConnectors to enforce that all messages forwarded across the network are compressed.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java Fri Jun 22 21:53:01 2012
@@ -118,7 +118,8 @@ public class ActiveMQBytesMessage extend
storeContent();
}
- private void storeContent() {
+ @Override
+ public void storeContent() {
try {
if (dataOut != null) {
dataOut.close();
@@ -853,4 +854,22 @@ public class ActiveMQBytesMessage extend
public String toString() {
return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
}
+
+ @Override
+ protected void doCompress() throws IOException {
+ compressed = true;
+ ByteSequence bytes = getContent();
+ int length = bytes.getLength();
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ bytesOut.write(new byte[4]);
+ DeflaterOutputStream os = new DeflaterOutputStream(bytesOut);
+ DataOutputStream dataOut = new DataOutputStream(os);
+ dataOut.write(bytes.data, bytes.offset, bytes.length);
+ dataOut.flush();
+ dataOut.close();
+ bytes = bytesOut.toByteSequence();
+ ByteSequenceData.writeIntBig(bytes, length);
+ bytes.offset = 0;
+ setContent(bytes);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java Fri Jun 22 21:53:01 2012
@@ -131,7 +131,8 @@ public class ActiveMQMapMessage extends
map.clear();
}
- private void storeContent() {
+ @Override
+ public void storeContent() {
try {
if (getContent() == null && !map.isEmpty()) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
@@ -741,6 +742,12 @@ public class ActiveMQMapMessage extends
setContent(null);
}
+ @Override
+ public void compress() throws IOException {
+ storeContent();
+ super.compress();
+ }
+
public String toString() {
return super.toString() + " ActiveMQMapMessage{ " + "theTable = " + map + " }";
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Fri Jun 22 21:53:01 2012
@@ -20,15 +20,16 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Vector;
+
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
+
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.scheduler.CronParser;
@@ -39,7 +40,7 @@ import org.apache.activemq.util.JMSExcep
import org.apache.activemq.util.TypeConversionSupport;
/**
- *
+ *
* @openwire:marshaller code="23"
*/
public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage {
@@ -53,7 +54,6 @@ public class ActiveMQMessage extends Mes
return DATA_STRUCTURE_TYPE;
}
-
@Override
public Message copy() {
ActiveMQMessage copy = new ActiveMQMessage();
@@ -280,6 +280,7 @@ public class ActiveMQMessage extends Mes
}
}
+ @SuppressWarnings("rawtypes")
public Enumeration getPropertyNames() throws JMSException {
try {
Vector<String> result = new Vector<String>(this.getProperties().keySet());
@@ -294,6 +295,7 @@ public class ActiveMQMessage extends Mes
* @return Enumeration of all property names on this message
* @throws JMSException
*/
+ @SuppressWarnings("rawtypes")
public Enumeration getAllPropertyNames() throws JMSException {
try {
Vector<String> result = new Vector<String>(this.getProperties().keySet());
@@ -305,7 +307,6 @@ public class ActiveMQMessage extends Mes
}
interface PropertySetter {
-
void set(Message message, Object value) throws MessageFormatException;
}
@@ -445,10 +446,8 @@ public class ActiveMQMessage extends Mes
}
}
- public void setProperties(Map properties) throws JMSException {
- for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry) iter.next();
-
+ public void setProperties(Map<String, ?> properties) throws JMSException {
+ for (Map.Entry<String, ?> entry : properties.entrySet()) {
// Lets use the object property method as we may contain standard
// extension headers like JMSXGroupID
setObjectProperty((String) entry.getKey(), entry.getValue());
@@ -473,7 +472,7 @@ public class ActiveMQMessage extends Mes
}
}
}
-
+
protected void checkValidScheduled(String name, Object value) throws MessageFormatException {
if (AMQ_SCHEDULED_DELAY.equals(name) || AMQ_SCHEDULED_PERIOD.equals(name) || AMQ_SCHEDULED_REPEAT.equals(name)) {
if (value instanceof Long == false && value instanceof Integer == false) {
@@ -484,7 +483,7 @@ public class ActiveMQMessage extends Mes
CronParser.validate(value.toString());
}
}
-
+
protected Object convertScheduled(String name, Object value) throws MessageFormatException {
Object result = value;
if (AMQ_SCHEDULED_DELAY.equals(name)){
@@ -680,4 +679,8 @@ public class ActiveMQMessage extends Mes
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processMessage(this);
}
+
+ @Override
+ public void storeContent() {
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java Fri Jun 22 21:53:01 2012
@@ -53,7 +53,7 @@ import org.apache.activemq.wireformat.Wi
* <CODE>MessageNotWriteableException</CODE> is thrown. If
* <CODE>clearBody</CODE> is called, the message can now be both read from and
* written to.
- *
+ *
* @openwire:marshaller code="26"
* @see javax.jms.Session#createObjectMessage()
* @see javax.jms.Session#createObjectMessage(Serializable)
@@ -64,10 +64,10 @@ import org.apache.activemq.wireformat.Wi
* @see javax.jms.TextMessage
*/
public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
-
+
// TODO: verify classloader
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
- static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
+ static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
protected transient Serializable object;
@@ -86,9 +86,10 @@ public class ActiveMQObjectMessage exten
copy.object = object;
}
super.copy(copy);
-
+
}
+ @Override
public void storeContent() {
ByteSequence bodyAsBytes = getContent();
if (bodyAsBytes == null && object != null) {
@@ -128,7 +129,7 @@ public class ActiveMQObjectMessage exten
* If this message body was read-only, calling this method leaves the
* message body in the same state as an empty body in a newly created
* message.
- *
+ *
* @throws JMSException if the JMS provider fails to clear the message body
* due to some internal error.
*/
@@ -144,7 +145,7 @@ public class ActiveMQObjectMessage exten
* snapshot of the object at the time <CODE>setObject()</CODE> is called;
* subsequent modifications of the object will have no effect on the
* <CODE>ObjectMessage</CODE> body.
- *
+ *
* @param newObject the message's data
* @throws JMSException if the JMS provider fails to set the object due to
* some internal error.
@@ -166,7 +167,7 @@ public class ActiveMQObjectMessage exten
/**
* Gets the serializable object containing this message's data. The default
* value is null.
- *
+ *
* @return the serializable object containing this message's data
* @throws JMSException
*/
@@ -214,6 +215,12 @@ public class ActiveMQObjectMessage exten
object = null;
}
+ @Override
+ public void compress() throws IOException {
+ storeContent();
+ super.compress();
+ }
+
public String toString() {
try {
getObject();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java Fri Jun 22 21:53:01 2012
@@ -83,16 +83,16 @@ import org.apache.activemq.util.Marshall
* <CODE>String</CODE> representation of the primitive. <p/>
* <P>
* A value written as the row type can be read as the column type. <p/>
- *
+ *
* <PRE>
* | | boolean byte short char int long float double String byte[]
* |----------------------------------------------------------------------
* |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
* |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
* X |----------------------------------------------------------------------
- *
+ *
* </PRE>
- *
+ *
* <p/>
* <P>
* Attempting to read a null value as a primitive type must be treated as
@@ -100,7 +100,7 @@ import org.apache.activemq.util.Marshall
* conversion method with a null value. Since <code>char</code> does not
* support a <code>String</code> conversion, attempting to read a null value
* as a <code>char</code> must throw a <code>NullPointerException</code>.
- *
+ *
* @openwire:marshaller code="27"
* @see javax.jms.Session#createStreamMessage()
* @see javax.jms.BytesMessage
@@ -137,7 +137,8 @@ public class ActiveMQStreamMessage exten
storeContent();
}
- private void storeContent() {
+ @Override
+ public void storeContent() {
if (dataOut != null) {
try {
dataOut.close();
@@ -165,7 +166,7 @@ public class ActiveMQStreamMessage exten
* If this message body was read-only, calling this method leaves the
* message body in the same state as an empty body in a newly created
* message.
- *
+ *
* @throws JMSException if the JMS provider fails to clear the message body
* due to some internal error.
*/
@@ -180,7 +181,7 @@ public class ActiveMQStreamMessage exten
/**
* Reads a <code>boolean</code> from the stream message.
- *
+ *
* @return the <code>boolean</code> value read
* @throws JMSException if the JMS provider fails to read the message due to
* some internal error.
@@ -221,7 +222,7 @@ public class ActiveMQStreamMessage exten
/**
* Reads a <code>byte</code> value from the stream message.
- *
+ *
* @return the next byte from the stream message as a 8-bit
* <code>byte</code>
* @throws JMSException if the JMS provider fails to read the message due to
@@ -271,7 +272,7 @@ public class ActiveMQStreamMessage exten
/**
* Reads a 16-bit integer from the stream message.
- *
+ *
* @return a 16-bit integer from the stream message
* @throws JMSException if the JMS provider fails to read the message due to
* some internal error.
@@ -324,7 +325,7 @@ public class ActiveMQStreamMessage exten
/**
* Reads a Unicode character value from the stream message.
- *
+ *
* @return a Unicode character from the stream message
* @throws JMSException if the JMS provider fails to read the message due to
* some internal error.
@@ -370,7 +371,7 @@ public class ActiveMQStreamMessage exten
/**
* Reads a 32-bit integer from the stream message.
- *
+ *
* @return a 32-bit integer value from the stream message, interpreted as an
* <code>int</code>
* @throws JMSException if the JMS provider fails to read the message due to
@@ -426,7 +427,7 @@ public class ActiveMQStreamMessage exten
/**
* Reads a 64-bit integer from the stream message.
- *
+ *
* @return a 64-bit integer value from the stream message, interpreted as a
* <code>long</code>
* @throws JMSException if the JMS provider fails to read the message due to
@@ -485,7 +486,7 @@ public class ActiveMQStreamMessage exten
/**
* Reads a <code>float</code> from the stream message.
- *
+ *
* @return a <code>float</code> value from the stream message
* @throws JMSException if the JMS provider fails to read the message due to
* some internal error.
@@ -533,7 +534,7 @@ public class ActiveMQStreamMessage exten
/**
* Reads a <code>double</code> from the stream message.
- *
+ *
* @return a <code>double</code> value from the stream message
* @throws JMSException if the JMS provider fails to read the message due to
* some internal error.
@@ -585,7 +586,7 @@ public class ActiveMQStreamMessage exten
/**
* Reads a <CODE>String</CODE> from the stream message.
- *
+ *
* @return a Unicode string from the stream message
* @throws JMSException if the JMS provider fails to read the message due to
* some internal error.
@@ -682,7 +683,7 @@ public class ActiveMQStreamMessage exten
* <P>
* To read the byte field value into a new <CODE>byte[]</CODE> object, use
* the <CODE>readObject</CODE> method.
- *
+ *
* @param value the buffer into which the data is read
* @return the total number of bytes read into the buffer, or -1 if there is
* no more data because the end of the byte field has been reached
@@ -755,7 +756,7 @@ public class ActiveMQStreamMessage exten
* An attempt to call <CODE>readObject</CODE> to read a byte field value
* into a new <CODE>byte[]</CODE> object before the full value of the byte
* field has been read will throw a <CODE>MessageFormatException</CODE>.
- *
+ *
* @return a Java object from the stream message, in objectified format (for
* example, if the object was written as an <CODE>int</CODE>, an
* <CODE>Integer</CODE> is returned)
@@ -841,7 +842,7 @@ public class ActiveMQStreamMessage exten
* Writes a <code>boolean</code> to the stream message. The value
* <code>true</code> is written as the value <code>(byte)1</code>; the
* value <code>false</code> is written as the value <code>(byte)0</code>.
- *
+ *
* @param value the <code>boolean</code> value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -859,7 +860,7 @@ public class ActiveMQStreamMessage exten
/**
* Writes a <code>byte</code> to the stream message.
- *
+ *
* @param value the <code>byte</code> value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -877,7 +878,7 @@ public class ActiveMQStreamMessage exten
/**
* Writes a <code>short</code> to the stream message.
- *
+ *
* @param value the <code>short</code> value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -895,7 +896,7 @@ public class ActiveMQStreamMessage exten
/**
* Writes a <code>char</code> to the stream message.
- *
+ *
* @param value the <code>char</code> value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -913,7 +914,7 @@ public class ActiveMQStreamMessage exten
/**
* Writes an <code>int</code> to the stream message.
- *
+ *
* @param value the <code>int</code> value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -931,7 +932,7 @@ public class ActiveMQStreamMessage exten
/**
* Writes a <code>long</code> to the stream message.
- *
+ *
* @param value the <code>long</code> value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -949,7 +950,7 @@ public class ActiveMQStreamMessage exten
/**
* Writes a <code>float</code> to the stream message.
- *
+ *
* @param value the <code>float</code> value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -967,7 +968,7 @@ public class ActiveMQStreamMessage exten
/**
* Writes a <code>double</code> to the stream message.
- *
+ *
* @param value the <code>double</code> value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -985,7 +986,7 @@ public class ActiveMQStreamMessage exten
/**
* Writes a <code>String</code> to the stream message.
- *
+ *
* @param value the <code>String</code> value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -1011,7 +1012,7 @@ public class ActiveMQStreamMessage exten
* The byte array <code>value</code> is written to the message as a byte
* array field. Consecutively written byte array fields are treated as two
* distinct fields when the fields are read.
- *
+ *
* @param value the byte array value to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -1029,7 +1030,7 @@ public class ActiveMQStreamMessage exten
* The a portion of the byte array <code>value</code> is written to the
* message as a byte array field. Consecutively written byte array fields
* are treated as two distinct fields when the fields are read.
- *
+ *
* @param value the byte array value to be written
* @param offset the initial offset within the byte array
* @param length the number of bytes to use
@@ -1053,7 +1054,7 @@ public class ActiveMQStreamMessage exten
* This method works only for the objectified primitive object types (<code>Integer</code>,
* <code>Double</code>, <code>Long</code> ...),
* <code>String</code> objects, and byte arrays.
- *
+ *
* @param value the Java object to be written
* @throws JMSException if the JMS provider fails to write the message due
* to some internal error.
@@ -1097,7 +1098,7 @@ public class ActiveMQStreamMessage exten
/**
* Puts the message body in read-only mode and repositions the stream of
* bytes to the beginning.
- *
+ *
* @throws JMSException if an internal error occurs
*/
@@ -1146,6 +1147,12 @@ public class ActiveMQStreamMessage exten
}
}
+ @Override
+ public void compress() throws IOException {
+ storeContent();
+ super.compress();
+ }
+
public String toString() {
return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java Fri Jun 22 21:53:01 2012
@@ -105,20 +105,28 @@ public class ActiveMQTextMessage extends
public void beforeMarshall(WireFormat wireFormat) throws IOException {
super.beforeMarshall(wireFormat);
+ storeContent();
+ }
- ByteSequence content = getContent();
- if (content == null && text != null) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- OutputStream os = bytesOut;
- ActiveMQConnection connection = getConnection();
- if (connection != null && connection.isUseCompression()) {
- compressed = true;
- os = new DeflaterOutputStream(os);
+ @Override
+ public void storeContent() {
+ try {
+ ByteSequence content = getContent();
+ if (content == null && text != null) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ OutputStream os = bytesOut;
+ ActiveMQConnection connection = getConnection();
+ if (connection != null && connection.isUseCompression()) {
+ compressed = true;
+ os = new DeflaterOutputStream(os);
+ }
+ DataOutputStream dataOut = new DataOutputStream(os);
+ MarshallingSupport.writeUTF8(dataOut, this.text);
+ dataOut.close();
+ setContent(bytesOut.toByteSequence());
}
- DataOutputStream dataOut = new DataOutputStream(os);
- MarshallingSupport.writeUTF8(dataOut, this.text);
- dataOut.close();
- setContent(bytesOut.toByteSequence());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Fri Jun 22 21:53:01 2012
@@ -19,10 +19,14 @@ package org.apache.activemq.command;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.zip.DeflaterOutputStream;
+
import javax.jms.JMSException;
+
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.Destination;
@@ -94,6 +98,7 @@ public abstract class Message extends Ba
public abstract Message copy();
public abstract void clearBody() throws JMSException;
+ public abstract void storeContent();
// useful to reduce the memory footprint of a persisted message
public void clearMarshalledState() throws JMSException {
@@ -743,6 +748,25 @@ public abstract class Message extends Ba
return false;
}
+ public void compress() throws IOException {
+ if (!isCompressed()) {
+ storeContent();
+ if (!isCompressed() && getContent() != null) {
+ doCompress();
+ }
+ }
+ }
+
+ protected void doCompress() throws IOException {
+ compressed = true;
+ ByteSequence bytes = getContent();
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ OutputStream os = new DeflaterOutputStream(bytesOut);
+ os.write(bytes.data, bytes.offset, bytes.length);
+ os.close();
+ setContent(bytesOut.toByteSequence());
+ }
+
@Override
public String toString() {
return toString(null);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Jun 22 21:53:01 2012
@@ -666,7 +666,7 @@ public abstract class DemandForwardingBr
}
}
- protected Message configureMessage(MessageDispatch md) {
+ protected Message configureMessage(MessageDispatch md) throws IOException {
Message message = md.getMessage().copy();
// Update the packet to show where it came from.
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
@@ -676,6 +676,9 @@ public abstract class DemandForwardingBr
message.setOriginalTransactionId(message.getTransactionId());
}
message.setTransactionId(null);
+ if (configuration.isUseCompression()) {
+ message.compress();
+ }
return message;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Fri Jun 22 21:53:01 2012
@@ -48,10 +48,10 @@ import org.slf4j.LoggerFactory;
/**
* Forwards all messages from the local broker to the remote broker.
- *
+ *
* @org.apache.xbean.XBean
- *
- *
+ *
+ *
*/
public class ForwardingBridge implements Service {
@@ -77,6 +77,7 @@ public class ForwardingBridge implements
private boolean dispatchAsync;
private String destinationFilter = ">";
private NetworkBridgeListener bridgeFailedListener;
+ private boolean useCompression = false;
public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
this.localBroker = localBroker;
@@ -232,11 +233,13 @@ public class ForwardingBridge implements
}
message.setTransactionId(null);
+ if (isUseCompression()) {
+ message.compress();
+ }
+
if (!message.isResponseRequired()) {
- // If the message was originally sent using async send, we
- // will preserve that QOS
- // by bridging it using an async send (small chance of
- // message loss).
+ // If the message was originally sent using async send, we will preserve that
+ // QOS by bridging it using an async send (small chance of message loss).
remoteBroker.oneway(message);
dequeueCounter.incrementAndGet();
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
@@ -382,4 +385,19 @@ public class ForwardingBridge implements
return enqueueCounter.get();
}
+ /**
+ * @param useCompression
+ * True if forwarded Messages should have their bodies compressed.
+ */
+ public void setUseCompression(boolean useCompression) {
+ this.useCompression = useCompression;
+ }
+
+ /**
+ * @return the value of the useCompression setting, true if forwarded messages will be compressed.
+ */
+ public boolean isUseCompression() {
+ return useCompression;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Jun 22 21:53:01 2012
@@ -16,16 +16,16 @@
*/
package org.apache.activemq.network;
+import java.util.List;
+
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
-import java.util.List;
-
/**
* Configuration for a NetworkBridge
- *
- *
+ *
+ *
*/
public class NetworkBridgeConfiguration {
private boolean conduitSubscriptions = true;
@@ -43,7 +43,7 @@ public class NetworkBridgeConfiguration
private String password;
private String destinationFilter = null;
private String name = "NC";
-
+
private List<ActiveMQDestination> excludedDestinations;
private List<ActiveMQDestination> dynamicallyIncludedDestinations;
private List<ActiveMQDestination> staticallyIncludedDestinations;
@@ -53,6 +53,7 @@ public class NetworkBridgeConfiguration
private boolean alwaysSyncSend = false;
private boolean staticBridge = false;
+ private boolean useCompression = false;
/**
* @return the conduitSubscriptions
@@ -264,41 +265,41 @@ public class NetworkBridgeConfiguration
this.name = name;
}
- public List<ActiveMQDestination> getExcludedDestinations() {
- return excludedDestinations;
- }
-
- public void setExcludedDestinations(
- List<ActiveMQDestination> excludedDestinations) {
- this.excludedDestinations = excludedDestinations;
- }
-
- public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
- return dynamicallyIncludedDestinations;
- }
-
- public void setDynamicallyIncludedDestinations(
- List<ActiveMQDestination> dynamicallyIncludedDestinations) {
- this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
- }
-
- public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
- return staticallyIncludedDestinations;
- }
-
- public void setStaticallyIncludedDestinations(
- List<ActiveMQDestination> staticallyIncludedDestinations) {
- this.staticallyIncludedDestinations = staticallyIncludedDestinations;
- }
-
-
+ public List<ActiveMQDestination> getExcludedDestinations() {
+ return excludedDestinations;
+ }
+
+ public void setExcludedDestinations(
+ List<ActiveMQDestination> excludedDestinations) {
+ this.excludedDestinations = excludedDestinations;
+ }
+
+ public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
+ return dynamicallyIncludedDestinations;
+ }
+
+ public void setDynamicallyIncludedDestinations(
+ List<ActiveMQDestination> dynamicallyIncludedDestinations) {
+ this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
+ }
+
+ public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
+ return staticallyIncludedDestinations;
+ }
+
+ public void setStaticallyIncludedDestinations(
+ List<ActiveMQDestination> staticallyIncludedDestinations) {
+ this.staticallyIncludedDestinations = staticallyIncludedDestinations;
+ }
+
+
public boolean isSuppressDuplicateQueueSubscriptions() {
return suppressDuplicateQueueSubscriptions;
}
-
+
/**
- *
+ *
* @param val if true, duplicate network queue subscriptions (in a cyclic network) will be suppressed
*/
public void setSuppressDuplicateQueueSubscriptions(boolean val) {
@@ -310,13 +311,13 @@ public class NetworkBridgeConfiguration
}
/**
- *
+ *
* @param val if true, duplicate network topic subscriptions (in a cyclic network) will be suppressed
*/
public void setSuppressDuplicateTopicSubscriptions(boolean val) {
suppressDuplicateTopicSubscriptions = val;
}
-
+
/**
* @return the brokerURL
*/
@@ -368,4 +369,19 @@ public class NetworkBridgeConfiguration
public void setStaticBridge(boolean staticBridge) {
this.staticBridge = staticBridge;
}
+
+ /**
+ * Sets the useCompression flag of this network bridge configuration.
+ * The field is initialised to false, so compression is only requested
+ * once this setter has been called with true.
+ *
+ * @param useCompression
+ * True if the Network should enforce compression for messages sent.
+ */
+ public void setUseCompression(boolean useCompression) {
+ this.useCompression = useCompression;
+ }
+
+ /**
+  * Reports the current value of the useCompression flag.
+  *
+  * @return true if messages are to be compressed on send, false otherwise
+  */
+ public boolean isUseCompression() {
+     return this.useCompression;
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java Fri Jun 22 21:53:01 2012
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ *
*/
public class ActiveMQMessageTest extends TestCase {
@@ -54,10 +54,10 @@ public class ActiveMQMessageTest extends
private int jmsPriority;
private long jmsTimestamp;
private long[] consumerIDs;
-
+
/**
* Constructor for ActiveMQMessageTest.
- *
+ *
* @param name
*/
public ActiveMQMessageTest(String name) {
@@ -87,7 +87,6 @@ public class ActiveMQMessageTest extends
for (int i = 0; i < this.consumerIDs.length; i++) {
this.consumerIDs[i] = i;
}
-
}
/*
@@ -126,7 +125,6 @@ public class ActiveMQMessageTest extends
public void testSetToForeignJMSID() throws Exception {
ActiveMQMessage msg = new ActiveMQMessage();
msg.setJMSMessageID("ID:EMS-SERVER.8B443C380083:429");
-
}
/*
@@ -269,7 +267,7 @@ public class ActiveMQMessageTest extends
assertEquals(0, msg.getJMSPriority());
msg.setJMSPriority(90);
- assertEquals(9, msg.getJMSPriority());
+ assertEquals(9, msg.getJMSPriority());
}
public void testClearProperties() throws JMSException {
@@ -356,6 +354,7 @@ public class ActiveMQMessageTest extends
assertTrue(((Float)msg.getObjectProperty(name)).floatValue() == 1.3f);
}
+ @SuppressWarnings("rawtypes")
public void testGetPropertyNames() throws JMSException {
ActiveMQMessage msg = new ActiveMQMessage();
String name1 = "floatProperty";
@@ -379,6 +378,7 @@ public class ActiveMQMessageTest extends
assertFalse("prop name4 not found", found3);
}
+ @SuppressWarnings("rawtypes")
public void testGetAllPropertyNames() throws JMSException {
ActiveMQMessage msg = new ActiveMQMessage();
String name1 = "floatProperty";
@@ -449,7 +449,11 @@ public class ActiveMQMessageTest extends
}
@Override
- public void clearBody() throws JMSException {
+ public void clearBody() throws JMSException {
+ }
+
+ @Override
+ public void storeContent() {
}
};
@@ -465,7 +469,7 @@ public class ActiveMQMessageTest extends
msg.beforeMarshall(new OpenWireFormat());
- Map properties = msg.getProperties();
+ Map<String, Object> properties = msg.getProperties();
assertEquals(properties.get("stringProperty"), "string");
assertEquals(((Byte)properties.get("byteProperty")).byteValue(), 1);
assertEquals(((Short)properties.get("shortProperty")).shortValue(), 1);
@@ -475,7 +479,6 @@ public class ActiveMQMessageTest extends
assertEquals(((Double)properties.get("doubleProperty")).doubleValue(), 1.1, 0);
assertEquals(((Boolean)properties.get("booleanProperty")).booleanValue(), true);
assertNull(properties.get("nullProperty"));
-
}
public void testSetNullProperty() throws JMSException {
@@ -942,5 +945,4 @@ public class ActiveMQMessageTest extends
msg.setJMSExpiration(System.currentTimeMillis() + 10000);
assertFalse(msg.isExpired());
}
-
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java?rev=1353042&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java Fri Jun 22 21:53:01 2012
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+public class CompressionOverNetworkTest {
+
+ protected static final int MESSAGE_COUNT = 10;
+ private static final Logger LOG = LoggerFactory.getLogger(CompressionOverNetworkTest.class);
+
+ protected AbstractApplicationContext context;
+ protected Connection localConnection;
+ protected Connection remoteConnection;
+ protected BrokerService localBroker;
+ protected BrokerService remoteBroker;
+ protected Session localSession;
+ protected Session remoteSession;
+ protected ActiveMQTopic included;
+
+ @Test
+ public void testCompressedOverCompressedNetwork() throws Exception {
+
+ ActiveMQConnection localAmqConnection = (ActiveMQConnection) localConnection;
+ localAmqConnection.setUseCompression(true);
+
+ MessageConsumer consumer1 = remoteSession.createConsumer(included);
+ MessageProducer producer = localSession.createProducer(included);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ waitForConsumerRegistration(localBroker, 1, included);
+
+ StringBuilder payload = new StringBuilder("test-");
+ for (int i = 0; i < 100; ++i) {
+ payload.append(UUID.randomUUID().toString());
+ }
+
+ Message test = localSession.createTextMessage(payload.toString());
+ producer.send(test);
+ Message msg = consumer1.receive(1000);
+ assertNotNull(msg);
+ ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
+ assertTrue(message.isCompressed());
+ assertEquals(payload.toString(), message.getText());
+ }
+
+ @Test
+ public void testTextMessageCompression() throws Exception {
+
+ MessageConsumer consumer1 = remoteSession.createConsumer(included);
+ MessageProducer producer = localSession.createProducer(included);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ waitForConsumerRegistration(localBroker, 1, included);
+
+ StringBuilder payload = new StringBuilder("test-");
+ for (int i = 0; i < 100; ++i) {
+ payload.append(UUID.randomUUID().toString());
+ }
+
+ Message test = localSession.createTextMessage(payload.toString());
+ producer.send(test);
+ Message msg = consumer1.receive(1000);
+ assertNotNull(msg);
+ ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
+ assertTrue(message.isCompressed());
+ assertEquals(payload.toString(), message.getText());
+ }
+
+ @Test
+ public void testBytesMessageCompression() throws Exception {
+
+ MessageConsumer consumer1 = remoteSession.createConsumer(included);
+ MessageProducer producer = localSession.createProducer(included);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ waitForConsumerRegistration(localBroker, 1, included);
+
+ StringBuilder payload = new StringBuilder("test-");
+ for (int i = 0; i < 100; ++i) {
+ payload.append(UUID.randomUUID().toString());
+ }
+
+ byte[] bytes = payload.toString().getBytes("UTF-8");
+
+ BytesMessage test = localSession.createBytesMessage();
+ test.writeBytes(bytes);
+ producer.send(test);
+ Message msg = consumer1.receive(1000*6000);
+ assertNotNull(msg);
+ ActiveMQBytesMessage message = (ActiveMQBytesMessage) msg;
+ assertTrue(message.isCompressed());
+ assertTrue(message.getContent().getLength() < bytes.length);
+
+ byte[] result = new byte[bytes.length];
+ assertEquals(bytes.length, message.readBytes(result));
+ assertEquals(-1, message.readBytes(result));
+
+ for(int i = 0; i < bytes.length; ++i) {
+ assertEquals(bytes[i], result[i]);
+ }
+ }
+
+ @Test
+ public void testStreamMessageCompression() throws Exception {
+
+ MessageConsumer consumer1 = remoteSession.createConsumer(included);
+ MessageProducer producer = localSession.createProducer(included);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ waitForConsumerRegistration(localBroker, 1, included);
+
+ StreamMessage test = localSession.createStreamMessage();
+
+ for (int i = 0; i < 100; ++i) {
+ test.writeString("test string: " + i);
+ }
+
+ producer.send(test);
+ Message msg = consumer1.receive(1000);
+ assertNotNull(msg);
+ ActiveMQStreamMessage message = (ActiveMQStreamMessage) msg;
+ assertTrue(message.isCompressed());
+
+ for (int i = 0; i < 100; ++i) {
+ assertEquals("test string: " + i, message.readString());
+ }
+ }
+
+ @Test
+ public void testMapMessageCompression() throws Exception {
+
+ MessageConsumer consumer1 = remoteSession.createConsumer(included);
+ MessageProducer producer = localSession.createProducer(included);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ waitForConsumerRegistration(localBroker, 1, included);
+
+ MapMessage test = localSession.createMapMessage();
+
+ for (int i = 0; i < 100; ++i) {
+ test.setString(Integer.toString(i), "test string: " + i);
+ }
+
+ producer.send(test);
+ Message msg = consumer1.receive(1000);
+ assertNotNull(msg);
+ ActiveMQMapMessage message = (ActiveMQMapMessage) msg;
+ assertTrue(message.isCompressed());
+
+ for (int i = 0; i < 100; ++i) {
+ assertEquals("test string: " + i, message.getString(Integer.toString(i)));
+ }
+ }
+
+ @Test
+ public void testObjectMessageCompression() throws Exception {
+
+ MessageConsumer consumer1 = remoteSession.createConsumer(included);
+ MessageProducer producer = localSession.createProducer(included);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ waitForConsumerRegistration(localBroker, 1, included);
+
+ StringBuilder payload = new StringBuilder("test-");
+ for (int i = 0; i < 100; ++i) {
+ payload.append(UUID.randomUUID().toString());
+ }
+
+ Message test = localSession.createObjectMessage(payload.toString());
+ producer.send(test);
+ Message msg = consumer1.receive(1000);
+ assertNotNull(msg);
+ ActiveMQObjectMessage message = (ActiveMQObjectMessage) msg;
+ assertTrue(message.isCompressed());
+ assertEquals(payload.toString(), message.getObject());
+ }
+
+ private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
+ assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
+ if (bridges.length > 0) {
+ LOG.info(brokerService + " bridges " + Arrays.toString(bridges));
+ DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
+ ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
+ LOG.info(brokerService + " bridge " + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
+ if (!forwardingBridges.isEmpty()) {
+ for (DemandSubscription demandSubscription : forwardingBridges.values()) {
+ if (demandSubscription.getLocalInfo().getDestination().equals(destination)) {
+ LOG.info(brokerService + " DemandSubscription " + demandSubscription + ", size: " + demandSubscription.size());
+ return demandSubscription.size() >= min;
+ }
+ }
+ }
+ }
+ return false;
+ }
+ }));
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ doSetUp(true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ doTearDown();
+ }
+
+ protected void doTearDown() throws Exception {
+ localConnection.close();
+ remoteConnection.close();
+ localBroker.stop();
+ remoteBroker.stop();
+ }
+
+ protected void doSetUp(boolean deleteAllMessages) throws Exception {
+ remoteBroker = createRemoteBroker();
+ remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ remoteBroker.start();
+ remoteBroker.waitUntilStarted();
+ localBroker = createLocalBroker();
+ localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ localBroker.start();
+ localBroker.waitUntilStarted();
+ URI localURI = localBroker.getVmConnectorURI();
+ ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
+ fac.setAlwaysSyncSend(true);
+ fac.setDispatchAsync(false);
+ localConnection = fac.createConnection();
+ localConnection.setClientID("clientId");
+ localConnection.start();
+ URI remoteURI = remoteBroker.getVmConnectorURI();
+ fac = new ActiveMQConnectionFactory(remoteURI);
+ remoteConnection = fac.createConnection();
+ remoteConnection.setClientID("clientId");
+ remoteConnection.start();
+ included = new ActiveMQTopic("include.test.bar");
+ localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ protected String getRemoteBrokerURI() {
+ return "org/apache/activemq/network/remoteBroker.xml";
+ }
+
+ protected String getLocalBrokerURI() {
+ return "org/apache/activemq/network/localBroker.xml";
+ }
+
+ protected BrokerService createBroker(String uri) throws Exception {
+ Resource resource = new ClassPathResource(uri);
+ BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+ resource = new ClassPathResource(uri);
+ factory = new BrokerFactoryBean(resource);
+ factory.afterPropertiesSet();
+ BrokerService result = factory.getBroker();
+
+ for (NetworkConnector connector : result.getNetworkConnectors()) {
+ connector.setUseCompression(true);
+ }
+
+ return result;
+ }
+
+ protected BrokerService createLocalBroker() throws Exception {
+ return createBroker(getLocalBrokerURI());
+ }
+
+ protected BrokerService createRemoteBroker() throws Exception {
+ return createBroker(getRemoteBrokerURI());
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java Fri Jun 22 21:53:01 2012
@@ -19,8 +19,10 @@ package org.apache.activemq.network;
import javax.jms.DeliveryMode;
import junit.framework.Test;
+
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@@ -34,6 +36,56 @@ public class ForwardingBridgeTest extend
public int deliveryMode;
private ForwardingBridge bridge;
+ /**
+  * Registers the value combinations for
+  * {@link #testForwardMessageCompressed()}: both delivery modes crossed
+  * with queue and topic destination types.
+  */
+ public void initCombosForTestForwardMessageCompressed() {
+     // valueOf factories replace the boxing constructors, which allocate a
+     // new wrapper on every call and are deprecated in current JDKs.
+     addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                        Integer.valueOf(DeliveryMode.PERSISTENT)});
+     addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+ }
+
+ /**
+  * Enables useCompression on the bridge, sends one message from a producer
+  * on the local broker to a consumer on the remote broker, and checks that
+  * the message received on the remote side is flagged as compressed.
+  */
+ public void testForwardMessageCompressed() throws Exception {
+
+     bridge.setUseCompression(true);
+
+     // Start a producer on local broker
+     StubConnection connection1 = createConnection();
+     ConnectionInfo connectionInfo1 = createConnectionInfo();
+     SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+     ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+     connection1.send(connectionInfo1);
+     connection1.send(sessionInfo1);
+     connection1.send(producerInfo);
+
+     destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+     // Start a consumer on a remote broker
+     StubConnection connection2 = createRemoteConnection();
+     ConnectionInfo connectionInfo2 = createConnectionInfo();
+     SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+     connection2.send(connectionInfo2);
+     connection2.send(sessionInfo2);
+     ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
+     connection2.send(consumerInfo);
+
+     // Give the forwarding bridge a chance to finish setting up. This is
+     // the same two seconds the two consecutive one-second sleeps used to
+     // take; an interrupt now propagates through the method's throws
+     // clause instead of being printed and ignored.
+     Thread.sleep(2000);
+
+     // Send the message to the local broker.
+     connection1.send(createMessage(producerInfo, destination, deliveryMode));
+
+     // Make sure the message was delivered via the remote.
+     Message m = receiveMessage(connection2);
+     assertNotNull(m);
+
+     // Make sure it's compressed now
+     ActiveMQMessage message = (ActiveMQMessage) m;
+     assertTrue(message.isCompressed());
+ }
+
public void initCombosForTestAddConsumerThenSend() {
addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT),
new Integer(DeliveryMode.PERSISTENT)});