You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/10/02 18:58:04 UTC
[1/2] git commit: add basic MapMessage integration test
Repository: qpid-jms
Updated Branches:
refs/heads/master d4218476a -> 58b48e5a8
add basic MapMessage integration test
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/58b48e5a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/58b48e5a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/58b48e5a
Branch: refs/heads/master
Commit: 58b48e5a894a8751ea77e476f74c6bf97999c7fb
Parents: 5e1834b
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Oct 2 17:53:49 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Oct 2 17:57:02 2014 +0100
----------------------------------------------------------------------
.../integration/MapMessageIntegrationTest.java | 225 +++++++++++++++++++
1 file changed, 225 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/58b48e5a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
new file mode 100644
index 0000000..40022da
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class MapMessageIntegrationTest extends QpidJmsTestCase {
+ private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
+
+ /**
+ * Test that a message received from the test peer with an AmqpValue section containing
+ * a map which holds entries of the various supported entry types is returned as a
+ * {@link MapMessage}, and verify the values can all be retrieved as expected.
+ */
+ @Test
+ public void testReceiveBasicMapMessage() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Prepare an AMQP message for the test peer to send, containing an
+ // AmqpValue section holding a map with entries for each supported type,
+ // and annotated as a JMS map message.
+ String myBoolKey = "myBool";
+ boolean myBool = true;
+ String myByteKey = "myByte";
+ byte myByte = 4;
+ String myBytesKey = "myBytes";
+ byte[] myBytes = myBytesKey.getBytes();
+ String myCharKey = "myChar";
+ char myChar = 'd';
+ String myDoubleKey = "myDouble";
+ double myDouble = 1234567890123456789.1234;
+ String myFloatKey = "myFloat";
+ float myFloat = 1.1F;
+ String myIntKey = "myInt";
+ int myInt = Integer.MAX_VALUE;
+ String myLongKey = "myLong";
+ long myLong = Long.MAX_VALUE;
+ String myShortKey = "myShort";
+ short myShort = 25;
+ String myStringKey = "myString";
+ String myString = myStringKey;
+
+ Map<String, Object> map = new LinkedHashMap<String, Object>();
+ map.put(myBoolKey, myBool);
+ map.put(myByteKey, myByte);
+ map.put(myBytesKey, new Binary(myBytes));// the underlying AMQP message uses Binary rather than byte[] directly.
+ map.put(myCharKey, myChar);
+ map.put(myDoubleKey, myDouble);
+ map.put(myFloatKey, myFloat);
+ map.put(myIntKey, myInt);
+ map.put(myLongKey, myLong);
+ map.put(myShortKey, myShort);
+ map.put(myStringKey, myString);
+
+ MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+ msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_MAP_MESSAGE);
+
+ DescribedType amqpValueSectionContent = new AmqpValueDescribedType(map);
+
+ // receive the message from the test peer
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueSectionContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ // verify the content is as expected
+ assertNotNull("Message was not received", receivedMessage);
+ assertTrue("Message was not a MapMessage", receivedMessage instanceof MapMessage);
+ MapMessage receivedMapMessage = (MapMessage) receivedMessage;
+
+ assertEquals("Unexpected boolean value", myBool, receivedMapMessage.getBoolean(myBoolKey));
+ assertEquals("Unexpected byte value", myByte, receivedMapMessage.getByte(myByteKey));
+ byte[] readBytes = receivedMapMessage.getBytes(myBytesKey);
+ assertTrue("Read bytes were not as expected: " + Arrays.toString(readBytes), Arrays.equals(myBytes, readBytes));
+ assertEquals("Unexpected char value", myChar, receivedMapMessage.getChar(myCharKey));
+ assertEquals("Unexpected double value", myDouble, receivedMapMessage.getDouble(myDoubleKey), 0.0);
+ assertEquals("Unexpected float value", myFloat, receivedMapMessage.getFloat(myFloatKey), 0.0);
+ assertEquals("Unexpected int value", myInt, receivedMapMessage.getInt(myIntKey));
+ assertEquals("Unexpected long value", myLong, receivedMapMessage.getLong(myLongKey));
+ assertEquals("Unexpected short value", myShort, receivedMapMessage.getShort(myShortKey));
+ assertEquals("Unexpected UTF value", myString, receivedMapMessage.getString(myStringKey));
+ }
+ }
+
+ /*
+ * TODO: decide what to do about this
+ *
+ * The test below fails if a char is added and matched, unless we cast the matcher to expect an int.
+ * This is because the DataImpl-based decoder used by the test peer decodes the char to an Integer object
+ * and thus the EncodedAmqpValueMatcher would fail the comparison of its contained map due to the differing types.
+ * This doesn't happen in the above test as the reversed roles mean it is Proton's DecoderImpl doing the decoding
+ * and it does a similarly ugly cast on the integer value to char before output.
+ */
+ @Test
+ public void testSendBasicMapMessage() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ String myBoolKey = "myBool";
+ boolean myBool = true;
+ String myByteKey = "myByte";
+ byte myByte = 4;
+ String myBytesKey = "myBytes";
+ byte[] myBytes = myBytesKey.getBytes();
+ String myCharKey = "myChar";
+ char myChar = 'd';
+ String myDoubleKey = "myDouble";
+ double myDouble = 1234567890123456789.1234;
+ String myFloatKey = "myFloat";
+ float myFloat = 1.1F;
+ String myIntKey = "myInt";
+ int myInt = Integer.MAX_VALUE;
+ String myLongKey = "myLong";
+ long myLong = Long.MAX_VALUE;
+ String myShortKey = "myShort";
+ short myShort = 25;
+ String myStringKey = "myString";
+ String myString = myStringKey;
+
+ // Prepare a MapMessage to send to the test peer
+ MapMessage mapMessage = session.createMapMessage();
+
+ mapMessage.setBoolean(myBoolKey, myBool);
+ mapMessage.setByte(myByteKey, myByte);
+ mapMessage.setBytes(myBytesKey, myBytes);
+ mapMessage.setChar(myCharKey, myChar);
+ mapMessage.setDouble(myDoubleKey, myDouble);
+ mapMessage.setFloat(myFloatKey, myFloat);
+ mapMessage.setInt(myIntKey, myInt);
+ mapMessage.setLong(myLongKey, myLong);
+ mapMessage.setShort(myShortKey, myShort);
+ mapMessage.setString(myStringKey, myString);
+
+ // prepare a matcher for the test peer to use to receive and verify the message
+ Map<String, Object> map = new LinkedHashMap<String, Object>();
+ map.put(myBoolKey, myBool);
+ map.put(myByteKey, myByte);
+ map.put(myBytesKey, new Binary(myBytes));// the underlying AMQP message uses Binary rather than byte[] directly.
+ // TODO: see note above to explain the ugly cast
+ map.put(myCharKey, (int) myChar);
+ map.put(myDoubleKey, myDouble);
+ map.put(myFloatKey, myFloat);
+ map.put(myIntKey, myInt);
+ map.put(myLongKey, myLong);
+ map.put(myShortKey, myShort);
+ map.put(myStringKey, myString);
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.JMS_MSG_TYPE), equalTo(AmqpMessageSupport.JMS_MAP_MESSAGE));
+ MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propertiesMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(map));
+
+ // send the message
+ testPeer.expectTransfer(messageMatcher);
+ producer.send(mapMessage);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] git commit: add basic BytesMessage integration test
Posted by ro...@apache.org.
add basic BytesMessage integration test
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/5e1834b0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/5e1834b0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/5e1834b0
Branch: refs/heads/master
Commit: 5e1834b0f60172eb751211f9a2f0d8ed60c601a6
Parents: d421847
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Oct 2 17:32:04 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Oct 2 17:57:02 2014 +0100
----------------------------------------------------------------------
.../BytesMessageIntegrationTest.java | 342 +++++++++++++++++++
1 file changed, 342 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5e1834b0/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
new file mode 100644
index 0000000..1c5648d
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedDataMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class BytesMessageIntegrationTest extends QpidJmsTestCase {
+ private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
+
+ @Test
+ public void testSendBasicBytesMessageWithContent() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ byte[] content = "myBytes".getBytes();
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.JMS_MSG_TYPE), equalTo(AmqpMessageSupport.JMS_BYTES_MESSAGE));
+ MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
+ propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE)));
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propertiesMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(new Binary(content)));
+
+ testPeer.expectTransfer(messageMatcher);
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(content);
+
+ producer.send(message);
+ }
+ }
+
+ @Test
+ public void testReceiveBasicBytesMessageWithContentUsingDataSection() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ PropertiesDescribedType properties = new PropertiesDescribedType();
+ properties.setContentType(Symbol.valueOf(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE));
+
+ MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+ msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_BYTES_MESSAGE);
+
+ final byte[] expectedContent = "expectedContent".getBytes();
+ DescribedType dataContent = new DataDescribedType(new Binary(expectedContent));
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, properties, null, dataContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof BytesMessage);
+ BytesMessage bytesMessage = (BytesMessage) receivedMessage;
+ assertEquals(expectedContent.length, bytesMessage.getBodyLength());
+ byte[] recievedContent = new byte[expectedContent.length];
+ int readBytes = bytesMessage.readBytes(recievedContent);
+ assertEquals(recievedContent.length, readBytes);
+ assertTrue(Arrays.equals(expectedContent, recievedContent));
+ }
+ }
+
+ /**
+ * Test that a message received from the test peer with a Data section and content type of
+ * {@link AmqpMessageSupport#OCTET_STREAM_CONTENT_TYPE} is returned as a BytesMessage, verify it
+ * gives the expected data values when read, and when reset and left mid-stream before being
+ * resent that it results in the expected AMQP data body section and properties content type
+ * being received by the test peer.
+ */
+ @Test
+ public void testReceiveBytesMessageAndResendAfterResetAndPartialRead() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Prepare an AMQP message for the test peer to send, containing the content type and
+ // a data body section populated with expected bytes for use as a JMS BytesMessage
+ PropertiesDescribedType properties = new PropertiesDescribedType();
+ Symbol contentType = Symbol.valueOf(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
+ properties.setContentType(contentType);
+
+ MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+ msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_BYTES_MESSAGE);
+
+ boolean myBool = true;
+ byte myByte = 4;
+ byte[] myBytes = "myBytes".getBytes();
+ char myChar = 'd';
+ double myDouble = 1234567890123456789.1234;
+ float myFloat = 1.1F;
+ int myInt = Integer.MAX_VALUE;
+ long myLong = Long.MAX_VALUE;
+ short myShort = 25;
+ String myUTF = "myString";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+
+ dos.writeBoolean(myBool);
+ dos.writeByte(myByte);
+ dos.write(myBytes);
+ dos.writeChar(myChar);
+ dos.writeDouble(myDouble);
+ dos.writeFloat(myFloat);
+ dos.writeInt(myInt);
+ dos.writeLong(myLong);
+ dos.writeShort(myShort);
+ dos.writeUTF(myUTF);
+
+ byte[] bytesPayload = baos.toByteArray();
+ Binary binaryPayload = new Binary(bytesPayload);
+ DescribedType dataSectionContent = new DataDescribedType(binaryPayload);
+
+ // receive the message from the test peer
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, properties, null, dataSectionContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ // verify the content is as expected
+ assertNotNull("Message was not received", receivedMessage);
+ assertTrue("Message was not a BytesMessage", receivedMessage instanceof BytesMessage);
+ BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;
+
+ assertEquals("Unexpected boolean value", myBool, receivedBytesMessage.readBoolean());
+ assertEquals("Unexpected byte value", myByte, receivedBytesMessage.readByte());
+ byte[] readBytes = new byte[myBytes.length];
+ assertEquals("Did not read the expected number of bytes", myBytes.length, receivedBytesMessage.readBytes(readBytes));
+ assertTrue("Read bytes were not as expected: " + Arrays.toString(readBytes), Arrays.equals(myBytes, readBytes));
+ assertEquals("Unexpected char value", myChar, receivedBytesMessage.readChar());
+ assertEquals("Unexpected double value", myDouble, receivedBytesMessage.readDouble(), 0.0);
+ assertEquals("Unexpected float value", myFloat, receivedBytesMessage.readFloat(), 0.0);
+ assertEquals("Unexpected int value", myInt, receivedBytesMessage.readInt());
+ assertEquals("Unexpected long value", myLong, receivedBytesMessage.readLong());
+ assertEquals("Unexpected short value", myShort, receivedBytesMessage.readShort());
+ assertEquals("Unexpected UTF value", myUTF, receivedBytesMessage.readUTF());
+
+ // reset and read the first item, leaving message marker in the middle of its content
+ receivedBytesMessage.reset();
+ assertEquals("Unexpected boolean value after reset", myBool, receivedBytesMessage.readBoolean());
+
+ // Send the received message back to the test peer and have it check the result is as expected
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+ propsMatcher.withContentType(equalTo(contentType));
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(binaryPayload));
+ testPeer.expectTransfer(messageMatcher);
+
+ producer.send(receivedBytesMessage);
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ /**
+ * Test that a message received from the test peer with an AmqpValue section containing
+ * Binary and no content type is returned as a BytesMessage, verify it gives the
+ * expected data values when read, and when sent to the test peer it results in an
+ * AMQP message containing a data body section and content type of
+ * {@link AmqpMessageSupport#OCTET_STREAM_CONTENT_TYPE}
+ */
+ @Test
+ public void testReceiveBytesMessageWithAmqpValueAndResendResultsInData() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Prepare an AMQP message for the test peer to send, containing an amqp-value
+ // body section populated with expected bytes for use as a JMS BytesMessage,
+ // and do not set content type, or the message type annotation
+
+ boolean myBool = true;
+ byte myByte = 4;
+ byte[] myBytes = "myBytes".getBytes();
+ char myChar = 'd';
+ double myDouble = 1234567890123456789.1234;
+ float myFloat = 1.1F;
+ int myInt = Integer.MAX_VALUE;
+ long myLong = Long.MAX_VALUE;
+ short myShort = 25;
+ String myUTF = "myString";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+
+ dos.writeBoolean(myBool);
+ dos.writeByte(myByte);
+ dos.write(myBytes);
+ dos.writeChar(myChar);
+ dos.writeDouble(myDouble);
+ dos.writeFloat(myFloat);
+ dos.writeInt(myInt);
+ dos.writeLong(myLong);
+ dos.writeShort(myShort);
+ dos.writeUTF(myUTF);
+
+ byte[] bytesPayload = baos.toByteArray();
+ Binary binaryPayload = new Binary(bytesPayload);
+
+ DescribedType amqpValueSectionContent = new AmqpValueDescribedType(binaryPayload);
+
+ // receive the message from the test peer
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueSectionContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ // verify the content is as expected
+ assertNotNull("Message was not received", receivedMessage);
+ assertTrue("Message was not a BytesMessage", receivedMessage instanceof BytesMessage);
+ BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;
+
+ assertEquals("Unexpected boolean value", myBool, receivedBytesMessage.readBoolean());
+ assertEquals("Unexpected byte value", myByte, receivedBytesMessage.readByte());
+ byte[] readBytes = new byte[myBytes.length];
+ assertEquals("Did not read the expected number of bytes", myBytes.length, receivedBytesMessage.readBytes(readBytes));
+ assertTrue("Read bytes were not as expected: " + Arrays.toString(readBytes), Arrays.equals(myBytes, readBytes));
+ assertEquals("Unexpected char value", myChar, receivedBytesMessage.readChar());
+ assertEquals("Unexpected double value", myDouble, receivedBytesMessage.readDouble(), 0.0);
+ assertEquals("Unexpected float value", myFloat, receivedBytesMessage.readFloat(), 0.0);
+ assertEquals("Unexpected int value", myInt, receivedBytesMessage.readInt());
+ assertEquals("Unexpected long value", myLong, receivedBytesMessage.readLong());
+ assertEquals("Unexpected short value", myShort, receivedBytesMessage.readShort());
+ assertEquals("Unexpected UTF value", myUTF, receivedBytesMessage.readUTF());
+
+ // reset and read the first item, leaving message marker in the middle of its content
+ receivedBytesMessage.reset();
+ assertEquals("Unexpected boolean value after reset", myBool, receivedBytesMessage.readBoolean());
+
+ // Send the received message back to the test peer and have it check the result is as expected
+ testPeer.expectSenderAttach();
+ MessageProducer producer = session.createProducer(queue);
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.JMS_MSG_TYPE), equalTo(AmqpMessageSupport.JMS_BYTES_MESSAGE));
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+ propsMatcher.withContentType(equalTo(Symbol.valueOf(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE)));
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(binaryPayload));
+ testPeer.expectTransfer(messageMatcher);
+
+ producer.send(receivedBytesMessage);
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org