You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/22 23:53:05 UTC

svn commit: r1353042 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/command/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/command/ test/java/org/apache/activemq/network/

Author: tabish
Date: Fri Jun 22 21:53:01 2012
New Revision: 1353042

URL: http://svn.apache.org/viewvc?rev=1353042&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3787

Adds support for setting useCompression on NetworkConnectors to enforce that all messages forwarded across the network are compressed.  

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java Fri Jun 22 21:53:01 2012
@@ -118,7 +118,8 @@ public class ActiveMQBytesMessage extend
         storeContent();
     }
 
-    private void storeContent() {
+    @Override
+    public void storeContent() {
         try {
             if (dataOut != null) {
                 dataOut.close();
@@ -853,4 +854,22 @@ public class ActiveMQBytesMessage extend
     public String toString() {
         return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
     }
+
+    @Override
+    protected void doCompress() throws IOException {
+        compressed = true;
+        ByteSequence bytes = getContent();
+        int length = bytes.getLength();
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        bytesOut.write(new byte[4]);
+        DeflaterOutputStream os = new DeflaterOutputStream(bytesOut);
+        DataOutputStream dataOut = new DataOutputStream(os);
+        dataOut.write(bytes.data, bytes.offset, bytes.length);
+        dataOut.flush();
+        dataOut.close();
+        bytes = bytesOut.toByteSequence();
+        ByteSequenceData.writeIntBig(bytes, length);
+        bytes.offset = 0;
+        setContent(bytes);
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java Fri Jun 22 21:53:01 2012
@@ -131,7 +131,8 @@ public class ActiveMQMapMessage extends 
         map.clear();
     }
 
-    private void storeContent() {
+    @Override
+    public void storeContent() {
         try {
             if (getContent() == null && !map.isEmpty()) {
                 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
@@ -741,6 +742,12 @@ public class ActiveMQMapMessage extends 
         setContent(null);
     }
 
+    @Override
+    public void compress() throws IOException {
+        storeContent();
+        super.compress();
+    }
+
     public String toString() {
         return super.toString() + " ActiveMQMapMessage{ " + "theTable = " + map + " }";
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQMessage.java Fri Jun 22 21:53:01 2012
@@ -20,15 +20,16 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
+
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 import javax.jms.MessageNotWriteableException;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ScheduledMessage;
 import org.apache.activemq.broker.scheduler.CronParser;
@@ -39,7 +40,7 @@ import org.apache.activemq.util.JMSExcep
 import org.apache.activemq.util.TypeConversionSupport;
 
 /**
- * 
+ *
  * @openwire:marshaller code="23"
  */
 public class ActiveMQMessage extends Message implements org.apache.activemq.Message, ScheduledMessage {
@@ -53,7 +54,6 @@ public class ActiveMQMessage extends Mes
         return DATA_STRUCTURE_TYPE;
     }
 
-
     @Override
     public Message copy() {
         ActiveMQMessage copy = new ActiveMQMessage();
@@ -280,6 +280,7 @@ public class ActiveMQMessage extends Mes
         }
     }
 
+    @SuppressWarnings("rawtypes")
     public Enumeration getPropertyNames() throws JMSException {
         try {
             Vector<String> result = new Vector<String>(this.getProperties().keySet());
@@ -294,6 +295,7 @@ public class ActiveMQMessage extends Mes
      * @return  Enumeration of all property names on this message
      * @throws JMSException
      */
+    @SuppressWarnings("rawtypes")
     public Enumeration getAllPropertyNames() throws JMSException {
         try {
             Vector<String> result = new Vector<String>(this.getProperties().keySet());
@@ -305,7 +307,6 @@ public class ActiveMQMessage extends Mes
     }
 
     interface PropertySetter {
-
         void set(Message message, Object value) throws MessageFormatException;
     }
 
@@ -445,10 +446,8 @@ public class ActiveMQMessage extends Mes
         }
     }
 
-    public void setProperties(Map properties) throws JMSException {
-        for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry) iter.next();
-
+    public void setProperties(Map<String, ?> properties) throws JMSException {
+        for (Map.Entry<String, ?> entry : properties.entrySet()) {
             // Lets use the object property method as we may contain standard
             // extension headers like JMSXGroupID
             setObjectProperty((String) entry.getKey(), entry.getValue());
@@ -473,7 +472,7 @@ public class ActiveMQMessage extends Mes
             }
         }
     }
-    
+
     protected void  checkValidScheduled(String name, Object value) throws MessageFormatException {
         if (AMQ_SCHEDULED_DELAY.equals(name) || AMQ_SCHEDULED_PERIOD.equals(name) || AMQ_SCHEDULED_REPEAT.equals(name)) {
             if (value instanceof Long == false && value instanceof Integer == false) {
@@ -484,7 +483,7 @@ public class ActiveMQMessage extends Mes
             CronParser.validate(value.toString());
         }
     }
-    
+
     protected Object  convertScheduled(String name, Object value) throws MessageFormatException {
         Object result = value;
         if (AMQ_SCHEDULED_DELAY.equals(name)){
@@ -680,4 +679,8 @@ public class ActiveMQMessage extends Mes
     public Response visit(CommandVisitor visitor) throws Exception {
         return visitor.processMessage(this);
     }
+
+    @Override
+    public void storeContent() {
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java Fri Jun 22 21:53:01 2012
@@ -53,7 +53,7 @@ import org.apache.activemq.wireformat.Wi
  * <CODE>MessageNotWriteableException</CODE> is thrown. If
  * <CODE>clearBody</CODE> is called, the message can now be both read from and
  * written to.
- * 
+ *
  * @openwire:marshaller code="26"
  * @see javax.jms.Session#createObjectMessage()
  * @see javax.jms.Session#createObjectMessage(Serializable)
@@ -64,10 +64,10 @@ import org.apache.activemq.wireformat.Wi
  * @see javax.jms.TextMessage
  */
 public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
-    
+
     // TODO: verify classloader
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
-    static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader(); 
+    static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
 
     protected transient Serializable object;
 
@@ -86,9 +86,10 @@ public class ActiveMQObjectMessage exten
             copy.object = object;
         }
         super.copy(copy);
-        
+
     }
 
+    @Override
     public void storeContent() {
         ByteSequence bodyAsBytes = getContent();
         if (bodyAsBytes == null && object != null) {
@@ -128,7 +129,7 @@ public class ActiveMQObjectMessage exten
      * If this message body was read-only, calling this method leaves the
      * message body in the same state as an empty body in a newly created
      * message.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to clear the message body
      *                 due to some internal error.
      */
@@ -144,7 +145,7 @@ public class ActiveMQObjectMessage exten
      * snapshot of the object at the time <CODE>setObject()</CODE> is called;
      * subsequent modifications of the object will have no effect on the
      * <CODE>ObjectMessage</CODE> body.
-     * 
+     *
      * @param newObject the message's data
      * @throws JMSException if the JMS provider fails to set the object due to
      *                 some internal error.
@@ -166,7 +167,7 @@ public class ActiveMQObjectMessage exten
     /**
      * Gets the serializable object containing this message's data. The default
      * value is null.
-     * 
+     *
      * @return the serializable object containing this message's data
      * @throws JMSException
      */
@@ -214,6 +215,12 @@ public class ActiveMQObjectMessage exten
         object = null;
     }
 
+    @Override
+    public void compress() throws IOException {
+        storeContent();
+        super.compress();
+    }
+
     public String toString() {
         try {
             getObject();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQStreamMessage.java Fri Jun 22 21:53:01 2012
@@ -83,16 +83,16 @@ import org.apache.activemq.util.Marshall
  * <CODE>String</CODE> representation of the primitive. <p/>
  * <P>
  * A value written as the row type can be read as the column type. <p/>
- * 
+ *
  * <PRE>
  *  | | boolean byte short char int long float double String byte[]
  * |----------------------------------------------------------------------
  * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
  * |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
  * X |----------------------------------------------------------------------
- * 
+ *
  * </PRE>
- * 
+ *
  * <p/>
  * <P>
  * Attempting to read a null value as a primitive type must be treated as
@@ -100,7 +100,7 @@ import org.apache.activemq.util.Marshall
  * conversion method with a null value. Since <code>char</code> does not
  * support a <code>String</code> conversion, attempting to read a null value
  * as a <code>char</code> must throw a <code>NullPointerException</code>.
- * 
+ *
  * @openwire:marshaller code="27"
  * @see javax.jms.Session#createStreamMessage()
  * @see javax.jms.BytesMessage
@@ -137,7 +137,8 @@ public class ActiveMQStreamMessage exten
         storeContent();
     }
 
-    private void storeContent() {
+    @Override
+    public void storeContent() {
         if (dataOut != null) {
             try {
                 dataOut.close();
@@ -165,7 +166,7 @@ public class ActiveMQStreamMessage exten
      * If this message body was read-only, calling this method leaves the
      * message body in the same state as an empty body in a newly created
      * message.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to clear the message body
      *                 due to some internal error.
      */
@@ -180,7 +181,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Reads a <code>boolean</code> from the stream message.
-     * 
+     *
      * @return the <code>boolean</code> value read
      * @throws JMSException if the JMS provider fails to read the message due to
      *                 some internal error.
@@ -221,7 +222,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Reads a <code>byte</code> value from the stream message.
-     * 
+     *
      * @return the next byte from the stream message as a 8-bit
      *         <code>byte</code>
      * @throws JMSException if the JMS provider fails to read the message due to
@@ -271,7 +272,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Reads a 16-bit integer from the stream message.
-     * 
+     *
      * @return a 16-bit integer from the stream message
      * @throws JMSException if the JMS provider fails to read the message due to
      *                 some internal error.
@@ -324,7 +325,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Reads a Unicode character value from the stream message.
-     * 
+     *
      * @return a Unicode character from the stream message
      * @throws JMSException if the JMS provider fails to read the message due to
      *                 some internal error.
@@ -370,7 +371,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Reads a 32-bit integer from the stream message.
-     * 
+     *
      * @return a 32-bit integer value from the stream message, interpreted as an
      *         <code>int</code>
      * @throws JMSException if the JMS provider fails to read the message due to
@@ -426,7 +427,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Reads a 64-bit integer from the stream message.
-     * 
+     *
      * @return a 64-bit integer value from the stream message, interpreted as a
      *         <code>long</code>
      * @throws JMSException if the JMS provider fails to read the message due to
@@ -485,7 +486,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Reads a <code>float</code> from the stream message.
-     * 
+     *
      * @return a <code>float</code> value from the stream message
      * @throws JMSException if the JMS provider fails to read the message due to
      *                 some internal error.
@@ -533,7 +534,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Reads a <code>double</code> from the stream message.
-     * 
+     *
      * @return a <code>double</code> value from the stream message
      * @throws JMSException if the JMS provider fails to read the message due to
      *                 some internal error.
@@ -585,7 +586,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Reads a <CODE>String</CODE> from the stream message.
-     * 
+     *
      * @return a Unicode string from the stream message
      * @throws JMSException if the JMS provider fails to read the message due to
      *                 some internal error.
@@ -682,7 +683,7 @@ public class ActiveMQStreamMessage exten
      * <P>
      * To read the byte field value into a new <CODE>byte[]</CODE> object, use
      * the <CODE>readObject</CODE> method.
-     * 
+     *
      * @param value the buffer into which the data is read
      * @return the total number of bytes read into the buffer, or -1 if there is
      *         no more data because the end of the byte field has been reached
@@ -755,7 +756,7 @@ public class ActiveMQStreamMessage exten
      * An attempt to call <CODE>readObject</CODE> to read a byte field value
      * into a new <CODE>byte[]</CODE> object before the full value of the byte
      * field has been read will throw a <CODE>MessageFormatException</CODE>.
-     * 
+     *
      * @return a Java object from the stream message, in objectified format (for
      *         example, if the object was written as an <CODE>int</CODE>, an
      *         <CODE>Integer</CODE> is returned)
@@ -841,7 +842,7 @@ public class ActiveMQStreamMessage exten
      * Writes a <code>boolean</code> to the stream message. The value
      * <code>true</code> is written as the value <code>(byte)1</code>; the
      * value <code>false</code> is written as the value <code>(byte)0</code>.
-     * 
+     *
      * @param value the <code>boolean</code> value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -859,7 +860,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Writes a <code>byte</code> to the stream message.
-     * 
+     *
      * @param value the <code>byte</code> value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -877,7 +878,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Writes a <code>short</code> to the stream message.
-     * 
+     *
      * @param value the <code>short</code> value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -895,7 +896,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Writes a <code>char</code> to the stream message.
-     * 
+     *
      * @param value the <code>char</code> value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -913,7 +914,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Writes an <code>int</code> to the stream message.
-     * 
+     *
      * @param value the <code>int</code> value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -931,7 +932,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Writes a <code>long</code> to the stream message.
-     * 
+     *
      * @param value the <code>long</code> value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -949,7 +950,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Writes a <code>float</code> to the stream message.
-     * 
+     *
      * @param value the <code>float</code> value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -967,7 +968,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Writes a <code>double</code> to the stream message.
-     * 
+     *
      * @param value the <code>double</code> value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -985,7 +986,7 @@ public class ActiveMQStreamMessage exten
 
     /**
      * Writes a <code>String</code> to the stream message.
-     * 
+     *
      * @param value the <code>String</code> value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -1011,7 +1012,7 @@ public class ActiveMQStreamMessage exten
      * The byte array <code>value</code> is written to the message as a byte
      * array field. Consecutively written byte array fields are treated as two
      * distinct fields when the fields are read.
-     * 
+     *
      * @param value the byte array value to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -1029,7 +1030,7 @@ public class ActiveMQStreamMessage exten
      * The a portion of the byte array <code>value</code> is written to the
      * message as a byte array field. Consecutively written byte array fields
      * are treated as two distinct fields when the fields are read.
-     * 
+     *
      * @param value the byte array value to be written
      * @param offset the initial offset within the byte array
      * @param length the number of bytes to use
@@ -1053,7 +1054,7 @@ public class ActiveMQStreamMessage exten
      * This method works only for the objectified primitive object types (<code>Integer</code>,
      * <code>Double</code>, <code>Long</code>&nbsp;...),
      * <code>String</code> objects, and byte arrays.
-     * 
+     *
      * @param value the Java object to be written
      * @throws JMSException if the JMS provider fails to write the message due
      *                 to some internal error.
@@ -1097,7 +1098,7 @@ public class ActiveMQStreamMessage exten
     /**
      * Puts the message body in read-only mode and repositions the stream of
      * bytes to the beginning.
-     * 
+     *
      * @throws JMSException if an internal error occurs
      */
 
@@ -1146,6 +1147,12 @@ public class ActiveMQStreamMessage exten
         }
     }
 
+    @Override
+    public void compress() throws IOException {
+        storeContent();
+        super.compress();
+    }
+
     public String toString() {
         return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java Fri Jun 22 21:53:01 2012
@@ -105,20 +105,28 @@ public class ActiveMQTextMessage extends
 
     public void beforeMarshall(WireFormat wireFormat) throws IOException {
         super.beforeMarshall(wireFormat);
+        storeContent();
+    }
 
-        ByteSequence content = getContent();
-        if (content == null && text != null) {
-            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-            OutputStream os = bytesOut;
-            ActiveMQConnection connection = getConnection();
-            if (connection != null && connection.isUseCompression()) {
-                compressed = true;
-                os = new DeflaterOutputStream(os);
+    @Override
+    public void storeContent() {
+        try {
+            ByteSequence content = getContent();
+            if (content == null && text != null) {
+                ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                OutputStream os = bytesOut;
+                ActiveMQConnection connection = getConnection();
+                if (connection != null && connection.isUseCompression()) {
+                    compressed = true;
+                    os = new DeflaterOutputStream(os);
+                }
+                DataOutputStream dataOut = new DataOutputStream(os);
+                MarshallingSupport.writeUTF8(dataOut, this.text);
+                dataOut.close();
+                setContent(bytesOut.toByteSequence());
             }
-            DataOutputStream dataOut = new DataOutputStream(os);
-            MarshallingSupport.writeUTF8(dataOut, this.text);
-            dataOut.close();
-            setContent(bytesOut.toByteSequence());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Fri Jun 22 21:53:01 2012
@@ -19,10 +19,14 @@ package org.apache.activemq.command;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.zip.DeflaterOutputStream;
+
 import javax.jms.JMSException;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.region.Destination;
@@ -94,6 +98,7 @@ public abstract class Message extends Ba
 
     public abstract Message copy();
     public abstract void clearBody() throws JMSException;
+    public abstract void storeContent();
 
     // useful to reduce the memory footprint of a persisted message
     public void clearMarshalledState() throws JMSException {
@@ -743,6 +748,25 @@ public abstract class Message extends Ba
         return false;
     }
 
+    public void compress() throws IOException {
+        if (!isCompressed()) {
+            storeContent();
+            if (!isCompressed() && getContent() != null) {
+                doCompress();
+            }
+        }
+    }
+
+    protected void doCompress() throws IOException {
+        compressed = true;
+        ByteSequence bytes = getContent();
+        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+        OutputStream os = new DeflaterOutputStream(bytesOut);
+        os.write(bytes.data, bytes.offset, bytes.length);
+        os.close();
+        setContent(bytesOut.toByteSequence());
+    }
+
     @Override
     public String toString() {
         return toString(null);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Jun 22 21:53:01 2012
@@ -666,7 +666,7 @@ public abstract class DemandForwardingBr
         }
     }
 
-    protected Message configureMessage(MessageDispatch md) {
+    protected Message configureMessage(MessageDispatch md) throws IOException {
         Message message = md.getMessage().copy();
         // Update the packet to show where it came from.
         message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
@@ -676,6 +676,9 @@ public abstract class DemandForwardingBr
             message.setOriginalTransactionId(message.getTransactionId());
         }
         message.setTransactionId(null);
+        if (configuration.isUseCompression()) {
+            message.compress();
+        }
         return message;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Fri Jun 22 21:53:01 2012
@@ -48,10 +48,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Forwards all messages from the local broker to the remote broker.
- * 
+ *
  * @org.apache.xbean.XBean
- * 
- * 
+ *
+ *
  */
 public class ForwardingBridge implements Service {
 
@@ -77,6 +77,7 @@ public class ForwardingBridge implements
     private boolean dispatchAsync;
     private String destinationFilter = ">";
     private NetworkBridgeListener bridgeFailedListener;
+    private boolean useCompression = false;
 
     public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
         this.localBroker = localBroker;
@@ -232,11 +233,13 @@ public class ForwardingBridge implements
                 }
                 message.setTransactionId(null);
 
+                if (isUseCompression()) {
+                    message.compress();
+                }
+
                 if (!message.isResponseRequired()) {
-                    // If the message was originally sent using async send, we
-                    // will preserve that QOS
-                    // by bridging it using an async send (small chance of
-                    // message loss).
+                    // If the message was originally sent using async send, we will preserve that
+                    // QOS by bridging it using an async send (small chance of message loss).
                     remoteBroker.oneway(message);
                     dequeueCounter.incrementAndGet();
                     localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
@@ -382,4 +385,19 @@ public class ForwardingBridge implements
         return enqueueCounter.get();
     }
 
+    /**
+     * @param useCompression
+     *      True if forwarded Messages should have their bodies compressed.
+     */
+    public void setUseCompression(boolean useCompression) {
+        this.useCompression = useCompression;
+    }
+
+    /**
+     * @return the value of the useCompression setting, true if forwarded messages will be compressed.
+     */
+    public boolean isUseCompression() {
+        return useCompression;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Fri Jun 22 21:53:01 2012
@@ -16,16 +16,16 @@
  */
 package org.apache.activemq.network;
 
+import java.util.List;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 
-import java.util.List;
-
 /**
  * Configuration for a NetworkBridge
- * 
- * 
+ *
+ *
  */
 public class NetworkBridgeConfiguration {
     private boolean conduitSubscriptions = true;
@@ -43,7 +43,7 @@ public class NetworkBridgeConfiguration 
     private String password;
     private String destinationFilter = null;
     private String name = "NC";
-    
+
     private List<ActiveMQDestination> excludedDestinations;
     private List<ActiveMQDestination> dynamicallyIncludedDestinations;
     private List<ActiveMQDestination> staticallyIncludedDestinations;
@@ -53,6 +53,7 @@ public class NetworkBridgeConfiguration 
 
     private boolean alwaysSyncSend = false;
     private boolean staticBridge = false;
+    private boolean useCompression = false;
 
     /**
      * @return the conduitSubscriptions
@@ -264,41 +265,41 @@ public class NetworkBridgeConfiguration 
         this.name = name;
     }
 
-	public List<ActiveMQDestination> getExcludedDestinations() {
-		return excludedDestinations;
-	}
-
-	public void setExcludedDestinations(
-			List<ActiveMQDestination> excludedDestinations) {
-		this.excludedDestinations = excludedDestinations;
-	}
-
-	public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
-		return dynamicallyIncludedDestinations;
-	}
-
-	public void setDynamicallyIncludedDestinations(
-			List<ActiveMQDestination> dynamicallyIncludedDestinations) {
-		this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
-	}
-
-	public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
-		return staticallyIncludedDestinations;
-	}
-
-	public void setStaticallyIncludedDestinations(
-			List<ActiveMQDestination> staticallyIncludedDestinations) {
-		this.staticallyIncludedDestinations = staticallyIncludedDestinations;
-	}
-	
-    
+    public List<ActiveMQDestination> getExcludedDestinations() {
+        return excludedDestinations;
+    }
+
+    public void setExcludedDestinations(
+            List<ActiveMQDestination> excludedDestinations) {
+        this.excludedDestinations = excludedDestinations;
+    }
+
+    public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
+        return dynamicallyIncludedDestinations;
+    }
+
+    public void setDynamicallyIncludedDestinations(
+            List<ActiveMQDestination> dynamicallyIncludedDestinations) {
+        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
+    }
+
+    public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
+        return staticallyIncludedDestinations;
+    }
+
+    public void setStaticallyIncludedDestinations(
+            List<ActiveMQDestination> staticallyIncludedDestinations) {
+        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
+    }
+
+
 
     public boolean isSuppressDuplicateQueueSubscriptions() {
         return suppressDuplicateQueueSubscriptions;
     }
-    
+
     /**
-     * 
+     *
      * @param val if true, duplicate network queue subscriptions (in a cyclic network) will be suppressed
      */
     public void setSuppressDuplicateQueueSubscriptions(boolean val) {
@@ -310,13 +311,13 @@ public class NetworkBridgeConfiguration 
     }
 
     /**
-     * 
+     *
      * @param val if true, duplicate network topic subscriptions (in a cyclic network) will be suppressed
      */
     public void setSuppressDuplicateTopicSubscriptions(boolean val) {
         suppressDuplicateTopicSubscriptions  = val;
     }
-    
+
     /**
      * @return the brokerURL
      */
@@ -368,4 +369,19 @@ public class NetworkBridgeConfiguration 
     public void setStaticBridge(boolean staticBridge) {
         this.staticBridge = staticBridge;
     }
+
+    /**
+     * @param useCompression
+     *      True if the Network should enforce compression for messages sent.
+     */
+    public void setUseCompression(boolean useCompression) {
+        this.useCompression = useCompression;
+    }
+
+    /**
+     * @return the useCompression setting, true if messages will be compressed on send.
+     */
+    public boolean isUseCompression() {
+        return useCompression;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/command/ActiveMQMessageTest.java Fri Jun 22 21:53:01 2012
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * 
+ *
  */
 public class ActiveMQMessageTest extends TestCase {
 
@@ -54,10 +54,10 @@ public class ActiveMQMessageTest extends
     private int jmsPriority;
     private long jmsTimestamp;
     private long[] consumerIDs;
-    
+
     /**
      * Constructor for ActiveMQMessageTest.
-     * 
+     *
      * @param name
      */
     public ActiveMQMessageTest(String name) {
@@ -87,7 +87,6 @@ public class ActiveMQMessageTest extends
         for (int i = 0; i < this.consumerIDs.length; i++) {
             this.consumerIDs[i] = i;
         }
-
     }
 
     /*
@@ -126,7 +125,6 @@ public class ActiveMQMessageTest extends
     public void testSetToForeignJMSID() throws Exception {
         ActiveMQMessage msg = new ActiveMQMessage();
         msg.setJMSMessageID("ID:EMS-SERVER.8B443C380083:429");
-
     }
 
     /*
@@ -269,7 +267,7 @@ public class ActiveMQMessageTest extends
         assertEquals(0, msg.getJMSPriority());
 
         msg.setJMSPriority(90);
-        assertEquals(9, msg.getJMSPriority());                
+        assertEquals(9, msg.getJMSPriority());
     }
 
     public void testClearProperties() throws JMSException {
@@ -356,6 +354,7 @@ public class ActiveMQMessageTest extends
         assertTrue(((Float)msg.getObjectProperty(name)).floatValue() == 1.3f);
     }
 
+    @SuppressWarnings("rawtypes")
     public void testGetPropertyNames() throws JMSException {
         ActiveMQMessage msg = new ActiveMQMessage();
         String name1 = "floatProperty";
@@ -379,6 +378,7 @@ public class ActiveMQMessageTest extends
         assertFalse("prop name4 not found", found3);
     }
 
+    @SuppressWarnings("rawtypes")
     public void testGetAllPropertyNames() throws JMSException {
         ActiveMQMessage msg = new ActiveMQMessage();
         String name1 = "floatProperty";
@@ -449,7 +449,11 @@ public class ActiveMQMessageTest extends
             }
 
             @Override
-            public void clearBody() throws JMSException {                
+            public void clearBody() throws JMSException {
+            }
+
+            @Override
+            public void storeContent() {
             }
         };
 
@@ -465,7 +469,7 @@ public class ActiveMQMessageTest extends
 
         msg.beforeMarshall(new OpenWireFormat());
 
-        Map properties = msg.getProperties();
+        Map<String, Object> properties = msg.getProperties();
         assertEquals(properties.get("stringProperty"), "string");
         assertEquals(((Byte)properties.get("byteProperty")).byteValue(), 1);
         assertEquals(((Short)properties.get("shortProperty")).shortValue(), 1);
@@ -475,7 +479,6 @@ public class ActiveMQMessageTest extends
         assertEquals(((Double)properties.get("doubleProperty")).doubleValue(), 1.1, 0);
         assertEquals(((Boolean)properties.get("booleanProperty")).booleanValue(), true);
         assertNull(properties.get("nullProperty"));
-
     }
 
     public void testSetNullProperty() throws JMSException {
@@ -942,5 +945,4 @@ public class ActiveMQMessageTest extends
         msg.setJMSExpiration(System.currentTimeMillis() + 10000);
         assertFalse(msg.isExpired());
     }
-
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java?rev=1353042&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java Fri Jun 22 21:53:01 2012
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+public class CompressionOverNetworkTest {
+
+    protected static final int MESSAGE_COUNT = 10;
+    private static final Logger LOG = LoggerFactory.getLogger(CompressionOverNetworkTest.class);
+
+    protected AbstractApplicationContext context;
+    protected Connection localConnection;
+    protected Connection remoteConnection;
+    protected BrokerService localBroker;
+    protected BrokerService remoteBroker;
+    protected Session localSession;
+    protected Session remoteSession;
+    protected ActiveMQTopic included;
+
+    @Test
+    public void testCompressedOverCompressedNetwork() throws Exception {
+
+        ActiveMQConnection localAmqConnection = (ActiveMQConnection) localConnection;
+        localAmqConnection.setUseCompression(true);
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        StringBuilder payload = new StringBuilder("test-");
+        for (int i = 0; i < 100; ++i) {
+            payload.append(UUID.randomUUID().toString());
+        }
+
+        Message test = localSession.createTextMessage(payload.toString());
+        producer.send(test);
+        Message msg = consumer1.receive(1000);
+        assertNotNull(msg);
+        ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
+        assertTrue(message.isCompressed());
+        assertEquals(payload.toString(), message.getText());
+    }
+
+    @Test
+    public void testTextMessageCompression() throws Exception {
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        StringBuilder payload = new StringBuilder("test-");
+        for (int i = 0; i < 100; ++i) {
+            payload.append(UUID.randomUUID().toString());
+        }
+
+        Message test = localSession.createTextMessage(payload.toString());
+        producer.send(test);
+        Message msg = consumer1.receive(1000);
+        assertNotNull(msg);
+        ActiveMQTextMessage message = (ActiveMQTextMessage) msg;
+        assertTrue(message.isCompressed());
+        assertEquals(payload.toString(), message.getText());
+    }
+
+    @Test
+    public void testBytesMessageCompression() throws Exception {
+        // A BytesMessage sent over the compressing network must arrive compressed and intact.
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+        // Build the payload from 100 random UUID strings.
+        StringBuilder payload = new StringBuilder("test-");
+        for (int i = 0; i < 100; ++i) {
+            payload.append(UUID.randomUUID().toString());
+        }
+
+        byte[] bytes = payload.toString().getBytes("UTF-8");
+
+        BytesMessage test = localSession.createBytesMessage();
+        test.writeBytes(bytes);
+        producer.send(test);
+        Message msg = consumer1.receive(1000); // bounded wait so a lost message fails fast
+        assertNotNull(msg);
+        ActiveMQBytesMessage message = (ActiveMQBytesMessage) msg;
+        assertTrue(message.isCompressed());
+        assertTrue(message.getContent().getLength() < bytes.length);
+        // Reading must return the whole original payload and then report end of stream (-1).
+        byte[] result = new byte[bytes.length];
+        assertEquals(bytes.length, message.readBytes(result));
+        assertEquals(-1, message.readBytes(result));
+
+        for(int i = 0; i < bytes.length; ++i) {
+            assertEquals(bytes[i], result[i]);
+        }
+    }
+
+    @Test
+    public void testStreamMessageCompression() throws Exception {
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        StreamMessage test = localSession.createStreamMessage();
+
+        for (int i = 0; i < 100; ++i) {
+            test.writeString("test string: " + i);
+        }
+
+        producer.send(test);
+        Message msg = consumer1.receive(1000);
+        assertNotNull(msg);
+        ActiveMQStreamMessage message = (ActiveMQStreamMessage) msg;
+        assertTrue(message.isCompressed());
+
+        for (int i = 0; i < 100; ++i) {
+            assertEquals("test string: " + i, message.readString());
+        }
+    }
+
+    @Test
+    public void testMapMessageCompression() throws Exception {
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        MapMessage test = localSession.createMapMessage();
+
+        for (int i = 0; i < 100; ++i) {
+            test.setString(Integer.toString(i), "test string: " + i);
+        }
+
+        producer.send(test);
+        Message msg = consumer1.receive(1000);
+        assertNotNull(msg);
+        ActiveMQMapMessage message = (ActiveMQMapMessage) msg;
+        assertTrue(message.isCompressed());
+
+        for (int i = 0; i < 100; ++i) {
+            assertEquals("test string: " + i, message.getString(Integer.toString(i)));
+        }
+    }
+
+    @Test
+    public void testObjectMessageCompression() throws Exception {
+
+        MessageConsumer consumer1 = remoteSession.createConsumer(included);
+        MessageProducer producer = localSession.createProducer(included);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        StringBuilder payload = new StringBuilder("test-");
+        for (int i = 0; i < 100; ++i) {
+            payload.append(UUID.randomUUID().toString());
+        }
+
+        Message test = localSession.createObjectMessage(payload.toString());
+        producer.send(test);
+        Message msg = consumer1.receive(1000);
+        assertNotNull(msg);
+        ActiveMQObjectMessage message = (ActiveMQObjectMessage) msg;
+        assertTrue(message.isCompressed());
+        assertEquals(payload.toString(), message.getObject());
+    }
+
+    private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
+        assertTrue("Internal bridge consumers registered in time", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                Object[] bridges = brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
+                if (bridges.length > 0) {
+                    LOG.info(brokerService + " bridges "  + Arrays.toString(bridges));
+                    DemandForwardingBridgeSupport demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
+                    ConcurrentHashMap<ConsumerId, DemandSubscription> forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
+                    LOG.info(brokerService + " bridge "  + demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
+                    if (!forwardingBridges.isEmpty()) {
+                        for (DemandSubscription demandSubscription : forwardingBridges.values()) {
+                            if (demandSubscription.getLocalInfo().getDestination().equals(destination)) {
+                                LOG.info(brokerService + " DemandSubscription "  + demandSubscription + ", size: " + demandSubscription.size());
+                                return demandSubscription.size() >= min;
+                            }
+                        }
+                    }
+                }
+                return false;
+            }
+        }));
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        doSetUp(true);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        doTearDown();
+    }
+
+    protected void doTearDown() throws Exception {
+        localConnection.close();
+        remoteConnection.close();
+        localBroker.stop();
+        remoteBroker.stop();
+    }
+
+    protected void doSetUp(boolean deleteAllMessages) throws Exception {
+        remoteBroker = createRemoteBroker();
+        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+        localBroker = createLocalBroker();
+        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+        URI localURI = localBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
+        fac.setAlwaysSyncSend(true);
+        fac.setDispatchAsync(false);
+        localConnection = fac.createConnection();
+        localConnection.setClientID("clientId");
+        localConnection.start();
+        URI remoteURI = remoteBroker.getVmConnectorURI();
+        fac = new ActiveMQConnectionFactory(remoteURI);
+        remoteConnection = fac.createConnection();
+        remoteConnection.setClientID("clientId");
+        remoteConnection.start();
+        included = new ActiveMQTopic("include.test.bar");
+        localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    protected String getRemoteBrokerURI() {
+        return "org/apache/activemq/network/remoteBroker.xml";
+    }
+
+    protected String getLocalBrokerURI() {
+        return "org/apache/activemq/network/localBroker.xml";
+    }
+
+    protected BrokerService createBroker(String uri) throws Exception {
+        // Build the broker from the configuration resource found at the given classpath location,
+        // then force every network connector it defines to compress the messages it sends.
+        Resource resource = new ClassPathResource(uri);
+        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+        factory.afterPropertiesSet();
+        BrokerService result = factory.getBroker();
+
+        for (NetworkConnector connector : result.getNetworkConnectors()) {
+            connector.setUseCompression(true);
+        }
+
+        return result;
+    }
+
+    protected BrokerService createLocalBroker() throws Exception {
+        return createBroker(getLocalBrokerURI());
+    }
+
+    protected BrokerService createRemoteBroker() throws Exception {
+        return createBroker(getRemoteBrokerURI());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/CompressionOverNetworkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java?rev=1353042&r1=1353041&r2=1353042&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java Fri Jun 22 21:53:01 2012
@@ -19,8 +19,10 @@ package org.apache.activemq.network;
 import javax.jms.DeliveryMode;
 
 import junit.framework.Test;
+
 import org.apache.activemq.broker.StubConnection;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
@@ -34,6 +36,56 @@ public class ForwardingBridgeTest extend
     public int deliveryMode;
     private ForwardingBridge bridge;
 
+    public void initCombosForTestForwardMessageCompressed() {
+        addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT),
+                                                           new Integer(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {new Byte(ActiveMQDestination.QUEUE_TYPE),
+                                                              new Byte(ActiveMQDestination.TOPIC_TYPE)});
+    }
+
+    public void testForwardMessageCompressed() throws Exception {
+        // With compression enabled on the bridge, a forwarded message must arrive compressed.
+        bridge.setUseCompression(true);
+
+        // Start a producer on local broker
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        // Start a consumer on a remote broker
+        StubConnection connection2 = createRemoteConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
+        connection2.send(consumerInfo);
+        Thread.sleep(1000);
+        // Give forwarding bridge a chance to finish setting up
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+            ie.printStackTrace();
+        }
+
+        // Send the message to the local broker.
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+
+        // Make sure the message was delivered via the remote.
+        Message m = receiveMessage(connection2);
+        assertNotNull(m);
+
+        // Make sure it's compressed now
+        ActiveMQMessage message = (ActiveMQMessage) m;
+        assertTrue(message.isCompressed());
+    }
+
     public void initCombosForTestAddConsumerThenSend() {
         addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT),
                                                            new Integer(DeliveryMode.PERSISTENT)});