You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/12/23 15:08:46 UTC
[1/2] activemq-artemis git commit: ARTEMIS-902 OpenWire Compression Issue
Repository: activemq-artemis
Updated Branches:
refs/heads/master 22f0fcf08 -> 79b48767d
ARTEMIS-902 OpenWire Compression Issue
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/eecbbb18
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/eecbbb18
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/eecbbb18
Branch: refs/heads/master
Commit: eecbbb18db195d97bb0135362be67acb0cc6a793
Parents: 22f0fcf
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Dec 23 21:29:28 2016 +0800
Committer: Howard Gao <ho...@gmail.com>
Committed: Fri Dec 23 21:29:28 2016 +0800
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 16 +++--
.../openwire/interop/CompressedInteropTest.java | 69 +++++++++++---------
2 files changed, 50 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/eecbbb18/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index f49c972..950210b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -490,10 +490,11 @@ public class OpenWireMessageConverter implements MessageConverter {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
OutputStream out = bytesOut;
if (isCompressed) {
- out = new DeflaterOutputStream(out);
+ out = new DeflaterOutputStream(out, true);
}
try (DataOutputStream dataOut = new DataOutputStream(out)) {
MarshallingSupport.writeUTF8(dataOut, text.toString());
+ dataOut.flush();
bytes = bytesOut.toByteArray();
}
}
@@ -506,10 +507,11 @@ public class OpenWireMessageConverter implements MessageConverter {
ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
OutputStream os = out;
if (isCompressed) {
- os = new DeflaterOutputStream(os);
+ os = new DeflaterOutputStream(os, true);
}
try (DataOutputStream dataOut = new DataOutputStream(os)) {
MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+ dataOut.flush();
}
bytes = out.toByteArray();
}
@@ -520,8 +522,9 @@ public class OpenWireMessageConverter implements MessageConverter {
buffer.readBytes(bytes);
if (isCompressed) {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
+ try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
+ out.flush();
}
bytes = bytesOut.toByteArray();
}
@@ -529,7 +532,7 @@ public class OpenWireMessageConverter implements MessageConverter {
org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
OutputStream out = bytesOut;
if (isCompressed) {
- out = new DeflaterOutputStream(bytesOut);
+ out = new DeflaterOutputStream(bytesOut, true);
}
try (DataOutputStream dataOut = new DataOutputStream(out)) {
@@ -583,6 +586,7 @@ public class OpenWireMessageConverter implements MessageConverter {
stop = true;
break;
}
+ dataOut.flush();
}
}
bytes = bytesOut.toByteArray();
@@ -602,6 +606,7 @@ public class OpenWireMessageConverter implements MessageConverter {
int count = deflater.deflate(bytesBuf);
compressed.write(bytesBuf, 0, count);
}
+ compressed.flush();
ByteSequence byteSeq = compressed.toByteSequence();
ByteSequenceData.writeIntBig(byteSeq, length);
bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
@@ -615,8 +620,9 @@ public class OpenWireMessageConverter implements MessageConverter {
buffer.readBytes(bytes);
if (isCompressed) {
try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- DeflaterOutputStream out = new DeflaterOutputStream(bytesOut)) {
+ DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
+ out.flush();
bytes = bytesOut.toByteArray();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/eecbbb18/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
index ada2e55..1966609 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/CompressedInteropTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire.interop;
import javax.jms.BytesMessage;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -63,24 +64,32 @@ public class CompressedInteropTest extends BasicOpenWireTest {
xaFactory.setUseCompression(true);
}
-
@Test
public void testCoreReceiveOpenWireCompressedMessages() throws Exception {
+ testCompressedMessageSendReceive(true);
+ }
+
+ @Test
+ public void testOpenWireReceiveOpenWireCompressedMessages() throws Exception {
+ testCompressedMessageSendReceive(false);
+ }
+
+ private void testCompressedMessageSendReceive(boolean useCore) throws Exception {
//TextMessage
sendCompressedTextMessageUsingOpenWire();
- receiveTextMessageUsingCore();
+ receiveTextMessage(useCore);
//BytesMessage
sendCompressedBytesMessageUsingOpenWire();
- receiveBytesMessageUsingCore();
+ receiveBytesMessage(useCore);
//MapMessage
sendCompressedMapMessageUsingOpenWire();
- receiveMapMessageUsingCore();
+ receiveMapMessage(useCore);
//StreamMessage
sendCompressedStreamMessageUsingOpenWire();
- receiveStreamMessageUsingCore();
+ receiveStreamMessage(useCore);
//ObjectMessage
sendCompressedObjectMessageUsingOpenWire();
- receiveObjectMessageUsingCore();
+ receiveObjectMessage(useCore);
}
private void sendCompressedStreamMessageUsingOpenWire() throws Exception {
@@ -106,8 +115,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
producer.send(streamMessage);
}
- private void receiveStreamMessageUsingCore() throws Exception {
- StreamMessage streamMessage = (StreamMessage) receiveMessageUsingCore();
+ private void receiveStreamMessage(boolean useCore) throws Exception {
+ StreamMessage streamMessage = (StreamMessage) receiveMessage(useCore);
boolean booleanVal = streamMessage.readBoolean();
assertTrue(booleanVal);
byte byteVal = streamMessage.readByte();
@@ -149,8 +158,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
producer.send(objectMessage);
}
- private void receiveObjectMessageUsingCore() throws Exception {
- ObjectMessage objectMessage = (ObjectMessage) receiveMessageUsingCore();
+ private void receiveObjectMessage(boolean useCore) throws Exception {
+ ObjectMessage objectMessage = (ObjectMessage) receiveMessage(useCore);
Object objectVal = objectMessage.getObject();
assertEquals(TEXT, objectVal);
}
@@ -178,8 +187,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
producer.send(mapMessage);
}
- private void receiveMapMessageUsingCore() throws Exception {
- MapMessage mapMessage = (MapMessage) receiveMessageUsingCore();
+ private void receiveMapMessage(boolean useCore) throws Exception {
+ MapMessage mapMessage = (MapMessage) receiveMessage(useCore);
boolean booleanVal = mapMessage.getBoolean("boolean-type");
assertTrue(booleanVal);
@@ -222,8 +231,8 @@ public class CompressedInteropTest extends BasicOpenWireTest {
producer.send(bytesMessage);
}
- private void receiveBytesMessageUsingCore() throws Exception {
- BytesMessage bytesMessage = (BytesMessage) receiveMessageUsingCore();
+ private void receiveBytesMessage(boolean useCore) throws Exception {
+ BytesMessage bytesMessage = (BytesMessage) receiveMessage(useCore);
byte[] bytes = new byte[TEXT.getBytes(StandardCharsets.UTF_8).length];
bytesMessage.readBytes(bytes);
@@ -233,16 +242,28 @@ public class CompressedInteropTest extends BasicOpenWireTest {
assertEquals(TEXT, rcvString);
}
- private void receiveTextMessageUsingCore() throws Exception {
- TextMessage txtMessage = (TextMessage) receiveMessageUsingCore();
+ private void receiveTextMessage(boolean useCore) throws Exception {
+ TextMessage txtMessage = (TextMessage) receiveMessage(useCore);
assertEquals(TEXT, txtMessage.getText());
}
- private Message receiveMessageUsingCore() throws Exception {
+ private void sendCompressedTextMessageUsingOpenWire() throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
+
+ TextMessage textMessage = session.createTextMessage(TEXT);
+
+ producer.send(textMessage);
+ }
+
+ private Message receiveMessage(boolean useCore) throws Exception {
+ ConnectionFactory factoryToUse = useCore ? coreCf : factory;
Connection jmsConn = null;
Message message = null;
try {
- jmsConn = coreCf.createConnection();
+ jmsConn = factoryToUse.createConnection();
jmsConn.start();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -257,16 +278,4 @@ public class CompressedInteropTest extends BasicOpenWireTest {
}
return message;
}
-
- private void sendCompressedTextMessageUsingOpenWire() throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
-
- final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
-
- TextMessage textMessage = session.createTextMessage(TEXT);
-
- producer.send(textMessage);
- }
-
}
[2/2] activemq-artemis git commit: This closes #942 ARTEMIS-902 OpenWire Compression Issue
Posted by an...@apache.org.
This closes #942 ARTEMIS-902 OpenWire Compression Issue
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/79b48767
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/79b48767
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/79b48767
Branch: refs/heads/master
Commit: 79b48767d4e0e25306c9e357f342166a88269417
Parents: 22f0fcf eecbbb1
Author: Andy Taylor <an...@gmail.com>
Authored: Fri Dec 23 15:08:03 2016 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Fri Dec 23 15:08:03 2016 +0000
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 16 +++--
.../openwire/interop/CompressedInteropTest.java | 69 +++++++++++---------
2 files changed, 50 insertions(+), 35 deletions(-)
----------------------------------------------------------------------