You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/11/20 15:34:56 UTC
svn commit: r1543828 [1/3] - in /qpid/jms/trunk/src:
main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/
test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/engine/
test/java/org/apache/qpid/jms/impl/ test/java/org/apache...
Author: robbie
Date: Wed Nov 20 14:34:55 2013
New Revision: 1543828
URL: http://svn.apache.org/r1543828
Log:
QPIDJMS-9: initial work on Message implementation and mapping to/from AMQP messages
- Add some initial stub Message implementations
- Initial support for constructing appropriate Message type instances based on the incoming AMQP payload
- Add abilities to TestAmqpPeer for sending and matching additional message sections such as MessageAnnotations, Properties, and Data.
- Add some integration tests for sending/receiving TextMessage and BytesMessage
Added:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java
- copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java
- copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java
- copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
- copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java
- copied, changed from r1529205, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/MapDescribedType.java
- copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/AmqpValueDescribedType.java
- copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/DataDescribedType.java
- copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/HeaderDescribedType.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/MessageAnnotationsDescribedType.java
- copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/PropertiesDescribedType.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/generate-list-sections.xsl
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/AbstractMessageSectionMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageAnnotationsSectionMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageHeaderSectionMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageListSectionMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageMapSectionMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessagePropertiesSectionMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/generate-message-section-matchers.xsl
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpTypeMatcher.java
- copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/EncodedAmqpValueMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpValueMatcher.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedDataMatcher.java
Removed:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/EncodedAmqpValueMatcher.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.engine;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * AMQP message whose body is treated as a sequence of bytes. New instances
+ * created for sending are given the {@value #CONTENT_TYPE} content type.
+ */
+public class AmqpBytesMessage extends AmqpMessage
+{
+ public static final String CONTENT_TYPE = "application/octet-stream";
+ private long _length;
+
+ /**
+ * Used when creating a bytes message that we intend to send.
+ */
+ public AmqpBytesMessage()
+ {
+ super();
+ setContentType(CONTENT_TYPE);
+ }
+
+ /**
+ * Used when creating a bytes message that has been received.
+ */
+ public AmqpBytesMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+ {
+ super(message, delivery, amqpConnection);
+ }
+
+ /**
+ * Replaces the message body with a data section wrapping the given bytes.
+ */
+ public void setBytes(byte[] bytes)
+ {
+ getMessage().setBody(new Data(new Binary(bytes)));
+ }
+
+ /**
+ * Returns a stream over the bytes carried in the message body, and records
+ * their count for {@link #getBytesLength()}. A missing body, a null
+ * amqp-value, or a data section without a payload all yield an empty stream.
+ *
+ * @throws RuntimeException if the body is neither a data section nor an
+ * amqp-value holding a Binary
+ */
+ public ByteArrayInputStream getByteArrayInputStream()
+ {
+ Section body = getMessage().getBody();
+
+ if(body == null)
+ {
+ _length = 0;
+ return createEmptyByteArrayInputStream();
+ }
+ else if(body instanceof AmqpValue)
+ {
+ Object value = ((AmqpValue) body).getValue();
+
+ if(value == null)
+ {
+ _length = 0;
+ return createEmptyByteArrayInputStream();
+ }
+ if(value instanceof Binary)
+ {
+ Binary b = (Binary)value;
+ _length = b.getLength();
+ return new ByteArrayInputStream(b.getArray(), b.getArrayOffset(), b.getLength());
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected body content type: " + value.getClass().getSimpleName());
+ }
+ }
+ else if(body instanceof Data)
+ {
+ Binary b = ((Data) body).getValue();
+
+ //The data section may not carry a payload; treat that like
+ //a null amqp-value rather than failing with an NPE
+ if(b == null)
+ {
+ _length = 0;
+ return createEmptyByteArrayInputStream();
+ }
+
+ _length = b.getLength();
+ return new ByteArrayInputStream(b.getArray(), b.getArrayOffset(), b.getLength());
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected message body type: " + body.getClass().getSimpleName());
+ }
+ }
+
+ private ByteArrayInputStream createEmptyByteArrayInputStream()
+ {
+ return new ByteArrayInputStream(new byte[0]);
+ }
+
+ /**
+ * Returns the number of body bytes, re-reading the current body to compute it.
+ */
+ public long getBytesLength()
+ {
+ getByteArrayInputStream();
+ return _length;
+ }
+
+ //TODO: methods to access/set content
+}
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java (from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java&p1=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java Wed Nov 20 14:34:55 2013
@@ -16,27 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.jms;
+package org.apache.qpid.jms.engine;
-import static org.junit.Assert.assertNull;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.impl.ConnectionImpl;
-import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
-
-public class IntegrationTestFixture
+public class AmqpGenericMessage extends AmqpMessage
{
- static final int PORT = 25672;
-
- Connection establishConnecton(TestAmqpPeer testPeer) throws JMSException
+ public AmqpGenericMessage()
{
- testPeer.expectPlainConnect("guest", "guest", true);
-
- Connection connection = new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
+ super();
+ }
- assertNull(testPeer.getException());
- return connection;
+ public AmqpGenericMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+ {
+ super(message, delivery, amqpConnection);
}
+
+ //TODO: add some methods to access the body in some fashion?
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java Wed Nov 20 14:34:55 2013
@@ -37,11 +37,11 @@ public abstract class AmqpLink
private boolean _closed;
- public AmqpLink(AmqpSession amqpSession, Link protonLink)
+ public AmqpLink(AmqpSession amqpSession, Link protonLink, AmqpConnection amqpConnection)
{
_amqpSession = amqpSession;
_protonLink = protonLink;
- _amqpConnection = _amqpSession.getAmqpConnection();
+ _amqpConnection = amqpConnection;
}
public boolean isEstablished()
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java (from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java&p1=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java Wed Nov 20 14:34:55 2013
@@ -16,27 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.jms;
+package org.apache.qpid.jms.engine;
-import static org.junit.Assert.assertNull;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.impl.ConnectionImpl;
-import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
-
-public class IntegrationTestFixture
+public class AmqpListMessage extends AmqpMessage
{
- static final int PORT = 25672;
-
- Connection establishConnecton(TestAmqpPeer testPeer) throws JMSException
+ public AmqpListMessage()
{
- testPeer.expectPlainConnect("guest", "guest", true);
-
- Connection connection = new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
+ super();
+ }
- assertNull(testPeer.getException());
- return connection;
+ public AmqpListMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+ {
+ super(message, delivery, amqpConnection);
}
+
+ //TODO: methods to access/set content
}
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java (from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java&p1=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java Wed Nov 20 14:34:55 2013
@@ -16,27 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.jms;
+package org.apache.qpid.jms.engine;
-import static org.junit.Assert.assertNull;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.impl.ConnectionImpl;
-import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
-
-public class IntegrationTestFixture
+public class AmqpMapMessage extends AmqpMessage
{
- static final int PORT = 25672;
-
- Connection establishConnecton(TestAmqpPeer testPeer) throws JMSException
+ public AmqpMapMessage()
{
- testPeer.expectPlainConnect("guest", "guest", true);
-
- Connection connection = new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
+ super();
+ }
- assertNull(testPeer.getException());
- return connection;
+ public AmqpMapMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+ {
+ super(message, delivery, amqpConnection);
}
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Wed Nov 20 14:34:55 2013
@@ -20,9 +20,12 @@
*/
package org.apache.qpid.jms.engine;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.message.Message;
@@ -31,28 +34,40 @@ import org.apache.qpid.proton.message.Me
* Thread-safe (all state is guarded by the corresponding {@link AmqpConnection} monitor)
*
*/
-public class AmqpMessage
+public abstract class AmqpMessage
{
+ public static final String MESSAGE_ANNOTATION_TYPE_KEY_NAME = "x-opt-jms-message-type";
- private final AmqpReceiver _amqpReceiver;
private final Delivery _delivery;
private final Message _message;
+ private final AmqpConnection _amqpConnection;
+
+ private volatile MessageAnnotations _messageAnnotations;
+ private volatile Map<Object,Object> _messageAnnotationsMap;
- public AmqpMessage(Delivery delivery, Message message, AmqpReceiver amqpReceiver)
+ /**
+ * Used when creating a message that we intend to send
+ */
+ public AmqpMessage()
{
- _delivery = delivery;
- _amqpReceiver = amqpReceiver;
- _message = message;
+ this(Proton.message(), null, null);
}
/**
- * Currently used when creating a message that we intend to send
+ * Used when creating a message that has been received
*/
- public AmqpMessage()
+ @SuppressWarnings("unchecked")
+ public AmqpMessage(Message message, Delivery delivery, AmqpConnection amqpConnection)
{
- _message = Proton.message();
- _amqpReceiver = null;
- _delivery = null;
+ _delivery = delivery;
+ _amqpConnection = amqpConnection;
+ _message = message;
+
+ _messageAnnotations = _message.getMessageAnnotations();
+ if(_messageAnnotations != null)
+ {
+ _messageAnnotationsMap = _messageAnnotations.getValue();
+ }
}
Message getMessage()
@@ -62,19 +77,19 @@ public class AmqpMessage
public void accept(boolean settle)
{
- synchronized (_amqpReceiver.getAmqpConnection())
+ synchronized (_amqpConnection)
{
_delivery.disposition(Accepted.getInstance());
if(settle)
{
- _delivery.settle();
+ settle();
}
}
}
public void settle()
{
- synchronized (_amqpReceiver.getAmqpConnection())
+ synchronized (_amqpConnection)
{
_delivery.settle();
}
@@ -88,15 +103,102 @@ public class AmqpMessage
*/
public boolean isSettled()
{
- synchronized (_amqpReceiver.getAmqpConnection())
+ synchronized (_amqpConnection)
{
return _delivery.isSettled() || ((_delivery instanceof DeliveryImpl && ((DeliveryImpl)_delivery).remotelySettled()));
}
}
- public void setText(String string)
+ /**
+ * Sets the content-type of the underlying Proton message.
+ */
+ public void setContentType(String contentType)
+ {
+ //TODO: do we need to synchronise this?
+ _message.setContentType(contentType);
+ }
+
+ /**
+ * Returns whether an annotation with the given key is currently held for this message.
+ */
+ public boolean messageAnnotationExists(Object key)
+ {
+ //TODO: this isn't thread-safe, does it need to be?
+ Map<Object,Object> msgAnnotations = _messageAnnotationsMap;
+ if(msgAnnotations == null)
+ {
+ return false;
+ }
+
+ return msgAnnotations.containsKey(key);
+ }
+
+ /**
+ * Removes the annotation with the given key, if any. Removing the last
+ * annotation also removes the annotations section from the Proton message.
+ */
+ public void clearMessageAnnotation(Object key)
+ {
+ //TODO: this isn't thread-safe, does it need to be?
+ if(_messageAnnotationsMap == null)
+ {
+ return;
+ }
+
+ _messageAnnotationsMap.remove(key);
+
+ //If there are now no annotations, clear the field on
+ //the Proton message to avoid encoding an empty map
+ if(_messageAnnotationsMap.isEmpty())
+ {
+ clearAllMessageAnnotations();
+ }
+ }
+
+ /**
+ * Removes every annotation and the annotations section of the Proton message.
+ */
+ public void clearAllMessageAnnotations()
+ {
+ //TODO: this isn't thread-safe, does it need to be?
+ _messageAnnotations = null;
+
+ //Discard the backing map too, otherwise its old entries would still be
+ //reported by messageAnnotationExists() and would be sent again once
+ //setMessageAnnotation() next attaches the map to the Proton message
+ _messageAnnotationsMap = null;
+ _message.setMessageAnnotations(null);
+ }
+
+ /**
+ * Adds the annotation, replacing any existing value held for the key.
+ */
+ public void setMessageAnnotation(Object key, Object value)
+ {
+ //TODO: this isn't thread-safe, does it need to be?
+ if(_messageAnnotationsMap == null)
+ {
+ _messageAnnotationsMap = new HashMap<Object,Object>();
+ }
+
+ _messageAnnotationsMap.put(key, value);
+
+ //If there were previously no annotations, we need to
+ //set the related field on the Proton message now
+ if(_messageAnnotations == null)
+ {
+ setMessageAnnotations();
+ }
+ }
+
+ /**
+ * Wraps the current annotations map in a new section and sets it on the Proton message.
+ */
+ private void setMessageAnnotations()
{
- AmqpValue body = new AmqpValue(string);
- _message.setBody(body);
+ //TODO: this isn't thread-safe, does it need to be?
+ _messageAnnotations = new MessageAnnotations(_messageAnnotationsMap);
+ _message.setMessageAnnotations(_messageAnnotations);
}
+
}
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.engine;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Creates the {@link AmqpMessage} subtype that best matches a received Proton
+ * message, based on its body section, content-type and message annotations.
+ */
+public class AmqpMessageFactory
+{
+ //TODO: use switch statements
+ /**
+ * Wraps the given Proton message in the most specific AmqpMessage type that
+ * can be determined, falling back to {@link AmqpGenericMessage} otherwise.
+ */
+ @SuppressWarnings("unchecked")
+ AmqpMessage createAmqpMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+ {
+ Section body = message.getBody();
+
+ Map<Object,Object> messageAnnotationsMap = null;
+ if(message.getMessageAnnotations() != null)
+ {
+ messageAnnotationsMap = message.getMessageAnnotations().getValue();
+ }
+
+ if(body == null)
+ {
+ if(isJMSMessageType(AmqpTextMessage.MSG_TYPE_ANNOTATION_VALUE, messageAnnotationsMap))
+ {
+ return new AmqpTextMessage(delivery, message, amqpConnection);
+ }
+ else if(isContentType(AmqpTextMessage.CONTENT_TYPE, message))
+ {
+ return new AmqpTextMessage(delivery, message, amqpConnection);
+ }
+ else if(isContentType(AmqpObjectMessage.CONTENT_TYPE, message))
+ {
+ return new AmqpObjectMessage(delivery, message, amqpConnection);
+ }
+ else if(isContentType(AmqpBytesMessage.CONTENT_TYPE, message) || isContentType(null, message))
+ {
+ return new AmqpBytesMessage(delivery, message, amqpConnection);
+ }
+ }
+ else if(body instanceof Data)
+ {
+ if(isContentType(AmqpTextMessage.CONTENT_TYPE, message))
+ {
+ return new AmqpTextMessage(delivery, message, amqpConnection);
+ }
+ else if(isContentType(AmqpBytesMessage.CONTENT_TYPE, message) || isContentType(null, message))
+ {
+ return new AmqpBytesMessage(delivery, message, amqpConnection);
+ }
+ else if(isContentType(AmqpObjectMessage.CONTENT_TYPE, message))
+ {
+ return new AmqpObjectMessage(delivery, message, amqpConnection);
+ }
+ }
+ else if(body instanceof AmqpValue)
+ {
+ Object value = ((AmqpValue) body).getValue();
+
+ if(value == null)
+ {
+ if(isJMSMessageType(AmqpTextMessage.MSG_TYPE_ANNOTATION_VALUE, messageAnnotationsMap))
+ {
+ return new AmqpTextMessage(delivery, message, amqpConnection);
+ }
+ }
+ else if(value instanceof String)
+ {
+ return new AmqpTextMessage(delivery, message, amqpConnection);
+ }
+ else if(value instanceof Map)
+ {
+ return new AmqpMapMessage(delivery, message, amqpConnection);
+ }
+ else if(value instanceof List)
+ {
+ return new AmqpListMessage(delivery, message, amqpConnection);
+ }
+ else if(value instanceof Binary)
+ {
+ return new AmqpBytesMessage(delivery, message, amqpConnection);
+ }
+ }
+
+ //Unable to determine a specific message type, return the generic message
+ return new AmqpGenericMessage(delivery, message, amqpConnection);
+ }
+
+ /**
+ * Returns whether the JMS message type annotation equals the given type.
+ */
+ private boolean isJMSMessageType(String messageType, Map<Object, Object> messageAnnotationsMap)
+ {
+ Symbol key = Symbol.valueOf(AmqpMessage.MESSAGE_ANNOTATION_TYPE_KEY_NAME);
+ Object value = getAnnotation(messageAnnotationsMap, key);
+
+ return messageType.equals(value);
+ }
+
+ /**
+ * Returns the value held for the key, or null if there is no map or no such entry.
+ */
+ private Object getAnnotation(Map<Object,Object> annotations, Symbol symbolKey)
+ {
+ //Map.get() already yields null for an absent key, so no containsKey() check is needed
+ if(annotations == null)
+ {
+ return null;
+ }
+
+ return annotations.get(symbolKey);
+ }
+
+ /**
+ * Returns whether the message content-type equals the given value; a null
+ * argument matches only a message that has no content-type set.
+ */
+ private boolean isContentType(String contentType, Message message)
+ {
+ if(contentType == null)
+ {
+ return message.getContentType() == null;
+ }
+ else
+ {
+ return contentType.equals(message.getContentType());
+ }
+ }
+
+}
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java (from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java&p1=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java Wed Nov 20 14:34:55 2013
@@ -16,27 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.jms;
+package org.apache.qpid.jms.engine;
-import static org.junit.Assert.assertNull;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.impl.ConnectionImpl;
-import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
-
-public class IntegrationTestFixture
+public class AmqpObjectMessage extends AmqpMessage
{
- static final int PORT = 25672;
+ public static final String CONTENT_TYPE = "application/x-java-serialized-object";
- Connection establishConnecton(TestAmqpPeer testPeer) throws JMSException
+ public AmqpObjectMessage()
{
- testPeer.expectPlainConnect("guest", "guest", true);
-
- Connection connection = new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
+ super();
+ }
- assertNull(testPeer.getException());
- return connection;
+ public AmqpObjectMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+ {
+ super(message, delivery, amqpConnection);
}
+
+ //TODO: methods to access/set content
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Wed Nov 20 14:34:55 2013
@@ -29,9 +29,9 @@ public class AmqpReceiver extends AmqpLi
private Receiver _protonReceiver;
private byte[] _buffer = new byte[1024];
- public AmqpReceiver(AmqpSession amqpSession, Receiver protonReceiver)
+ public AmqpReceiver(AmqpSession amqpSession, Receiver protonReceiver, AmqpConnection amqpConnection)
{
- super(amqpSession, protonReceiver);
+ super(amqpSession, protonReceiver, amqpConnection);
_protonReceiver = protonReceiver;
}
@@ -74,10 +74,12 @@ public class AmqpReceiver extends AmqpLi
break;
}
}
+
Message message = getAmqpConnection().getMessageFactory().createMessage();
message.decode(_buffer, 0, total);
- AmqpMessage amqpMessage = new AmqpMessage(currentDelivery, message, this);
+ //TODO: dont create a new factory for every message
+ AmqpMessage amqpMessage = new AmqpMessageFactory().createAmqpMessage(currentDelivery, message, getAmqpConnection());
currentDelivery.setContext(amqpMessage);
_protonReceiver.advance();
return amqpMessage;
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java Wed Nov 20 14:34:55 2013
@@ -32,9 +32,9 @@ public class AmqpSender extends AmqpLink
private byte[] _buffer = new byte[1024];
private final Sender _protonSender;
- public AmqpSender(AmqpSession amqpSession, Sender protonSender)
+ public AmqpSender(AmqpSession amqpSession, Sender protonSender, AmqpConnection amqpConnection)
{
- super(amqpSession, protonSender);
+ super(amqpSession, protonSender, amqpConnection);
_protonSender = protonSender;
}
@@ -46,7 +46,7 @@ public class AmqpSender extends AmqpLink
byte[] bufferBytes = new byte[8];
ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
- buffer.putLong(0,tag++);
+ buffer.putLong(0, tag++);
Delivery del = _protonSender.delivery(bufferBytes);
@@ -66,10 +66,10 @@ public class AmqpSender extends AmqpLink
_protonSender.send(_buffer, 0, encoded);
_protonSender.advance();
- AmqpSentMessageToken amqpSentMessage = new AmqpSentMessageToken(del, this);
- del.setContext(amqpSentMessage);
+ AmqpSentMessageToken amqpSentMessageToken = new AmqpSentMessageToken(del, this);
+ del.setContext(amqpSentMessageToken);
- return amqpSentMessage;
+ return amqpSentMessageToken;
}
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java Wed Nov 20 14:34:55 2013
@@ -39,9 +39,9 @@ public class AmqpSession
private boolean _closed;
- public AmqpSession(AmqpConnection amqpConn, Session protonSession)
+ public AmqpSession(AmqpConnection amqpConnection, Session protonSession)
{
- _amqpConnection = amqpConn;
+ _amqpConnection = amqpConnection;
_protonSession = protonSession;
}
@@ -90,7 +90,7 @@ public class AmqpSession
protonSender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
protonSender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
- AmqpSender amqpSender = new AmqpSender(this, protonSender);
+ AmqpSender amqpSender = new AmqpSender(this, protonSender, _amqpConnection);
protonSender.setContext(amqpSender);
protonSender.open();
_amqpConnection.addPendingLink(protonSender);
@@ -113,7 +113,7 @@ public class AmqpSession
protonReceiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
protonReceiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
- AmqpReceiver amqpReceiver = new AmqpReceiver(this, protonReceiver);
+ AmqpReceiver amqpReceiver = new AmqpReceiver(this, protonReceiver, _amqpConnection);
protonReceiver.setContext(amqpReceiver);
protonReceiver.open();
_amqpConnection.addPendingLink(protonReceiver);
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.engine;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpTextMessage extends AmqpMessage
+{
+ /**
+ * Content type, only to be used when message uses a data
+ * body section, and not when using an amqp-value body section
+ */
+ public static final String CONTENT_TYPE = "text/plain";
+
+ /**
+ * Message type annotation value, only to be used when message uses
+ * an amqp-value body section containing a null value, not otherwise.
+ */
+ public static final String MSG_TYPE_ANNOTATION_VALUE = "TextMessage";
+
+ public AmqpTextMessage()
+ {
+ super();
+ setText(null);
+ }
+
+ public AmqpTextMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+ {
+ super(message, delivery, amqpConnection);
+ }
+
+ public void setText(String text)
+ {
+ //the text, even when null, is always carried in an amqp-value body section
+ getMessage().setBody(new AmqpValue(text));
+ Symbol typeKey = Symbol.valueOf(MESSAGE_ANNOTATION_TYPE_KEY_NAME);
+ boolean annotated = messageAnnotationExists(typeKey);
+ if(text == null && !annotated)
+ {
+ setMessageAnnotation(typeKey, MSG_TYPE_ANNOTATION_VALUE);
+ }
+ else if(text != null && annotated)
+ {
+ clearMessageAnnotation(typeKey);
+ }
+ }
+
+ public String getText()
+ {
+ Section body = getMessage().getBody();
+
+ if(body == null)
+ {
+ return null;
+ }
+ else if(body instanceof Data)
+ {
+ //TODO
+ return null;
+ }
+ else if(body instanceof AmqpValue)
+ {
+ Object value = ((AmqpValue) body).getValue();
+
+ if(value == null || value instanceof String)
+ {
+ return (String) value;
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected body content type: " + value.getClass().getSimpleName());
+ }
+ }
+ else
+ {
+ throw new RuntimeException("Unexpected message body type: " + body.getClass().getSimpleName());
+ }
+ }
+}
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+
+import org.apache.qpid.jms.engine.AmqpBytesMessage;
+
+public class BytesMessageImpl extends MessageImpl<AmqpBytesMessage> implements BytesMessage
+{
+ private ByteArrayOutputStream _bytesOut;
+ private DataOutputStream _dataAsOutput;
+ private DataInputStream _dataIn;
+ private ByteArrayInputStream _bytesIn;
+
+ //message to be sent
+ public BytesMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ this(new AmqpBytesMessage(), sessionImpl, connectionImpl);
+ _bytesOut = new ByteArrayOutputStream();
+ _dataAsOutput = new DataOutputStream(_bytesOut);
+ }
+
+ //message just received
+ public BytesMessageImpl(AmqpBytesMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ super(amqpMessage, sessionImpl, connectionImpl);
+ _dataIn = new DataInputStream(amqpMessage.getByteArrayInputStream());
+ }
+
+ @Override
+ protected AmqpBytesMessage prepareUnderlyingAmqpMessageForSending(AmqpBytesMessage amqpMessage)
+ {
+ amqpMessage.setBytes(_bytesOut.toByteArray());
+
+ //TODO: do we need to do anything later with properties/headers etc?
+ return amqpMessage;
+ }
+
+ private JMSException handleInputException(final IOException e)
+ {
+ JMSException ex;
+ if(e instanceof EOFException)
+ {
+ ex = new MessageEOFException(e.getMessage());
+ }
+ else
+ {
+ ex = new MessageFormatException(e.getMessage());
+ }
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ return ex;
+ }
+
+ private JMSException handleOutputException(final IOException e)
+ {
+ return new QpidJmsException(e.getMessage(), e);
+ }
+
+ //======= JMS Methods =======
+
+ @Override
+ public long getBodyLength() throws JMSException
+ {
+ return getUnderlyingAmqpMessage(false).getBytesLength();
+ }
+
+ @Override
+ public boolean readBoolean() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public byte readByte() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public int readUnsignedByte() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public short readShort() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public int readUnsignedShort() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public char readChar() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public int readInt() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public long readLong() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public float readFloat() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public double readDouble() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public String readUTF() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public int readBytes(byte[] value) throws JMSException
+ {
+ return readBytes(value, value.length);
+ }
+
+ @Override
+ public int readBytes(byte[] value, int length) throws JMSException
+ {
+ //TODO: checkReadable();
+ if(length < 0 || length > value.length)
+ {
+ //the JMS contract demands this exception; a negative length would
+ //otherwise skip the loop below and be misreported as end-of-stream (-1)
+ throw new IndexOutOfBoundsException("length must be >= 0 and <= value.length, was: " + length);
+ }
+
+ try
+ {
+ int offset = 0;
+ while(offset < length)
+ {
+ int read = _dataIn.read(value, offset, length - offset);
+ if(read < 0)
+ {
+ break;
+ }
+ offset += read;
+ }
+
+ //-1 means nothing was left to read, as opposed to a zero-length request
+ return (offset == 0 && length != 0) ? -1 : offset;
+ }
+ catch (IOException e)
+ {
+ throw handleInputException(e);
+ }
+ }
+
+ @Override
+ public void writeBoolean(boolean value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeByte(byte value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeShort(short value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeChar(char value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeInt(int value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeLong(long value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeFloat(float value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeDouble(double value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeUTF(String value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeBytes(byte[] bytes) throws JMSException
+ {
+ //TODO: checkWritable();
+ try
+ {
+ _dataAsOutput.write(bytes);
+ }
+ catch (IOException e)
+ {
+ throw handleOutputException(e);
+ }
+ }
+
+ @Override
+ public void writeBytes(byte[] value, int offset, int length) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeObject(Object value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void reset() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+}
Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java (from r1529205, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -19,32 +19,27 @@
package org.apache.qpid.jms.impl;
import javax.jms.JMSException;
-import javax.jms.TextMessage;
+import javax.jms.Message;
-public class TextMessageImpl extends MessageImpl implements TextMessage
+import org.apache.qpid.jms.engine.AmqpGenericMessage;
+
+public class GenericAmqpMessageImpl extends MessageImpl<AmqpGenericMessage> implements Message
{
- public TextMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ public GenericAmqpMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- super(sessionImpl, connectionImpl);
+ this(new AmqpGenericMessage(), sessionImpl, connectionImpl);
}
- public TextMessageImpl(String text, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ public GenericAmqpMessageImpl(AmqpGenericMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- this(sessionImpl, connectionImpl);
- setText(text);
+ super(amqpMessage, sessionImpl, connectionImpl);
}
@Override
- public String getText() throws JMSException
+ protected AmqpGenericMessage prepareUnderlyingAmqpMessageForSending(AmqpGenericMessage amqpMessage)
{
- // TODO Auto-generated method stub
+ //TODO
throw new UnsupportedOperationException("Not Implemented");
}
- @Override
- public void setText(String string) throws JMSException
- {
- getAmqpMessage().setText(string);
- }
-
}
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.impl;
+
+import java.util.Enumeration;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.apache.qpid.jms.engine.AmqpMapMessage;
+
+public class MapMessageImpl extends MessageImpl<AmqpMapMessage> implements MapMessage
+{
+ public MapMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ this(new AmqpMapMessage(), sessionImpl, connectionImpl);
+ }
+
+ public MapMessageImpl(AmqpMapMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ super(amqpMessage, sessionImpl, connectionImpl);
+ }
+
+ @Override
+ protected AmqpMapMessage prepareUnderlyingAmqpMessageForSending(AmqpMapMessage amqpMessage)
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ //======= JMS Methods =======
+
+ @Override
+ public boolean getBoolean(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public byte getByte(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public short getShort(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public char getChar(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public int getInt(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public long getLong(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public float getFloat(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public double getDouble(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public String getString(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public byte[] getBytes(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public Object getObject(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Enumeration getMapNames() throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setBoolean(String name, boolean value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setByte(String name, byte value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setShort(String name, short value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setChar(String name, char value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setInt(String name, int value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setLong(String name, long value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setFloat(String name, float value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setDouble(String name, double value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setString(String name, String value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setBytes(String name, byte[] value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setBytes(String name, byte[] value, int offset, int length)
+ throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void setObject(String name, Object value) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public boolean itemExists(String name) throws JMSException
+ {
+ //TODO
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+}
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.qpid.jms.engine.AmqpBytesMessage;
+import org.apache.qpid.jms.engine.AmqpGenericMessage;
+import org.apache.qpid.jms.engine.AmqpListMessage;
+import org.apache.qpid.jms.engine.AmqpMapMessage;
+import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpObjectMessage;
+import org.apache.qpid.jms.engine.AmqpTextMessage;
+
+public class MessageFactoryImpl
+{
+ Message createJmsMessage(AmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ //wrap the AMQP message in the JMS implementation matching its type; the checks
+ //are made in a fixed order and the first match wins
+ if(amqpMessage instanceof AmqpTextMessage)
+ {
+ return new TextMessageImpl((AmqpTextMessage) amqpMessage, sessionImpl, connectionImpl);
+ }
+ if(amqpMessage instanceof AmqpBytesMessage)
+ {
+ return new BytesMessageImpl((AmqpBytesMessage) amqpMessage, sessionImpl, connectionImpl);
+ }
+ if(amqpMessage instanceof AmqpObjectMessage)
+ {
+ return new ObjectMessageImpl((AmqpObjectMessage) amqpMessage, sessionImpl, connectionImpl);
+ }
+ if(amqpMessage instanceof AmqpListMessage)
+ {
+ return new StreamMessageImpl((AmqpListMessage) amqpMessage, sessionImpl, connectionImpl);
+ }
+ if(amqpMessage instanceof AmqpMapMessage)
+ {
+ return new MapMessageImpl((AmqpMapMessage) amqpMessage, sessionImpl, connectionImpl);
+ }
+ if(amqpMessage instanceof AmqpGenericMessage)
+ {
+ return new GenericAmqpMessageImpl((AmqpGenericMessage) amqpMessage, sessionImpl, connectionImpl);
+ }
+
+ //TODO: support other message types
+ throw new QpidJmsException("Unknown Message Type");
+ }
+}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Wed Nov 20 14:34:55 2013
@@ -26,29 +26,31 @@ import javax.jms.Message;
import org.apache.qpid.jms.engine.AmqpMessage;
-public class MessageImpl implements Message
+public abstract class MessageImpl<T extends AmqpMessage> implements Message
{
- private final AmqpMessage _amqpMessage;
- private final SessionImpl _sessionImpl;
- private final ConnectionImpl _connectionImpl;
+ private final T _amqpMessage;
- public MessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl)
- {
- this(new AmqpMessage(), sessionImpl, connectionImpl);
- }
-
- public MessageImpl(AmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl)
+ public MessageImpl(T amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl)
{
_amqpMessage = amqpMessage;
- _sessionImpl = sessionImpl;
- _connectionImpl = connectionImpl;
}
- AmqpMessage getAmqpMessage()
+ T getUnderlyingAmqpMessage(boolean prepareForSending)
{
- return _amqpMessage;
+ if(prepareForSending)
+ {
+ return prepareUnderlyingAmqpMessageForSending(_amqpMessage);
+ }
+ else
+ {
+ return _amqpMessage;
+ }
}
+ protected abstract T prepareUnderlyingAmqpMessageForSending(T amqpMessage);
+
+ //======= JMS Methods =======
+
@Override
public String getJMSMessageID() throws JMSException
{
@@ -363,5 +365,4 @@ public class MessageImpl implements Mess
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Not Implemented");
}
-
}
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.impl;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.apache.qpid.jms.engine.AmqpObjectMessage;
+
+public class ObjectMessageImpl extends MessageImpl<AmqpObjectMessage> implements ObjectMessage
+{
+ public ObjectMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ this(new AmqpObjectMessage(), sessionImpl, connectionImpl); //backed by a newly created AMQP message
+ }
+
+ public ObjectMessageImpl(AmqpObjectMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ super(amqpMessage, sessionImpl, connectionImpl); //wraps the supplied AMQP message
+ }
+
+ @Override
+ protected AmqpObjectMessage prepareUnderlyingAmqpMessageForSending(AmqpObjectMessage amqpMessage)
+ {
+ //TODO: not implemented yet, this currently always throws
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ //======= JMS Methods =======
+
+ @Override
+ public void setObject(Serializable object) throws JMSException
+ {
+ //TODO: not implemented yet, the object is not stored and this always throws
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public Serializable getObject() throws JMSException
+ {
+ //TODO: not implemented yet, this always throws
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java Wed Nov 20 14:34:55 2013
@@ -61,11 +61,10 @@ public class ReceiverImpl extends LinkIm
MessageReceivedPredicate messageReceievedCondition = new MessageReceivedPredicate();
getConnectionImpl().waitUntil(messageReceievedCondition, timeout);
-
- //TODO: decide what if any particular message impl class to instantiate
-
AmqpMessage receivedAmqpMessage = messageReceievedCondition.getReceivedMessage();
- MessageImpl receivedMessageImpl = new MessageImpl(receivedAmqpMessage, _sessionImpl, getConnectionImpl());
+
+ //TODO: don't create a new factory for every message
+ Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl());
//TODO: accepting/settling will be acknowledge-mode dependent
if(_sessionImpl.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE)
@@ -79,7 +78,7 @@ public class ReceiverImpl extends LinkIm
getConnectionImpl().stateChanged();
- return receivedMessageImpl;
+ return receivedMessage;
}
catch (JmsTimeoutException e)
{
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Wed Nov 20 14:34:55 2013
@@ -66,10 +66,11 @@ public class SenderImpl extends LinkImpl
{
if(message instanceof MessageImpl)
{
- return ((MessageImpl)message).getAmqpMessage();
+ return ((MessageImpl<?>)message).getUnderlyingAmqpMessage(true);
}
else
{
+ //TODO
throw new UnsupportedOperationException("cross-vendor message support has yet to be implemented");
}
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Wed Nov 20 14:34:55 2013
@@ -203,8 +203,7 @@ public class SessionImpl implements Sess
@Override
public BytesMessage createBytesMessage() throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ return new BytesMessageImpl(this, getConnectionImpl());
}
@Override
Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.impl;
+
+import javax.jms.JMSException;
+import javax.jms.StreamMessage;
+
+import org.apache.qpid.jms.engine.AmqpListMessage;
+
+public class StreamMessageImpl extends MessageImpl<AmqpListMessage> implements StreamMessage
+{ //Initial stub: apart from the constructors, every method below throws UnsupportedOperationException("Not Implemented")
+ public StreamMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ this(new AmqpListMessage(), sessionImpl, connectionImpl); //delegates to the constructor below with a newly created AmqpListMessage
+ }
+
+ public StreamMessageImpl(AmqpListMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ super(amqpMessage, sessionImpl, connectionImpl); //all three arguments are passed straight through to MessageImpl
+ }
+
+ @Override
+ protected AmqpListMessage prepareUnderlyingAmqpMessageForSending(AmqpListMessage amqpMessage)
+ {
+ //TODO: implement preparing the AmqpListMessage for sending; this stub currently always throws
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ //======= JMS Methods =======
+
+ @Override
+ public boolean readBoolean() throws JMSException
+ {
+ //TODO: implement readBoolean(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public byte readByte() throws JMSException
+ {
+ //TODO: implement readByte(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public short readShort() throws JMSException
+ {
+ //TODO: implement readShort(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public char readChar() throws JMSException
+ {
+ //TODO: implement readChar(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public int readInt() throws JMSException
+ {
+ //TODO: implement readInt(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public long readLong() throws JMSException
+ {
+ //TODO: implement readLong(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public float readFloat() throws JMSException
+ {
+ //TODO: implement readFloat(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public double readDouble() throws JMSException
+ {
+ //TODO: implement readDouble(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public String readString() throws JMSException
+ {
+ //TODO: implement readString(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public int readBytes(byte[] value) throws JMSException
+ {
+ //TODO: implement readBytes(byte[]); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public Object readObject() throws JMSException
+ {
+ //TODO: implement readObject(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeBoolean(boolean value) throws JMSException
+ {
+ //TODO: implement writeBoolean(boolean); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeByte(byte value) throws JMSException
+ {
+ //TODO: implement writeByte(byte); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeShort(short value) throws JMSException
+ {
+ //TODO: implement writeShort(short); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeChar(char value) throws JMSException
+ {
+ //TODO: implement writeChar(char); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeInt(int value) throws JMSException
+ {
+ //TODO: implement writeInt(int); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeLong(long value) throws JMSException
+ {
+ //TODO: implement writeLong(long); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeFloat(float value) throws JMSException
+ {
+ //TODO: implement writeFloat(float); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeDouble(double value) throws JMSException
+ {
+ //TODO: implement writeDouble(double); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeString(String value) throws JMSException
+ {
+ //TODO: implement writeString(String); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeBytes(byte[] value) throws JMSException
+ {
+ //TODO: implement writeBytes(byte[]); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeBytes(byte[] value, int offset, int length)
+ throws JMSException
+ {
+ //TODO: implement writeBytes(byte[], int, int); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void writeObject(Object value) throws JMSException
+ {
+ //TODO: implement writeObject(Object); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+ @Override
+ public void reset() throws JMSException
+ {
+ //TODO: implement reset(); this stub currently always throws UnsupportedOperationException
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -21,30 +21,48 @@ package org.apache.qpid.jms.impl;
import javax.jms.JMSException;
import javax.jms.TextMessage;
-public class TextMessageImpl extends MessageImpl implements TextMessage
+import org.apache.qpid.jms.engine.AmqpTextMessage;
+
+public class TextMessageImpl extends MessageImpl<AmqpTextMessage> implements TextMessage
{
public TextMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- super(sessionImpl, connectionImpl);
+ this((String) null, sessionImpl, connectionImpl);
}
public TextMessageImpl(String text, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
{
- this(sessionImpl, connectionImpl);
+ this(new AmqpTextMessage(), sessionImpl, connectionImpl);
setText(text);
}
+ public TextMessageImpl(AmqpTextMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+ {
+ super(amqpMessage, sessionImpl, connectionImpl);
+ }
+
@Override
- public String getText() throws JMSException
+ protected AmqpTextMessage prepareUnderlyingAmqpMessageForSending(AmqpTextMessage amqpMessage)
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ //Nothing to do here currently, the message operations are all
+ //already operating on the AmqpMessage directly
+
+ //TODO: do we need to do anything later with properties/headers etc?
+ return amqpMessage;
}
+ //======= JMS Methods =======
+
@Override
- public void setText(String string) throws JMSException
+ public String getText() throws JMSException
{
- getAmqpMessage().setText(string);
+ return getUnderlyingAmqpMessage(false).getText();
}
+ @Override
+ public void setText(String text) throws JMSException
+ {
+ //TODO: checkWritable();
+ getUnderlyingAmqpMessage(false).setText(text);
+ }
}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java Wed Nov 20 14:34:55 2013
@@ -36,7 +36,7 @@ public class IntegrationTestFixture
Connection connection = new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
- assertNull(testPeer.getException());
+ assertNull(testPeer.getThrowable());
return connection;
}
}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java Wed Nov 20 14:34:55 2013
@@ -18,17 +18,39 @@
*/
package org.apache.qpid.jms;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
-import org.apache.qpid.jms.impl.MessageImpl;
+import org.apache.qpid.jms.engine.AmqpBytesMessage;
+import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpTextMessage;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedDataMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
import org.junit.Test;
public class SessionIntegrationTest extends QpidJmsTestCase
@@ -73,7 +95,63 @@ public class SessionIntegrationTest exte
}
@Test
- public void testSendReceiveTextMessageWithContent() throws Exception
+ public void testReceiveTextMessageWithContent() throws Exception //a transfer carrying only an AmqpValueDescribedType String body must be received as a TextMessage with that text
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ final String expectedMessageContent = "myTextMessage";
+
+ DescribedType amqpValueStringContent = new AmqpValueDescribedType(expectedMessageContent);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, amqpValueStringContent); //only the final (body content) argument is non-null
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000); //JMS receive timeout, in milliseconds
+ testPeer.waitForAllHandlersToComplete(3000); //NOTE(review): 3000 is presumably a timeout in milliseconds - confirm against TestAmqpPeer
+
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals(expectedMessageContent,((TextMessage)receivedMessage).getText());
+ }
+ }
+
+ @Test //sends a TextMessage created without any text and has the test peer match the resulting transfer payload
+ public void testSendTextMessageWithoutContent() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true) //NOTE(review): meaning of the 'true' argument is not visible here - confirm against the matcher class
+ .withEntry(Symbol.valueOf(AmqpMessage.MESSAGE_ANNOTATION_TYPE_KEY_NAME), equalTo(AmqpTextMessage.MSG_TYPE_ANNOTATION_VALUE)); //the type annotation must equal the text message type value
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null)); //presumably matches an amqp-value body whose value is null - confirm against EncodedAmqpValueMatcher
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(); //no text is ever set on this message
+
+ producer.send(message);
+ }
+ }
+
+ @Test
+ public void testReceiveTextMessageWithoutContent() throws Exception
{
try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
{
@@ -85,17 +163,89 @@ public class SessionIntegrationTest exte
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
+ MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+ msgAnnotations.setSymbolKeyedAnnotation(AmqpMessage.MESSAGE_ANNOTATION_TYPE_KEY_NAME, AmqpTextMessage.MSG_TYPE_ANNOTATION_VALUE);
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
testPeer.expectReceiverAttach();
- testPeer.expectLinkFlowRespondWithTransfer();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000);
+ testPeer.waitForAllHandlersToComplete(3000);
- // TODO check that it's a TextMessage with expected content: String expectedText = "myMessage";
- MessageImpl receivedMessage = (MessageImpl) messageConsumer.receive(1000);
assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertNull(((TextMessage)receivedMessage).getText());
+ }
+ }
+
+ @Test //sends a BytesMessage with written content and has the test peer match the content-type property and body of the transfer
+ public void testSendBytesMessageWithContent() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ byte[] content = "myBytes".getBytes(); //encoded with the JVM default charset; the same array is used for both the send and the matcher
+
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true); //NOTE(review): meaning of the 'true' argument is not visible here - confirm against the matcher class
+ propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpBytesMessage.CONTENT_TYPE))); //content-type must equal AmqpBytesMessage.CONTENT_TYPE as a Symbol
+ messageMatcher.setPropertiesMatcher(propertiesMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(new Binary(content))); //presumably matches a data section holding exactly these bytes - confirm against EncodedDataMatcher
+
+ testPeer.expectTransfer(messageMatcher);
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(content);
+
+ producer.send(message);
+ }
+ }
- testPeer.waitForAllHandlersToComplete();
+ @Test //a transfer with the bytes content-type property and a DataDescribedType body must be received as a BytesMessage with the same bytes
+ public void testReceiveBytesMessageWithContentUsingDataSection() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ PropertiesDescribedType properties = new PropertiesDescribedType();
+ properties.setContentType(Symbol.valueOf(AmqpBytesMessage.CONTENT_TYPE)); //content-type is the only property set on the outgoing transfer
+
+ final byte[] expectedContent = "expectedContent".getBytes(); //encoded with the JVM default charset; compared byte-for-byte below
+ DescribedType dataContent = new DataDescribedType(new Binary(expectedContent));
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, properties, dataContent); //only the properties and body content arguments are non-null
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(1000); //JMS receive timeout, in milliseconds
+ testPeer.waitForAllHandlersToComplete(3000); //NOTE(review): 3000 is presumably a timeout in milliseconds - confirm against TestAmqpPeer
+
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof BytesMessage);
+ BytesMessage bytesMessage = (BytesMessage)receivedMessage;
+ assertEquals(expectedContent.length, bytesMessage.getBodyLength());
+ byte[] recievedContent = new byte[expectedContent.length]; //buffer sized to the expected content length
+ int readBytes = bytesMessage.readBytes(recievedContent);
+ assertEquals(recievedContent.length, readBytes); //the single read must fill the whole buffer
+ assertTrue(Arrays.equals(expectedContent, recievedContent));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org