You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/12/20 21:00:21 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1566 Openwire client can't receive compressed messages

ARTEMIS-1566 Openwire client can't receive compressed messages

When openwire client uses compressed option to send messages
(jms.useCompression=true) openwire client failed to receive them.
The reason is in OpenwireMessageConverter.toAMQMessage():

1. message.setContent() should be called after setting properties
 (It will cause the compressed content to decompressed before delivering to clients)
2. message.onSend() should not be called here (it should be used
by producers. If used here it changes the internal flags of the
message and cause receive to fail).


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6792dcdf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6792dcdf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6792dcdf

Branch: refs/heads/master
Commit: 6792dcdf2745ca9c2d735b56511a01f1168da01b
Parents: bd8ec58
Author: Howard Gao <ho...@gmail.com>
Authored: Tue Dec 19 21:15:05 2017 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Dec 20 16:00:14 2017 -0500

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 17 ++--
 .../openwire/SimpleOpenWireTest.java            | 94 ++++++++++++++++++++
 2 files changed, 100 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6792dcdf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index a5bb0f9..6af9997 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -489,9 +489,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
       amqMsg.setCompressed(isCompressed);
 
+      byte[] bytes = null;
       if (buffer != null) {
          buffer.resetReaderIndex();
-         byte[] bytes = null;
          synchronized (buffer) {
             if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
                SimpleString text = buffer.readNullableSimpleString();
@@ -642,11 +642,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
             buffer.resetReaderIndex();// this is important for topics as the buffer
             // may be read multiple times
          }
-
-         if (bytes != null) {
-            ByteSequence content = new ByteSequence(bytes);
-            amqMsg.setContent(content);
-         }
       }
 
       //we need check null because messages may come from other clients
@@ -805,11 +800,11 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
             }
          }
       }
-      try {
-         amqMsg.onSend();
-         amqMsg.setCompressed(isCompressed);
-      } catch (JMSException e) {
-         throw new IOException("Failed to covert to Openwire message", e);
+
+      amqMsg.setCompressed(isCompressed);
+      if (bytes != null) {
+         ByteSequence content = new ByteSequence(bytes);
+         amqMsg.setContent(content);
       }
       return amqMsg;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6792dcdf/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index ff31c18..4c5b957 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -76,6 +76,10 @@ import org.junit.Test;
 
 public class SimpleOpenWireTest extends BasicOpenWireTest {
 
+   private final String testString = "simple test string";
+   private final String testProp = "BASE_DATE";
+   private final String propValue = "2017-11-01";
+
    @Override
    @Before
    public void setUp() throws Exception {
@@ -331,6 +335,95 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
    }
 
    @Test
+   public void testCompression() throws Exception {
+
+      Connection cconnection = null;
+      Connection connection = null;
+      try {
+         ActiveMQConnectionFactory cfactory = new ActiveMQConnectionFactory("tcp://" + OWHOST + ":" + OWPORT + "");
+         cconnection = cfactory.createConnection();
+         cconnection.start();
+         Session csession = cconnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue cQueue = csession.createQueue(queueName);
+         MessageConsumer consumer = csession.createConsumer(cQueue);
+
+         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + OWHOST + ":" + OWPORT + "?jms.useCompression=true");
+         connection = factory.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(queueName);
+
+         MessageProducer producer = session.createProducer(queue);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         //text
+         TextMessage textMessage = session.createTextMessage();
+         textMessage.setText(testString);
+         TextMessage receivedMessage = sendAndReceive(textMessage, producer, consumer);
+
+         String receivedText = receivedMessage.getText();
+         assertEquals(testString, receivedText);
+
+         //MapMessage
+         MapMessage mapMessage = session.createMapMessage();
+         mapMessage.setString(testProp, propValue);
+         MapMessage receivedMapMessage = sendAndReceive(mapMessage, producer, consumer);
+         String value = receivedMapMessage.getString(testProp);
+         assertEquals(propValue, value);
+
+         //Object
+         ObjectMessage objMessage = session.createObjectMessage();
+         objMessage.setObject(testString);
+         ObjectMessage receivedObjMessage = sendAndReceive(objMessage, producer, consumer);
+         String receivedObj = (String) receivedObjMessage.getObject();
+         assertEquals(testString, receivedObj);
+
+         //Stream
+         StreamMessage streamMessage = session.createStreamMessage();
+         streamMessage.writeString(testString);
+         StreamMessage receivedStreamMessage = sendAndReceive(streamMessage, producer, consumer);
+         String streamValue = receivedStreamMessage.readString();
+         assertEquals(testString, streamValue);
+
+         //byte
+         BytesMessage byteMessage = session.createBytesMessage();
+         byte[] bytes = testString.getBytes();
+         byteMessage.writeBytes(bytes);
+
+         BytesMessage receivedByteMessage = sendAndReceive(byteMessage, producer, consumer);
+         long receivedBodylength = receivedByteMessage.getBodyLength();
+
+         assertEquals("bodylength Correct", bytes.length, receivedBodylength);
+
+         byte[] receivedBytes = new byte[(int) receivedBodylength];
+         receivedByteMessage.readBytes(receivedBytes);
+
+         String receivedString = new String(receivedBytes);
+         assertEquals(testString, receivedString);
+
+         //Message
+         Message m = session.createMessage();
+         sendAndReceive(m, producer, consumer);
+      } finally {
+         if (cconnection != null) {
+            connection.close();
+         }
+         if (connection != null) {
+            cconnection.close();
+         }
+      }
+
+   }
+
+   private <T extends Message> T sendAndReceive(T m, MessageProducer producer, MessageConsumer consumer) throws JMSException {
+      m.setStringProperty(testProp, propValue);
+      producer.send(m);
+      T receivedMessage = (T) consumer.receive(1000);
+      String receivedProp = receivedMessage.getStringProperty(testProp);
+      assertEquals(propValue, receivedProp);
+      return receivedMessage;
+   }
+
+   @Test
    public void testSimpleQueue() throws Exception {
       connection.start();
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -1523,6 +1616,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
          //close first connection, let temp queue die
          connection1.close();
 
+         waitForBindings(this.server, tempQueue.getQueueName(), true, 0, 0, 5000);
          //send again
          try {
             producer.send(m);