You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:06 UTC
[5/7] qpid-jms git commit: QPIDJMS-207 Adds dependency on JMS 2.0 API
and initial implementation.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
index 418d85c..bb2eb0b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.jms.message;
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.checkPropertyNameIsValid;
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.checkValidObject;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_DELIVERY_COUNT;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPID;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ;
@@ -42,7 +44,6 @@ import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
-import javax.jms.MessageFormatException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.util.TypeConversionSupport;
@@ -808,74 +809,4 @@ public class JmsMessagePropertyIntercepter {
return names;
}
-
- //----- Property Validation Methods --------------------------------------//
-
- private static void checkPropertyNameIsValid(String propertyName, boolean validateNames) throws IllegalArgumentException {
- if (propertyName == null) {
- throw new IllegalArgumentException("Property name must not be null");
- } else if (propertyName.length() == 0) {
- throw new IllegalArgumentException("Property name must not be the empty string");
- }
-
- if (validateNames) {
- checkIdentifierLetterAndDigitRequirements(propertyName);
- checkIdentifierIsntNullTrueFalse(propertyName);
- checkIdentifierIsntLogicOperator(propertyName);
- }
- }
-
- private static void checkIdentifierIsntLogicOperator(String identifier) {
- // Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or ESCAPE.
- if ("NOT".equals(identifier) || "AND".equals(identifier) || "OR".equals(identifier) ||
- "BETWEEN".equals(identifier) || "LIKE".equals(identifier) || "IN".equals(identifier) ||
- "IS".equals(identifier) || "ESCAPE".equals(identifier)) {
-
- throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
- }
- }
-
- private static void checkIdentifierIsntNullTrueFalse(String identifier) {
- // Identifiers cannot be the names NULL, TRUE, and FALSE.
- if ("NULL".equals(identifier) || "TRUE".equals(identifier) || "FALSE".equals(identifier)) {
- throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
- }
- }
-
- private static void checkIdentifierLetterAndDigitRequirements(String identifier) {
- // An identifier is an unlimited-length sequence of letters and digits, the first of
- // which must be a letter. A letter is any character for which the method
- // Character.isJavaLetter returns true. This includes '_' and '$'. A letter or digit
- // is any character for which the method Character.isJavaLetterOrDigit returns true.
- char startChar = identifier.charAt(0);
- if (!(Character.isJavaIdentifierStart(startChar))) {
- throw new IllegalArgumentException("Identifier does not begin with a valid JMS identifier start character: '" + identifier + "' ");
- }
-
- // JMS part character
- int length = identifier.length();
- for (int i = 1; i < length; i++) {
- char ch = identifier.charAt(i);
- if (!(Character.isJavaIdentifierPart(ch))) {
- throw new IllegalArgumentException("Identifier contains invalid JMS identifier character '" + ch + "': '" + identifier + "' ");
- }
- }
- }
-
- private static void checkValidObject(Object value) throws MessageFormatException {
- boolean valid = value instanceof Boolean ||
- value instanceof Byte ||
- value instanceof Short ||
- value instanceof Integer ||
- value instanceof Long ||
- value instanceof Float ||
- value instanceof Double ||
- value instanceof Character ||
- value instanceof String ||
- value == null;
-
- if (!valid) {
- throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertySupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertySupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertySupport.java
new file mode 100644
index 0000000..2c3576c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertySupport.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.message;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import org.apache.qpid.jms.util.TypeConversionSupport;
+
+/**
+ * Provides methods for use when working with JMS Message Properties and their values.
+ */
+public class JmsMessagePropertySupport {
+
+ //----- Conversions Validation for Message Properties --------------------//
+
+    /**
+     * Converts a message property value into the requested target type.
+     *
+     * A null value yields Boolean.FALSE for a Boolean target, raises a
+     * NullPointerException for Float or Double targets, raises a
+     * NumberFormatException for any other Number target, and yields null for
+     * every remaining target type.
+     *
+     * @param name
+     *      the property name, used only when building error messages.
+     * @param value
+     *      the current property value, may be null.
+     * @param target
+     *      the type the caller wants the value returned as.
+     *
+     * @return the converted value, or null when the value is null and the target permits it.
+     *
+     * @throws JMSException if a non-null value cannot be converted to the target type.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T convertPropertyTo(String name, Object value, Class<T> target) throws JMSException {
+        if (value != null) {
+            final T converted = (T) TypeConversionSupport.convert(value, target);
+            if (converted != null) {
+                return converted;
+            }
+
+            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a " + target.getName());
+        }
+
+        // No value present: the outcome depends solely on the requested type.
+        if (Boolean.class.equals(target)) {
+            return (T) Boolean.FALSE;
+        }
+
+        if (Float.class.equals(target) || Double.class.equals(target)) {
+            throw new NullPointerException("property " + name + " was null");
+        }
+
+        if (Number.class.isAssignableFrom(target)) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+
+        return null;
+    }
+
+ //----- Property Name Validation Methods ---------------------------------//
+
+    /**
+     * Validates a message property name.
+     *
+     * The name must never be null or empty. When validateNames is set the name is
+     * additionally run through the identifier character check and the two reserved
+     * word checks, in that order.
+     *
+     * @param propertyName
+     *      the property name to validate.
+     * @param validateNames
+     *      true to also apply the identifier checks, false to only reject null or empty names.
+     *
+     * @throws IllegalArgumentException if the name fails any of the applied checks.
+     */
+    public static void checkPropertyNameIsValid(String propertyName, boolean validateNames) throws IllegalArgumentException {
+        if (propertyName == null) {
+            throw new IllegalArgumentException("Property name must not be null");
+        }
+
+        if (propertyName.isEmpty()) {
+            throw new IllegalArgumentException("Property name must not be the empty string");
+        }
+
+        if (!validateNames) {
+            return;
+        }
+
+        checkIdentifierLetterAndDigitRequirements(propertyName);
+        checkIdentifierIsntNullTrueFalse(propertyName);
+        checkIdentifierIsntLogicOperator(propertyName);
+    }
+
+    /**
+     * Rejects identifiers that exactly match one of the operator words
+     * NOT, AND, OR, BETWEEN, LIKE, IN, IS or ESCAPE. The comparison is case
+     * sensitive and a null identifier is accepted.
+     *
+     * @param identifier
+     *      the identifier to check.
+     *
+     * @throws IllegalArgumentException if the identifier is one of the listed words.
+     */
+    public static void checkIdentifierIsntLogicOperator(String identifier) {
+        final String[] operatorWords = { "NOT", "AND", "OR", "BETWEEN", "LIKE", "IN", "IS", "ESCAPE" };
+
+        for (String operatorWord : operatorWords) {
+            if (operatorWord.equals(identifier)) {
+                throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
+            }
+        }
+    }
+
+    /**
+     * Rejects identifiers that exactly match NULL, TRUE or FALSE. The comparison
+     * is case sensitive and a null identifier is accepted.
+     *
+     * @param identifier
+     *      the identifier to check.
+     *
+     * @throws IllegalArgumentException if the identifier is one of the listed words.
+     */
+    public static void checkIdentifierIsntNullTrueFalse(String identifier) {
+        final String[] literalWords = { "NULL", "TRUE", "FALSE" };
+
+        for (String literalWord : literalWords) {
+            if (literalWord.equals(identifier)) {
+                throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
+            }
+        }
+    }
+
+    /**
+     * Verifies that an identifier starts with a character accepted by
+     * Character.isJavaIdentifierStart and that every following character is
+     * accepted by Character.isJavaIdentifierPart.
+     *
+     * @param identifier
+     *      the identifier to check.
+     *
+     * @throws IllegalArgumentException if the identifier is null, empty, or contains
+     *         a character that is not permitted at its position.
+     */
+    public static void checkIdentifierLetterAndDigitRequirements(String identifier) {
+        // This method is public, so it cannot rely on a caller having already rejected
+        // null or empty names; without this guard charAt(0) would fail with an unrelated
+        // NullPointerException or StringIndexOutOfBoundsException.
+        if (identifier == null || identifier.isEmpty()) {
+            throw new IllegalArgumentException("Identifier must not be null or empty");
+        }
+
+        // An identifier is an unlimited-length sequence of letters and digits, the first of
+        // which must be a letter. A letter is any character for which the method
+        // Character.isJavaLetter returns true. This includes '_' and '$'. A letter or digit
+        // is any character for which the method Character.isJavaLetterOrDigit returns true.
+        char startChar = identifier.charAt(0);
+        if (!(Character.isJavaIdentifierStart(startChar))) {
+            throw new IllegalArgumentException("Identifier does not begin with a valid JMS identifier start character: '" + identifier + "' ");
+        }
+
+        // JMS part character
+        int length = identifier.length();
+        for (int i = 1; i < length; i++) {
+            char ch = identifier.charAt(i);
+            if (!(Character.isJavaIdentifierPart(ch))) {
+                throw new IllegalArgumentException("Identifier contains invalid JMS identifier character '" + ch + "': '" + identifier + "' ");
+            }
+        }
+    }
+
+ //----- Property Type Validation Methods ---------------------------------//
+
+    /**
+     * Ensures that a value is of a type that may be stored as a message property.
+     *
+     * Accepted values are null, String, Character and the Boolean, Byte, Short,
+     * Integer, Long, Float and Double wrapper types.
+     *
+     * @param value
+     *      the candidate property value.
+     *
+     * @throws MessageFormatException if the value is of any other type.
+     */
+    public static void checkValidObject(Object value) throws MessageFormatException {
+        if (value == null || value instanceof String || value instanceof Character) {
+            return;
+        }
+
+        if (value instanceof Boolean || value instanceof Byte || value instanceof Short ||
+            value instanceof Integer || value instanceof Long ||
+            value instanceof Float || value instanceof Double) {
+            return;
+        }
+
+        throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java
index 902719b..e8e0d22 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java
@@ -60,6 +60,25 @@ public class JmsObjectMessage extends JmsMessage implements ObjectMessage {
}
@Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        if (facade.hasBody()) {
+            // Serializable and Object are accepted without inspecting the payload; any
+            // other type is checked against the actual object carried by the message.
+            final boolean acceptedWithoutInspection = Serializable.class == target || Object.class == target;
+            return acceptedWithoutInspection || target.isInstance(getObject());
+        }
+
+        // A message without a body is reported as assignable to every type.
+        return true;
+    }
+
+    /**
+     * Returns the object carried by this message, cast to the requested type.
+     *
+     * @param asType
+     *      the type the caller wants the body returned as; not consulted by this method.
+     *
+     * @return the result of getObject() cast to T.
+     *
+     * @throws JMSException as a MessageFormatException if the object cannot be read.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    protected <T> T doGetBody(Class<T> asType) throws JMSException {
+        try {
+            return (T) getObject();
+        } catch (JMSException e) {
+            // Keep the original failure attached as the cause so its stack trace and
+            // type are not lost when the error is re-reported as a format problem.
+            MessageFormatException failure = new MessageFormatException("Failed to read Object: " + e.getMessage());
+            failure.initCause(e);
+            throw failure;
+        }
+    }
+
+ @Override
public String toString() {
return "JmsObjectMessageFacade { " + facade.toString() + " }";
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java
index b334249..c9765f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java
@@ -475,6 +475,11 @@ public class JmsStreamMessage extends JmsMessage implements StreamMessage {
return "JmsStreamMessage { " + facade.toString() + " }";
}
+ /**
+ * Always answers false: this implementation never reports its body as
+ * assignable, whatever target type is supplied.
+ *
+ * @param target
+ * the type being asked about; not consulted.
+ *
+ * @return false in every case.
+ */
+ @Override
+ public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+ return false;
+ }
+
private void checkBytesInFlight() throws MessageFormatException {
if (remainingBytes != NO_BYTES_IN_FLIGHT) {
throw new MessageFormatException(
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java
index 69aefe8..9647b57 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java
@@ -22,6 +22,7 @@ import javax.jms.TextMessage;
import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
+@SuppressWarnings("unchecked")
public class JmsTextMessage extends JmsMessage implements TextMessage {
private final JmsTextMessageFacade facade;
@@ -57,4 +58,14 @@ public class JmsTextMessage extends JmsMessage implements TextMessage {
public String toString() {
return "JmsTextMessage { " + facade.toString() + " }";
}
+
+    /**
+     * Reports whether the body of this message could be returned as the given type.
+     *
+     * @param target
+     *      the type being asked about.
+     *
+     * @return true when the facade reports no body, otherwise true only if a String
+     *         can be assigned to the target type.
+     */
+    @Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        if (!facade.hasBody()) {
+            return true;
+        }
+
+        return target.isAssignableFrom(String.class);
+    }
+
+ /**
+ * Returns the value of getText() cast to the requested type.
+ *
+ * The asType argument is not consulted here.
+ * NOTE(review): presumably callers confirm assignability through
+ * isBodyAssignableTo before calling this - confirm.
+ */
+ @Override
+ protected <T> T doGetBody(Class<T> asType) throws JMSException {
+ return (T) getText();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
index 7321a8e..73117b9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
@@ -86,4 +86,10 @@ public interface JmsBytesMessageFacade extends JmsMessageFacade {
* @return the number of bytes contained in the body of the message.
*/
int getBodyLength();
+
+ /**
+ * @return a copy of the bytes contained in the body of the message.
+ */
+ byte[] copyBody();
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
index 1741251..d5616f5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
@@ -315,6 +315,25 @@ public interface JmsMessageFacade {
void setExpiration(long expiration);
/**
+ * Returns the delivery time that has been set on this message.
+ *
+ * The value should be returned as an absolute time expressed in GMT.
+ *
+ * @return the earliest time that the message should be made available for delivery.
+ */
+ long getDeliveryTime();
+
+ /**
+ * Sets a desired delivery time on this message.
+ *
+ * The delivery time will be given as an absolute time expressed in GMT.
+ *
+ * @param deliveryTime
+ * the earliest time that the message should be made available for delivery.
+ */
+ void setDeliveryTime(long deliveryTime);
+
+ /**
* Gets the Destination value that was assigned to this message at the time it was
* sent.
*
@@ -429,4 +448,11 @@ public interface JmsMessageFacade {
*/
void setProviderMessageIdObject(Object messageId);
+ /**
+ * Returns true if the underlying message has a body, false if the body is empty.
+ *
+ * @return true if the underlying message has a body, false if the body is empty.
+ */
+ boolean hasBody();
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
index 369dfa9..6b63fb7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
@@ -19,10 +19,6 @@ package org.apache.qpid.jms.provider.amqp.message;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_BYTES_MESSAGE;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
@@ -40,6 +36,11 @@ import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+
/**
* A JmsBytesMessageFacade that wraps around Proton AMQP Message instances to provide
* access to the underlying bytes contained in the message.
@@ -216,6 +217,21 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J
}
@Override
+    public boolean hasBody() {
+        // A zero-length binary payload is reported as having no body.
+        final int payloadLength = getBinaryFromBody().getLength();
+        return payloadLength != 0;
+    }
+
+    /**
+     * Copies the bytes of the body into a newly allocated array.
+     *
+     * @return a new array holding the body bytes, sized to the body's reported length.
+     */
+    @Override
+    public byte[] copyBody() {
+        final Binary body = getBinaryFromBody();
+        final int bodyLength = body.getLength();
+        final byte[] copy = new byte[bodyLength];
+
+        // The Binary may expose a window into a larger backing array, so the copy
+        // starts at its array offset rather than at index zero.
+        System.arraycopy(body.getArray(), body.getArrayOffset(), copy, 0, bodyLength);
+
+        return copy;
+    }
+
+ @Override
public void onSend(long producerTtl) throws JMSException {
super.onSend(producerTtl);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
index dc2cf0b..fe242f7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
@@ -141,6 +141,11 @@ public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements Jms
messageBodyMap.clear();
}
+ /**
+ * @return true only when the body map holds at least one entry.
+ */
+ @Override
+ public boolean hasBody() {
+ return !messageBodyMap.isEmpty();
+ }
+
private void initializeEmptyBody() {
// Using LinkedHashMap because AMQP map equality considers order,
// so we should behave in as predictable a manner as possible
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index f98dccc..82f63e0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -553,6 +553,18 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
}
}
+ @Override
+ public long getDeliveryTime() {
+ // TODO: not implemented yet - nothing is read from the underlying message, so
+ // this placeholder always reports 0.
+ return 0;
+ }
+
+ @Override
+ public void setDeliveryTime(long deliveryTime) {
+ // TODO: not implemented yet - the supplied delivery time is currently discarded.
+
+ }
+
/**
* Sets a value which will be used to override any ttl value that may otherwise be set
* based on the expiration value when sending the underlying AMQP message. A value of 0
@@ -702,6 +714,11 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
}
}
+    /**
+     * Reports whether the wrapped message currently carries a body.
+     *
+     * @return true if the underlying message has a body, false if the body is empty.
+     */
+    @Override
+    public boolean hasBody() {
+        // A null body means "no body"; the previous comparison was inverted and
+        // answered true exactly when the body was absent.
+        return message.getBody() != null;
+    }
+
/**
* @return the true AMQP Message instance wrapped by this Facade.
*/
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
index fabefed..f4b541f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
@@ -125,6 +125,11 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
}
@Override
+ public boolean hasBody() {
+ // Body presence is decided entirely by the current delegate.
+ return delegate.hasBody();
+ }
+
+ @Override
public void onSend(long producerTtl) throws JMSException {
super.onSend(producerTtl);
delegate.onSend();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
index d63d4a3..64f1fbd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
@@ -161,6 +161,11 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
position = 0;
}
+ /**
+ * @return true only when the backing list holds at least one element.
+ */
+ @Override
+ public boolean hasBody() {
+ return !list.isEmpty();
+ }
+
private List<Object> initializeEmptyBodyList(boolean useSequenceBody) {
List<Object> emptyList = new ArrayList<Object>();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
index e3106e6..44ed9e6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
@@ -135,6 +135,15 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
setText(null);
}
+    /**
+     * Reports whether this message currently yields a non-null text value.
+     *
+     * @return true when getText() returns a non-null value, false when it returns
+     *         null or fails with a JMSException.
+     */
+    @Override
+    public boolean hasBody() {
+        final String text;
+        try {
+            text = getText();
+        } catch (JMSException ignored) {
+            // A body that cannot be read as text is reported as absent.
+            return false;
+        }
+
+        return text != null;
+    }
+
Charset getCharset() {
return charset;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
index 7657343..3c1fa12 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
@@ -64,4 +64,7 @@ public interface AmqpObjectTypeDelegate {
void copyInto(AmqpObjectTypeDelegate copy) throws Exception;
boolean isAmqpTypeEncoded();
+
+ /**
+ * @return true if this delegate currently has an object body, false otherwise.
+ * NOTE(review): confirm whether implementations are expected to report false
+ * when the object cannot be read.
+ */
+ boolean hasBody();
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
index 618d123..ec73ba9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
@@ -194,4 +194,13 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
return true;
}
}
+
+    /**
+     * Reports whether this delegate currently yields a non-null object.
+     *
+     * @return true when getObject() returns a non-null value, false when it returns
+     *         null or fails with any exception.
+     */
+    @Override
+    public boolean hasBody() {
+        boolean present;
+        try {
+            present = getObject() != null;
+        } catch (Exception ignored) {
+            // Any failure while reading the object is reported as "no body".
+            present = false;
+        }
+
+        return present;
+    }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
index cc1038f..1296eaa 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
@@ -168,6 +168,15 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
return true;
}
+    /**
+     * Reports whether this delegate currently yields a non-null object.
+     *
+     * @return true when getObject() returns a non-null value, false when it returns
+     *         null or fails with any exception.
+     */
+    @Override
+    public boolean hasBody() {
+        Object current;
+        try {
+            current = getObject();
+        } catch (Exception ignored) {
+            // An object that cannot be read is treated the same as a missing one.
+            current = null;
+        }
+
+        return current != null;
+    }
+
//----- Internal implementation ------------------------------------------//
private boolean isSupportedAmqpValueObjectType(Serializable serializable) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 3f5b84b..4928faa 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -266,8 +266,14 @@ public class JmsConnectionTest {
}
@Test(timeout=30000, expected=JMSException.class)
- public void testCreateDurableConnectionConsumer() throws Exception {
+ public void testCreateSharedConnectionConsumer() throws Exception {
connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
- connection.createDurableConnectionConsumer(new JmsTopic(), "id", "", null, 1);
+ connection.createSharedConnectionConsumer(new JmsTopic(), "id", "", null, 1);
+ }
+
+ @Test(timeout=30000, expected=JMSException.class)
+ public void testCreateSharedDurableConnectionConsumer() throws Exception {
+ connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.createSharedDurableConnectionConsumer(new JmsTopic(), "id", "", null, 1);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
index 0f4a9ce..3986f8b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
@@ -18,6 +18,8 @@ package org.apache.qpid.jms;
import java.net.URI;
+import javax.jms.JMSContext;
+
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.mock.MockProviderFactory;
@@ -51,6 +53,13 @@ public class JmsConnectionTestSupport extends QpidJmsTestCase {
});
}
+    /**
+     * Builds a JmsContext in AUTO_ACKNOWLEDGE mode on top of a new connection that
+     * is backed by the mock provider.
+     *
+     * @return the newly created context.
+     *
+     * @throws Exception if the connection or the context cannot be created.
+     */
+    protected JmsContext createJMSContextToMockProvider() throws Exception {
+        return new JmsContext(
+            new JmsConnection("ID:TEST:1", createMockProvider(), clientIdGenerator),
+            JMSContext.AUTO_ACKNOWLEDGE);
+    }
+ }
+
protected JmsConnection createConnectionToMockProvider() throws Exception {
return new JmsConnection("ID:TEST:1", createMockProvider(), clientIdGenerator);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index a3a6ce2..2e559ac 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -106,6 +106,30 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ // Creates a session through the no-argument createSession() and checks that a
+ // session object is returned once the peer has been told to expect a begin.
+ // NOTE(review): the acknowledge mode of the returned session is not asserted here.
+ public void testCreateAutoAckSessionByDefault() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ Session session = connection.createSession();
+ assertNotNull("Session should not be null", session);
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
+ // Creates a session through createSession(int) with AUTO_ACKNOWLEDGE and checks
+ // that a session object is returned once the peer has been told to expect a begin.
+ @Test(timeout = 20000)
+ public void testCreateAutoAckSessionUsingAckModeOnlyMethod() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ assertNotNull("Session should not be null", session);
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 20000)
public void testCreateTransactedSession() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
@@ -132,6 +156,32 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testCreateTransactedSessionUsingAckModeOnlyMethod() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ testPeer.expectBegin();
+ // Expect the session, with an immediate link to the transaction coordinator
+ // using a target with the expected capabilities only.
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
+ Session session = connection.createSession(Session.SESSION_TRANSACTED);
+ assertNotNull("Session should not be null", session);
+
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 20000)
public void testCreateTransactedSessionFailsWhenNoDetachResponseSent() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
index c0a3cbc..7c33fc1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
@@ -24,6 +24,7 @@ import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsConnectionFactory;
@@ -61,22 +62,80 @@ public class IntegrationTestFixture {
// Each connection creates a session for managing temporary destinations etc
testPeer.expectBegin();
- String scheme = ssl ? "amqps" : "amqp";
- final String baseURI = scheme + "://localhost:" + testPeer.getServerPort();
- String remoteURI = baseURI;
- if (optionsString != null) {
- remoteURI = baseURI + optionsString;
- }
+ String remoteURI = buildURI(testPeer, ssl, optionsString);
ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
Connection connection = factory.createConnection("guest", "guest");
- if(setClientId) {
+ if (setClientId) {
// Set a clientId to provoke the actual AMQP connection process to occur.
connection.setClientID("clientName");
}
assertNull(testPeer.getThrowable());
+
return connection;
}
+
+ JMSContext createJMSContext(TestAmqpPeer testPeer) throws JMSException {
+ return createJMSContext(testPeer, null, null, null);
+ }
+
+ JMSContext createJMSContext(TestAmqpPeer testPeer, int sessionMode) throws JMSException {
+ return createJMSContext(testPeer, false, null, null, null, true, sessionMode);
+ }
+
+ JMSContext createJMSContext(TestAmqpPeer testPeer, String optionsString) throws JMSException {
+ return createJMSContext(testPeer, optionsString, null, null);
+ }
+
+ JMSContext createJMSContext(TestAmqpPeer testPeer, Symbol[] serverCapabilities) throws JMSException {
+ return createJMSContext(testPeer, null, serverCapabilities, null);
+ }
+
+ JMSContext createJMSContext(TestAmqpPeer testPeer, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties) throws JMSException {
+ return createJMSContext(testPeer, null, serverCapabilities, serverProperties);
+ }
+
+ JMSContext createJMSContext(TestAmqpPeer testPeer, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties) throws JMSException {
+ return createJMSContext(testPeer, false, optionsString, serverCapabilities, serverProperties, true, JMSContext.AUTO_ACKNOWLEDGE);
+ }
+
+ JMSContext createJMSContext(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId) throws JMSException {
+ return createJMSContext(testPeer, ssl, optionsString, serverCapabilities, serverProperties, setClientId, JMSContext.AUTO_ACKNOWLEDGE);
+ }
+
+ JMSContext createJMSContext(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId, int sessionMode) throws JMSException {
+ Symbol[] desiredCapabilities = new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY };
+
+ testPeer.expectSaslPlainConnect("guest", "guest", desiredCapabilities, serverCapabilities, serverProperties);
+
+ // Each connection creates a session for managing temporary destinations etc
+ testPeer.expectBegin();
+
+ String remoteURI = buildURI(testPeer, ssl, optionsString);
+
+ ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+ JMSContext context = factory.createContext("guest", "guest", sessionMode);
+
+ if (setClientId) {
+ // Set a clientId to provoke the actual AMQP connection process to occur.
+ context.setClientID("clientName");
+ }
+
+ assertNull(testPeer.getThrowable());
+
+ return context;
+ }
+
+ String buildURI(TestAmqpPeer testPeer, boolean ssl, String optionsString) {
+ String scheme = ssl ? "amqps" : "amqp";
+ final String baseURI = scheme + "://localhost:" + testPeer.getServerPort();
+ String remoteURI = baseURI;
+ if (optionsString != null) {
+ remoteURI = baseURI + optionsString;
+ }
+
+ return remoteURI;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSConsumerIntegrationTest.java
new file mode 100644
index 0000000..adc546d
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSConsumerIntegrationTest.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+
+import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSConsumerIntegrationTest extends QpidJmsTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerIntegrationTest.class);
+
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ @Test(timeout = 20000)
+ public void testCreateConsumer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+
+ Queue queue = context.createQueue("test");
+ JMSConsumer consumer = context.createConsumer(queue);
+ assertNotNull(consumer);
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testRemotelyCloseJMSConsumer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+
+ testPeer.expectBegin();
+
+ // Create a consumer, then remotely end it afterwards.
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, "resource closed");
+
+ Queue queue = context.createQueue("myQueue");
+ final JMSConsumer consumer = context.createConsumer(queue);
+
+ // Verify the consumer gets marked closed
+ testPeer.waitForAllHandlersToComplete(1000);
+ assertTrue("JMSConsumer never closed.", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ try {
+ consumer.getMessageListener();
+ } catch (IllegalStateRuntimeException jmsise) {
+ return true;
+ }
+ return false;
+ }
+ }, 10000, 10));
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ consumer.close();
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveMessageWithReceiveZeroTimeout() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+
+ testPeer.expectBegin();
+
+ Queue queue = context.createQueue("myQueue");
+
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ JMSConsumer messageConsumer = context.createConsumer(queue);
+ Message receivedMessage = messageConsumer.receive(0);
+
+ assertNotNull("A message should have been recieved", receivedMessage);
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testConsumerReceiveNoWaitThrowsIfConnectionLost() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+
+ testPeer.expectBegin();
+
+ Queue queue = context.createQueue("queue");
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow(false, notNullValue(UnsignedInteger.class));
+ testPeer.expectLinkFlow(true, notNullValue(UnsignedInteger.class));
+ testPeer.dropAfterLastHandler();
+
+ final JMSConsumer consumer = context.createConsumer(queue);
+
+ try {
+ consumer.receiveNoWait();
+ fail("An exception should have been thrown");
+ } catch (JMSRuntimeException jmsre) {
+ // Expected
+ }
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testNoReceivedMessagesWhenConnectionNotStarted() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+ context.setAutoStart(false);
+
+ testPeer.expectBegin();
+
+ Queue destination = context.createQueue(getTestName());
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 3);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ JMSConsumer consumer = context.createConsumer(destination);
+
+ assertNull(consumer.receive(100));
+
+ context.start();
+
+ assertNotNull(consumer.receive(2000));
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testSyncReceiveFailsWhenListenerSet() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+
+ testPeer.expectBegin();
+
+ Queue destination = context.createQueue(getTestName());
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+
+ JMSConsumer consumer = context.createConsumer(destination);
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message m) {
+ LOG.warn("Async consumer got unexpected Message: {}", m);
+ }
+ });
+
+ try {
+ consumer.receive();
+ fail("Should have thrown an exception.");
+ } catch (JMSRuntimeException ex) {
+ }
+
+ try {
+ consumer.receive(1000);
+ fail("Should have thrown an exception.");
+ } catch (JMSRuntimeException ex) {
+ }
+
+ try {
+ consumer.receiveNoWait();
+ fail("Should have thrown an exception.");
+ } catch (JMSRuntimeException ex) {
+ }
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyMapMessage() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+
+ testPeer.expectBegin();
+
+ Queue queue = context.createQueue("myQueue");
+
+ // Prepare an AMQP message for the test peer to send, containing an
+ // AmqpValue section holding a map with entries for each supported type,
+ // and annotated as a JMS map message.
+ String myBoolKey = "myBool";
+ boolean myBool = true;
+ String myByteKey = "myByte";
+ byte myByte = 4;
+ String myBytesKey = "myBytes";
+ byte[] myBytes = myBytesKey.getBytes();
+ String myCharKey = "myChar";
+ char myChar = 'd';
+ String myDoubleKey = "myDouble";
+ double myDouble = 1234567890123456789.1234;
+ String myFloatKey = "myFloat";
+ float myFloat = 1.1F;
+ String myIntKey = "myInt";
+ int myInt = Integer.MAX_VALUE;
+ String myLongKey = "myLong";
+ long myLong = Long.MAX_VALUE;
+ String myShortKey = "myShort";
+ short myShort = 25;
+ String myStringKey = "myString";
+ String myString = myStringKey;
+
+ Map<String, Object> map = new LinkedHashMap<String, Object>();
+ map.put(myBoolKey, myBool);
+ map.put(myByteKey, myByte);
+ map.put(myBytesKey, new Binary(myBytes));// the underlying AMQP message uses Binary rather than byte[] directly.
+ map.put(myCharKey, myChar);
+ map.put(myDoubleKey, myDouble);
+ map.put(myFloatKey, myFloat);
+ map.put(myIntKey, myInt);
+ map.put(myLongKey, myLong);
+ map.put(myShortKey, myShort);
+ map.put(myStringKey, myString);
+
+ MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+ msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_MAP_MESSAGE);
+
+ DescribedType amqpValueSectionContent = new AmqpValueDescribedType(map);
+
+ // receive the message from the test peer
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueSectionContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ JMSConsumer messageConsumer = context.createConsumer(queue);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> receivedMap = messageConsumer.receiveBody(Map.class, 3000);
+
+ // verify the content is as expected
+ assertNotNull("Map was not received", receivedMap);
+
+ assertEquals("Unexpected boolean value", myBool, receivedMap.get(myBoolKey));
+ assertEquals("Unexpected byte value", myByte, receivedMap.get(myByteKey));
+ byte[] readBytes = (byte[]) receivedMap.get(myBytesKey);
+ assertTrue("Read bytes were not as expected: " + Arrays.toString(readBytes), Arrays.equals(myBytes, readBytes));
+ assertEquals("Unexpected char value", myChar, receivedMap.get(myCharKey));
+ assertEquals("Unexpected double value", myDouble, (double) receivedMap.get(myDoubleKey), 0.0);
+ assertEquals("Unexpected float value", myFloat, (float) receivedMap.get(myFloatKey), 0.0);
+ assertEquals("Unexpected int value", myInt, receivedMap.get(myIntKey));
+ assertEquals("Unexpected long value", myLong, receivedMap.get(myLongKey));
+ assertEquals("Unexpected short value", myShort, receivedMap.get(myShortKey));
+ assertEquals("Unexpected UTF value", myString, receivedMap.get(myStringKey));
+
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyTextMessage() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+
+ testPeer.expectBegin();
+
+ final String content = "Message-Content";
+ Queue queue = context.createQueue("myQueue");
+
+ DescribedType amqpValueContent = new AmqpValueDescribedType(content);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ JMSConsumer messageConsumer = context.createConsumer(queue);
+ String received = messageConsumer.receiveBody(String.class, 3000);
+
+ assertNotNull(received);
+ assertEquals(content, received);
+
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyObjectMessage() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+
+ testPeer.expectBegin();
+
+ Queue queue = context.createQueue("myQueue");
+
+ PropertiesDescribedType properties = new PropertiesDescribedType();
+ properties.setContentType(Symbol.valueOf(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE));
+
+ String expectedContent = "expectedContent";
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(expectedContent);
+ oos.flush();
+ oos.close();
+ byte[] bytes = baos.toByteArray();
+
+ MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+ msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_OBJECT_MESSAGE);
+
+ DescribedType dataContent = new DataDescribedType(new Binary(bytes));
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, properties, null, dataContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ JMSConsumer messageConsumer = context.createConsumer(queue);
+ String received = messageConsumer.receiveBody(String.class, 3000);
+
+ assertNotNull(received);
+ assertEquals(expectedContent, received);
+
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyBytesMessage() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+
+ testPeer.expectBegin();
+
+ Queue queue = context.createQueue("myQueue");
+
+ PropertiesDescribedType properties = new PropertiesDescribedType();
+ properties.setContentType(Symbol.valueOf(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE));
+
+ MessageAnnotationsDescribedType msgAnnotations = null;
+ msgAnnotations = new MessageAnnotationsDescribedType();
+ msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_BYTES_MESSAGE);
+
+ final byte[] expectedContent = "expectedContent".getBytes();
+ DescribedType dataContent = new DataDescribedType(new Binary(expectedContent));
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, properties, null, dataContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ JMSConsumer messageConsumer = context.createConsumer(queue);
+ byte[] received = messageConsumer.receiveBody(byte[].class, 3000);
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ assertNotNull(received);
+ assertTrue(Arrays.equals(expectedContent, received));
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyFailsDoesNotAcceptMessageAutoAck() throws Exception {
+ doTestReceiveBodyFailsDoesNotAcceptMessage(JMSContext.AUTO_ACKNOWLEDGE);
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyFailsDoesNotAcceptMessageDupsOk() throws Exception {
+ doTestReceiveBodyFailsDoesNotAcceptMessage(JMSContext.DUPS_OK_ACKNOWLEDGE);
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyFailsDoesNotAcceptMessageClientAck() throws Exception {
+ doTestReceiveBodyFailsDoesNotAcceptMessage(JMSContext.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void doTestReceiveBodyFailsDoesNotAcceptMessage(int sessionMode) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, sessionMode);
+
+ testPeer.expectBegin();
+
+ final String content = "Message-Content";
+ Queue queue = context.createQueue("myQueue");
+
+ DescribedType amqpValueContent = new AmqpValueDescribedType(content);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueContent);
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ JMSConsumer messageConsumer = context.createConsumer(queue);
+ try {
+ messageConsumer.receiveBody(Boolean.class, 3000);
+ fail("Should not read as Boolean type");
+ } catch (MessageFormatRuntimeException mfre) {
+ }
+
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyFailsThenAcceptsOnSuccessfullyNextCallAutoAck() throws Exception {
+ doTestReceiveBodyFailsThenCalledWithCorrectType(JMSContext.AUTO_ACKNOWLEDGE);
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyFailsThenAcceptsOnSuccessfullyNextCallDupsOk() throws Exception {
+ doTestReceiveBodyFailsThenCalledWithCorrectType(JMSContext.DUPS_OK_ACKNOWLEDGE);
+ }
+
+ @Test(timeout = 20000)
+ public void testReceiveBodyFailsThenGetNullOnNextAttemptClientAck() throws Exception {
+ doTestReceiveBodyFailsThenCalledWithCorrectType(JMSContext.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void doTestReceiveBodyFailsThenCalledWithCorrectType(int sessionMode) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, sessionMode);
+
+ testPeer.expectBegin();
+
+ final String content = "Message-Content";
+ Queue queue = context.createQueue("myQueue");
+
+ DescribedType amqpValueContent = new AmqpValueDescribedType(content);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueContent);
+
+ JMSConsumer messageConsumer = context.createConsumer(queue);
+ try {
+ messageConsumer.receiveBody(Boolean.class, 3000);
+ fail("Should not read as Boolean type");
+ } catch (MessageFormatRuntimeException mfre) {
+ }
+
+ testPeer.waitForAllHandlersToComplete(3000);
+
+ if (sessionMode == JMSContext.AUTO_ACKNOWLEDGE ||
+ sessionMode == JMSContext.DUPS_OK_ACKNOWLEDGE) {
+
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+ }
+
+ String received = messageConsumer.receiveBody(String.class, 3000);
+
+ if (sessionMode == JMSContext.AUTO_ACKNOWLEDGE ||
+ sessionMode == JMSContext.DUPS_OK_ACKNOWLEDGE) {
+
+ assertNotNull(received);
+ assertEquals(content, received);
+ } else {
+ assertNull(received);
+ }
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSContextIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSContextIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSContextIntegrationTest.java
new file mode 100644
index 0000000..7c3d2d1
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSContextIntegrationTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.UUID;
+
+import javax.jms.JMSContext;
+import javax.jms.JMSProducer;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class JMSContextIntegrationTest extends QpidJmsTestCase {
+
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ private static final Symbol[] SERVER_ANONYMOUS_RELAY = new Symbol[]{ANONYMOUS_RELAY};
+
+ @Test(timeout = 20000)
+ public void testCreateAndCloseContext() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testCreateContextWithClientId() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, false, null, null, null, true);
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testCreateContextAndSetClientID() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, false, null, null, null, false);
+ context.setClientID(UUID.randomUUID().toString());
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testCreateAutoAckSessionByDefault() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+ assertEquals(JMSContext.AUTO_ACKNOWLEDGE, context.getSessionMode());
+ testPeer.expectBegin();
+ context.createTopic("TopicName");
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testCreateContextWithTransactedSessionMode() throws Exception {
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, JMSContext.SESSION_TRANSACTED);
+ assertEquals(JMSContext.SESSION_TRANSACTED, context.getSessionMode());
+
+ // Session should be created and a coordinator should be attached since this
+ // should be a TX session, then a new TX is declared, once closed the TX should
+ // be discharged as a roll back.
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+ testPeer.expectDeclare(txnId);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ context.createTopic("TopicName");
+
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testCreateContextFromContextWithSessionsActive() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer);
+ assertEquals(JMSContext.AUTO_ACKNOWLEDGE, context.getSessionMode());
+ testPeer.expectBegin();
+ context.createTopic("TopicName");
+
+ // Creating a second context should not begin a new session yet; its session is begun
+ // on demand, and closing the second context should only end that session.
+ JMSContext other = context.createContext(JMSContext.CLIENT_ACKNOWLEDGE);
+ assertEquals(JMSContext.CLIENT_ACKNOWLEDGE, other.getSessionMode());
+ testPeer.expectBegin();
+ testPeer.expectEnd();
+ other.createTopic("TopicName");
+ other.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ // Now the connection should close down.
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testOnlyOneProducerCreatedInSingleContext() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+ assertEquals(JMSContext.AUTO_ACKNOWLEDGE, context.getSessionMode());
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ // One producer created should send an attach.
+ JMSProducer producer1 = context.createProducer();
+ assertNotNull(producer1);
+
+ // An additional one should not result in an attach
+ JMSProducer producer2 = context.createProducer();
+ assertNotNull(producer2);
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testEachContextGetsItsOwnProducer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+ assertEquals(JMSContext.AUTO_ACKNOWLEDGE, context.getSessionMode());
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ // One producer created should send an attach.
+ JMSProducer producer1 = context.createProducer();
+ assertNotNull(producer1);
+
+ // A producer from a second context should result in its own begin and attach
+ JMSContext other = context.createContext(JMSContext.AUTO_ACKNOWLEDGE);
+ JMSProducer producer2 = other.createProducer();
+ assertNotNull(producer2);
+
+ testPeer.expectEnd();
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ other.close();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSProducerIntegrationTest.java
new file mode 100644
index 0000000..4096c1f
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSProducerIntegrationTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.JMSContext;
+import javax.jms.JMSProducer;
+import javax.jms.Message;
+import javax.jms.Queue;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+/**
+ * Integration tests for {@link JMSProducer} instances created from a {@link JMSContext}.
+ * Each test scripts the expectations of a {@link TestAmqpPeer} (begin, sender attach,
+ * transfer, end, close), drives the client API, and then waits for the peer's handlers
+ * to complete.
+ */
+public class JMSProducerIntegrationTest extends QpidJmsTestCase {
+
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ // Constant-style name, so declared static final; the tests only pass it to the fixture.
+ // Presumably the capabilities the test peer advertises -- confirm in IntegrationTestFixture.
+ private static final Symbol[] SERVER_ANONYMOUS_RELAY = new Symbol[]{ANONYMOUS_RELAY};
+
+ // Names and values of the properties set on the producer in the property tests below.
+ private static final String NULL_STRING_PROP = "nullStringProperty";
+ private static final String NULL_STRING_PROP_VALUE = null;
+ private static final String STRING_PROP = "stringProperty";
+ private static final String STRING_PROP_VALUE = "string";
+ private static final String BOOLEAN_PROP = "booleanProperty";
+ private static final boolean BOOLEAN_PROP_VALUE = true;
+ private static final String BYTE_PROP = "byteProperty";
+ private static final byte BYTE_PROP_VALUE = (byte)1;
+ private static final String SHORT_PROP = "shortProperty";
+ private static final short SHORT_PROP_VALUE = (short)1;
+ private static final String INT_PROP = "intProperty";
+ private static final int INT_PROP_VALUE = Integer.MAX_VALUE;
+ private static final String LONG_PROP = "longProperty";
+ private static final long LONG_PROP_VALUE = Long.MAX_VALUE;
+ private static final String FLOAT_PROP = "floatProperty";
+ private static final float FLOAT_PROP_VALUE = Float.MAX_VALUE;
+ private static final String DOUBLE_PROP = "doubleProperty";
+ private static final double DOUBLE_PROP_VALUE = Double.MAX_VALUE;
+
+ /**
+ * Creates a producer from a new context and checks it is non-null. The peer is scripted
+ * to expect a begin and a sender attach, followed by an end and a close when the context
+ * is closed.
+ */
+ @Test(timeout = 20000)
+ public void testCreateProducer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ JMSProducer producer = context.createProducer();
+ assertNotNull(producer);
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /**
+ * Checks that a newly created producer reports the {@code Message.DEFAULT_*} values for
+ * delivery delay, delivery mode, priority and time to live.
+ */
+ @Test(timeout = 20000)
+ public void testJMSProducerHasDefaultConfiguration() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ JMSProducer producer = context.createProducer();
+ assertNotNull(producer);
+
+ assertEquals(Message.DEFAULT_DELIVERY_DELAY, producer.getDeliveryDelay());
+ assertEquals(Message.DEFAULT_DELIVERY_MODE, producer.getDeliveryMode());
+ assertEquals(Message.DEFAULT_PRIORITY, producer.getPriority());
+ assertEquals(Message.DEFAULT_TIME_TO_LIVE, producer.getTimeToLive());
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /**
+ * Sets properties of several types (null String, String, boolean, byte, short, int, long,
+ * float, double) on the producer and scripts the peer to expect a transfer whose
+ * application-properties section carries a matching entry for each of them.
+ */
+ @Test(timeout = 20000)
+ public void testJMSProducerSetPropertySendsApplicationProperties() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ String queueName = "myQueue";
+ Queue queue = context.createQueue(queueName);
+ JMSProducer producer = context.createProducer();
+
+ // Expected application-properties entries, one per property set on the producer below.
+ ApplicationPropertiesSectionMatcher appPropsMatcher = new ApplicationPropertiesSectionMatcher(true);
+ appPropsMatcher.withEntry(NULL_STRING_PROP, nullValue());
+ appPropsMatcher.withEntry(STRING_PROP, equalTo(STRING_PROP_VALUE));
+ appPropsMatcher.withEntry(BOOLEAN_PROP, equalTo(BOOLEAN_PROP_VALUE));
+ appPropsMatcher.withEntry(BYTE_PROP, equalTo(BYTE_PROP_VALUE));
+ appPropsMatcher.withEntry(SHORT_PROP, equalTo(SHORT_PROP_VALUE));
+ appPropsMatcher.withEntry(INT_PROP, equalTo(INT_PROP_VALUE));
+ appPropsMatcher.withEntry(LONG_PROP, equalTo(LONG_PROP_VALUE));
+ appPropsMatcher.withEntry(FLOAT_PROP, equalTo(FLOAT_PROP_VALUE));
+ appPropsMatcher.withEntry(DOUBLE_PROP, equalTo(DOUBLE_PROP_VALUE));
+
+ // The transfer is also expected to be durable and addressed to the queue name.
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));
+
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setApplicationPropertiesMatcher(appPropsMatcher);
+ testPeer.expectTransfer(messageMatcher);
+
+ // Configure the producer-level properties, then send a String body to the queue.
+ producer.setProperty(NULL_STRING_PROP, NULL_STRING_PROP_VALUE);
+ producer.setProperty(STRING_PROP, STRING_PROP_VALUE);
+ producer.setProperty(BOOLEAN_PROP, BOOLEAN_PROP_VALUE);
+ producer.setProperty(BYTE_PROP, BYTE_PROP_VALUE);
+ producer.setProperty(SHORT_PROP, SHORT_PROP_VALUE);
+ producer.setProperty(INT_PROP, INT_PROP_VALUE);
+ producer.setProperty(LONG_PROP, LONG_PROP_VALUE);
+ producer.setProperty(FLOAT_PROP, FLOAT_PROP_VALUE);
+ producer.setProperty(DOUBLE_PROP, DOUBLE_PROP_VALUE);
+
+ producer.send(queue, "test");
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /**
+ * Sets the same property name on a message and on the producer with different values.
+ * The peer is scripted to expect the producer's value in the transfer's
+ * application-properties section.
+ */
+ @Test(timeout = 20000)
+ public void testJMSProducerPropertyOverridesMessageValue() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ String queueName = "myQueue";
+ Queue queue = context.createQueue(queueName);
+ Message message = context.createMessage();
+ JMSProducer producer = context.createProducer();
+
+ // Only the producer's value is expected for the shared property name.
+ ApplicationPropertiesSectionMatcher appPropsMatcher = new ApplicationPropertiesSectionMatcher(true);
+ appPropsMatcher.withEntry(STRING_PROP, equalTo(STRING_PROP_VALUE));
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));
+
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setApplicationPropertiesMatcher(appPropsMatcher);
+ testPeer.expectTransfer(messageMatcher);
+
+ // The message carries a different value under the same name; the matcher above
+ // requires the value set on the producer to be the one that is sent.
+ message.setStringProperty(STRING_PROP, "ThisShouldNotBeTransmitted");
+ producer.setProperty(STRING_PROP, STRING_PROP_VALUE);
+ producer.send(queue, message);
+
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ context.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index 8141751..10bcffd 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -20,8 +20,10 @@ package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.LinkedHashMap;
@@ -31,6 +33,7 @@ import javax.jms.Connection;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@@ -116,10 +119,10 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueSectionContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
+ testPeer.expectClose();
MessageConsumer messageConsumer = session.createConsumer(queue);
Message receivedMessage = messageConsumer.receive(3000);
- testPeer.waitForAllHandlersToComplete(3000);
// verify the content is as expected
assertNotNull("Message was not received", receivedMessage);
@@ -137,6 +140,23 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
assertEquals("Unexpected long value", myLong, receivedMapMessage.getLong(myLongKey));
assertEquals("Unexpected short value", myShort, receivedMapMessage.getShort(myShortKey));
assertEquals("Unexpected UTF value", myString, receivedMapMessage.getString(myStringKey));
+
+ assertTrue(receivedMapMessage.isBodyAssignableTo(Map.class));
+ assertTrue(receivedMapMessage.isBodyAssignableTo(Object.class));
+ assertFalse(receivedMapMessage.isBodyAssignableTo(Boolean.class));
+ assertFalse(receivedMapMessage.isBodyAssignableTo(byte[].class));
+
+ assertNotNull(receivedMapMessage.getBody(Object.class));
+ assertNotNull(receivedMapMessage.getBody(Map.class));
+ try {
+ receivedMapMessage.getBody(byte[].class);
+ fail("Cannot read TextMessage with this type.");
+ } catch (MessageFormatException mfe) {
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
}
}
@@ -219,9 +239,28 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
messageMatcher.setPropertiesMatcher(propertiesMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(map));
- // send the message
testPeer.expectTransfer(messageMatcher);
+ testPeer.expectClose();
+
+ // send the message
producer.send(mapMessage);
+
+ assertTrue(mapMessage.isBodyAssignableTo(Map.class));
+ assertTrue(mapMessage.isBodyAssignableTo(Object.class));
+ assertFalse(mapMessage.isBodyAssignableTo(Boolean.class));
+ assertFalse(mapMessage.isBodyAssignableTo(byte[].class));
+
+ assertNotNull(mapMessage.getBody(Object.class));
+ assertNotNull(mapMessage.getBody(Map.class));
+ try {
+ mapMessage.getBody(byte[].class);
+ fail("Cannot read TextMessage with this type.");
+ } catch (MessageFormatException mfe) {
+ }
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(3000);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org