You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2015/08/26 04:51:50 UTC
[2/2] qpid-interop-test git commit: QPIDIT-17: Add JMS test suite
with Qpid-JMS and Proton-Python clients.
QPIDIT-17: Add JMS test suite with Qpid-JMS and Proton-Python clients.
Project: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/commit/ebdacb0d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/tree/ebdacb0d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/diff/ebdacb0d
Branch: refs/heads/master
Commit: ebdacb0dd9063d73b49b681d39f7450fa05b391b
Parents: 77eba67
Author: Kim van der Riet <kp...@apache.org>
Authored: Tue Aug 25 22:48:34 2015 -0400
Committer: Kim van der Riet <kp...@apache.org>
Committed: Tue Aug 25 22:48:34 2015 -0400
----------------------------------------------------------------------
jars/javax.json-1.0.4.jar | Bin 0 -> 85147 bytes
jars/javax.json-api-1.0.jar | Bin 0 -> 19754 bytes
java-build.sh | 15 +
java-clean.sh | 5 +
shims/qpid-jms/java-build.sh | 7 +-
.../qpid/interop_test/shim/AmqpReceiver.java | 271 ++++++++++++++
.../qpid/interop_test/shim/AmqpSender.java | 260 +++++++++++++
.../qpid/interop_test/shim/JmsReceiverShim.java | 349 ++++++++++++++++++
.../qpid/interop_test/shim/JmsSenderShim.java | 368 +++++++++++++++++++
.../interop_test/shim/ProtonJmsReceiver.java | 269 --------------
.../qpid/interop_test/shim/ProtonJmsSender.java | 258 -------------
shims/qpid-proton-python/src/amqp-receive | 102 +++++
shims/qpid-proton-python/src/amqp-send | 135 +++++++
.../qpid-proton-python/src/jms-receiver-shim.py | 234 ++++++++++++
shims/qpid-proton-python/src/jms-sender-shim.py | 241 ++++++++++++
.../src/proton-python-receive | 106 ------
shims/qpid-proton-python/src/proton-python-send | 136 -------
.../interop_test/obj_util/BytesToJavaObj.java | 83 +++++
.../interop_test/obj_util/JavaObjToBytes.java | 129 +++++++
src/py/qpid-interop-test/__init__.py | 1 +
src/py/qpid-interop-test/interop_test_errors.py | 29 ++
.../qpid-interop-test/jms/jms_message_tests.py | 367 ++++++++++++++++++
src/py/qpid-interop-test/shim_utils.py | 2 +-
src/py/qpid-interop-test/shim_utils.pyc | Bin 14712 -> 0 bytes
.../types/simple_type_tests.py | 42 +--
25 files changed, 2612 insertions(+), 797 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/jars/javax.json-1.0.4.jar
----------------------------------------------------------------------
diff --git a/jars/javax.json-1.0.4.jar b/jars/javax.json-1.0.4.jar
new file mode 100644
index 0000000..09967d8
Binary files /dev/null and b/jars/javax.json-1.0.4.jar differ
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/jars/javax.json-api-1.0.jar
----------------------------------------------------------------------
diff --git a/jars/javax.json-api-1.0.jar b/jars/javax.json-api-1.0.jar
new file mode 100644
index 0000000..d276c79
Binary files /dev/null and b/jars/javax.json-api-1.0.jar differ
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/java-build.sh
----------------------------------------------------------------------
diff --git a/java-build.sh b/java-build.sh
new file mode 100755
index 0000000..27de828
--- /dev/null
+++ b/java-build.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+# JARS
+#JMS_API=${HOME}/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar:${HOME}/.m2/repository/org/apache/qpid/qpid-jms-client/0.4.0-SNAPSHOT/qpid-jms-client-0.4.0-SNAPSHOT.jar
+#JSON_API=../../jars/javax.json-api-1.0.jar
+#CLASSPATH=${JMS_API}:${JSON_API}
+CLASSPATH=
+
+BASEPATH=org/apache/qpid/interop_test/obj_util
+SRCPATH=src/main/java/${BASEPATH}
+TARGETPATH=target
+
+mkdir -p ${TARGETPATH}/classes
+javac -cp ${CLASSPATH} -Xlint:unchecked -d ${TARGETPATH}/classes ${SRCPATH}/JavaObjToBytes.java ${SRCPATH}/BytesToJavaObj.java
+jar -cf ${TARGETPATH}/JavaObjUtils.jar -C ${TARGETPATH}/classes ${BASEPATH}/JavaObjToBytes.class -C ${TARGETPATH}/classes ${BASEPATH}/BytesToJavaObj.class
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/java-clean.sh
----------------------------------------------------------------------
diff --git a/java-clean.sh b/java-clean.sh
new file mode 100755
index 0000000..7fc8be1
--- /dev/null
+++ b/java-clean.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+TARGETPATH=target
+
+rm -rf ${TARGETPATH}/*.jar ${TARGETPATH}/classes/*
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/java-build.sh
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/java-build.sh b/shims/qpid-jms/java-build.sh
index 78f5bfa..6afe72f 100755
--- a/shims/qpid-jms/java-build.sh
+++ b/shims/qpid-jms/java-build.sh
@@ -2,12 +2,13 @@
# JARS
JMS_API=${HOME}/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar:${HOME}/.m2/repository/org/apache/qpid/qpid-jms-client/0.4.0-SNAPSHOT/qpid-jms-client-0.4.0-SNAPSHOT.jar
-CLASSPATH=${JMS_API}
+JSON_API=../../jars/javax.json-api-1.0.jar
+CLASSPATH=${JMS_API}:${JSON_API}
BASEPATH=org/apache/qpid/interop_test/shim
SRCPATH=src/main/java/${BASEPATH}
TARGETPATH=target
mkdir -p ${TARGETPATH}/classes
-javac -cp ${CLASSPATH} -d ${TARGETPATH}/classes ${SRCPATH}/ProtonJmsSender.java ${SRCPATH}/ProtonJmsReceiver.java
-jar -cf ${TARGETPATH}/qpid-jms-shim.jar -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsSender.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsSender\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsSender\$MyExceptionListener.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsReceiver.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsReceiver\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsReceiver\$MyExceptionListener.class
+javac -cp ${CLASSPATH} -Xlint:unchecked -d ${TARGETPATH}/classes ${SRCPATH}/AmqpSender.java ${SRCPATH}/AmqpReceiver.java ${SRCPATH}/JmsSenderShim.java ${SRCPATH}/JmsReceiverShim.java
+jar -cf ${TARGETPATH}/qpid-jms-shim.jar -C ${TARGETPATH}/classes ${BASEPATH}/AmqpSender.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpSender\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpSender\$MyExceptionListener.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpReceiver.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpReceiver\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpReceiver\$MyExceptionListener.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsSenderShim.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsSenderShim\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsSenderShim\$MyExceptionListener.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsReceiverShim.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsReceiverShim\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsReceiverShim\$MyExceptionListener.class
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpReceiver.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpReceiver.java
new file mode 100644
index 0000000..cf3ad81
--- /dev/null
+++ b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpReceiver.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.shim;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+import java.util.Vector;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+public class AmqpReceiver {
+ private static final String USER = "guest";
+ private static final String PASSWORD = "guest";
+ private static final int TIMEOUT = 1000;
+ private static final String[] SUPPORTED_AMQP_TYPES = {"null",
+ "boolean",
+ "ubyte",
+ "ushort",
+ "uint",
+ "ulong",
+ "byte",
+ "short",
+ "int",
+ "long",
+ "float",
+ "double",
+ "decimal32",
+ "decimal64",
+ "decimal128",
+ "char",
+ "timestamp",
+ "uuid",
+ "binary",
+ "string",
+ "symbol",
+ "list",
+ "map",
+ "array"};
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 4) {
+ System.out.println("AmqpReceiver: Insufficient number of arguments");
+ System.out.println("AmqpReceiver: Expected arguments: broker_address, queue_name, amqp_type, num_test_values");
+ System.exit(1);
+ }
+ String brokerAddress = "amqp://" + args[0];
+ String queueName = args[1];
+ String amqpType = args[2];
+ int numTestValues = Integer.parseInt(args[3]);
+ Connection connection = null;
+
+ try {
+ ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
+
+ connection = factory.createConnection(USER, PASSWORD);
+ connection.setExceptionListener(new MyExceptionListener());
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName);
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ Vector<String> outList = new Vector<String>();
+ outList.add(amqpType);
+ if (isSupportedAmqpType(amqpType)) {
+ int actualCount = 0;
+ Message message = null;
+ for (int i = 1; i <= numTestValues; i++, actualCount++) {
+ message = messageConsumer.receive(TIMEOUT);
+ if (message == null)
+ break;
+ switch (amqpType) {
+ case "null":
+ long bodyLength = ((BytesMessage)message).getBodyLength();
+ if (bodyLength == 0L) {
+ outList.add("None");
+ } else {
+ throw new Exception("AmqpReceiver: JMS BytesMessage size error: Expected 0 bytes, read " + bodyLength);
+ }
+ break;
+ case "boolean":
+ String bs = String.valueOf(((BytesMessage)message).readBoolean());
+ outList.add(Character.toUpperCase(bs.charAt(0)) + bs.substring(1));
+ break;
+ case "ubyte":
+ byte byteValue = ((BytesMessage)message).readByte();
+ short ubyteValue = (short)(byteValue & 0xff);
+ outList.add(String.valueOf(ubyteValue));
+ break;
+ case "ushort":
+ {
+ byte[] byteArray = new byte[2];
+ int numBytes = ((BytesMessage)message).readBytes(byteArray);
+ if (numBytes != 2) {
+ // TODO: numBytes == -1 means no more bytes in stream - add error message for this case?
+ throw new Exception("AmqpReceiver: JMS BytesMessage size error: Exptected 2 bytes, read " + numBytes);
+ }
+ int ushortValue = 0;
+ for (int j=0; j<byteArray.length; j++) {
+ ushortValue = (ushortValue << 8) + (byteArray[j] & 0xff);
+ }
+ outList.add(String.valueOf(ushortValue));
+ break;
+ }
+ case "uint":
+ {
+ byte[] byteArray = new byte[4];
+ int numBytes = ((BytesMessage)message).readBytes(byteArray);
+ if (numBytes != 4) {
+ // TODO: numBytes == -1 means no more bytes in stream - add error message for this case?
+ throw new Exception("AmqpReceiver: JMS BytesMessage size error: Exptected 4 bytes, read " + numBytes);
+ }
+ long uintValue = 0;
+ for (int j=0; j<byteArray.length; j++) {
+ uintValue = (uintValue << 8) + (byteArray[j] & 0xff);
+ }
+ outList.add(String.valueOf(uintValue));
+ break;
+ }
+ case "ulong":
+ case "timestamp":
+ {
+ // TODO: Tidy this ugliness up - perhaps use of vector<byte>?
+ byte[] byteArray = new byte[8];
+ int numBytes = ((BytesMessage)message).readBytes(byteArray);
+ if (numBytes != 8) {
+ // TODO: numBytes == -1 means no more bytes in stream - add error message for this case?
+ throw new Exception("AmqpReceiver: JMS BytesMessage size error: Exptected 8 bytes, read " + numBytes);
+ }
+ // TODO: shortcut in use here - this byte array should go through a Java type that can represent this as a number - such as BigInteger.
+ outList.add(String.format("0x%02x%02x%02x%02x%02x%02x%02x%02x", byteArray[0], byteArray[1],
+ byteArray[2], byteArray[3], byteArray[4], byteArray[5], byteArray[6], byteArray[7]));
+ break;
+ }
+ case "byte":
+ outList.add(String.valueOf(((BytesMessage)message).readByte()));
+ break;
+ case "short":
+ outList.add(String.valueOf(((BytesMessage)message).readShort()));
+ break;
+ case "int":
+ outList.add(String.valueOf(((BytesMessage)message).readInt()));
+ break;
+ case "long":
+ outList.add(String.valueOf(((BytesMessage)message).readLong()));
+ break;
+ case "float":
+ float f = ((BytesMessage)message).readFloat();
+ int i0 = Float.floatToRawIntBits(f);
+ outList.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
+ break;
+ case "double":
+ double d = ((BytesMessage)message).readDouble();
+ long l = Double.doubleToRawLongBits(d);
+ outList.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
+ break;
+ case "decimal32":
+ BigDecimal bd32 = (BigDecimal)((ObjectMessage)message).getObject();
+ outList.add(bd32.toString());
+ break;
+ case "decimal64":
+ BigDecimal bd64 = (BigDecimal)((ObjectMessage)message).getObject();
+ outList.add(bd64.toString());
+ break;
+ case "decimal128":
+ BigDecimal bd128 = (BigDecimal)((ObjectMessage)message).getObject();
+ outList.add(bd128.toString());
+ break;
+ case "char":
+ outList.add(String.format("%c", ((BytesMessage)message).readChar()));
+ break;
+ case "uuid":
+ UUID uuid = (UUID)((ObjectMessage)message).getObject();
+ outList.add(uuid.toString());
+ break;
+ case "binary":
+ BytesMessage bm = (BytesMessage)message;
+ int msgLen = (int)bm.getBodyLength();
+ byte[] ba = new byte[msgLen];
+ if (bm.readBytes(ba) == msgLen) {
+ outList.add(new String(ba));
+ } else {
+ // TODO: Raise exception or error here: size mismatch
+ }
+ break;
+ case "string":
+ outList.add(((TextMessage)message).getText());
+ break;
+ case "symbol":
+ outList.add(((BytesMessage)message).readUTF());
+ break;
+ case "list":
+ break;
+ case "map":
+ break;
+ case "array":
+ break;
+ default:
+ // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
+ connection.close();
+ throw new Exception("AmqpReceiver: Internal error: unsupported AMQP type \"" + amqpType + "\"");
+ }
+ }
+ } else {
+ System.out.println("ERROR: AmqpReceiver: AMQP type \"" + amqpType + "\" is not supported");
+ connection.close();
+ System.exit(1);
+ }
+
+ connection.close();
+
+ // No exception, print results
+ for (int i=0; i<outList.size(); i++) {
+ System.out.println(outList.get(i));
+ }
+ } catch (Exception exp) {
+ if (connection != null)
+ connection.close();
+ System.out.println("Caught exception, exiting.");
+ exp.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+
+ protected static boolean isSupportedAmqpType(String amqpType) {
+ for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
+ if (amqpType.equals(supportedAmqpType))
+ return true;
+ }
+ return false;
+ }
+
+ private static class MyExceptionListener implements ExceptionListener {
+ @Override
+ public void onException(JMSException exception) {
+ System.out.println("Connection ExceptionListener fired, exiting.");
+ exception.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpSender.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpSender.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpSender.java
new file mode 100644
index 0000000..3fc5a90
--- /dev/null
+++ b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpSender.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.shim;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
+import java.util.Arrays;
+import java.util.UUID;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+public class AmqpSender {
+ private static final String USER = "guest";
+ private static final String PASSWORD = "guest";
+ private static final String[] SUPPORTED_AMQP_TYPES = {"null",
+ "boolean",
+ "ubyte",
+ "ushort",
+ "uint",
+ "ulong",
+ "byte",
+ "short",
+ "int",
+ "long",
+ "float",
+ "double",
+ "decimal32",
+ "decimal64",
+ "decimal128",
+ "char",
+ "timestamp",
+ "uuid",
+ "binary",
+ "string",
+ "symbol",
+ "list",
+ "map",
+ "array"};
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 4) {
+ System.out.println("AmqpSender: Insufficient number of arguments");
+ System.out.println("AmqpSender: Expected arguments: broker_address, queue_name, amqp_type, test_val, test_val, ...");
+ System.exit(1);
+ }
+ String brokerAddress = "amqp://" + args[0];
+ String queueName = args[1];
+ String amqpType = args[2];
+ String[] testValueList = Arrays.copyOfRange(args, 3, args.length); // Use remaining args as test values
+
+ try {
+ ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
+
+ Connection connection = factory.createConnection();
+ connection.setExceptionListener(new MyExceptionListener());
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName);
+
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ if (isSupportedAmqpType(amqpType)) {
+ Message message = null;
+ for (String testValueStr : testValueList) {
+ switch (amqpType) {
+ case "null":
+ message = session.createBytesMessage();
+ break;
+ case "boolean":
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeBoolean(Boolean.parseBoolean(testValueStr));
+ break;
+ case "ubyte":
+ {
+ byte testValue = (byte)Short.parseShort(testValueStr);
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeByte(testValue);
+ break;
+ }
+ case "ushort":
+ {
+ int testValue = Integer.parseInt(testValueStr);
+ byte[] byteArray = new byte[2];
+ byteArray[0] = (byte)(testValue >> 8);
+ byteArray[1] = (byte)(testValue);
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeBytes(byteArray);
+ break;
+ }
+ case "uint":
+ {
+ long testValue = Long.parseLong(testValueStr);
+ byte[] byteArray = new byte[4];
+ byteArray[0] = (byte)(testValue >> 24);
+ byteArray[1] = (byte)(testValue >> 16);
+ byteArray[2] = (byte)(testValue >> 8);
+ byteArray[3] = (byte)(testValue);
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeBytes(byteArray);
+ break;
+ }
+ case "ulong":
+ {
+ // TODO: Tidy this ugliness up - perhaps use of vector<byte>?
+ BigInteger testValue = new BigInteger(testValueStr);
+ byte[] bigIntArray = testValue.toByteArray(); // may be 1 to 9 bytes depending on number
+ byte[] byteArray = {0, 0, 0, 0, 0, 0, 0, 0};
+ int effectiveBigIntArrayLen = bigIntArray.length > 8 ? 8 : bigIntArray.length; // Cap length at 8
+ int bigIntArrayOffs = bigIntArray.length > 8 ? bigIntArray.length - 8 : 0; // Offset when length > 8
+ for (int i=0; i<bigIntArray.length && i < 8; i++)
+ byteArray[8 - effectiveBigIntArrayLen + i] = bigIntArray[bigIntArrayOffs + i];
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeBytes(byteArray);
+ break;
+ }
+ case "byte":
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeByte(Byte.parseByte(testValueStr));
+ break;
+ case "short":
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeShort(Short.parseShort(testValueStr));
+ break;
+ case "int":
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeInt(Integer.parseInt(testValueStr));
+ break;
+ case "long":
+ case "timestamp":
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeLong(Long.parseLong(testValueStr));
+ break;
+ case "float":
+ Long i = Long.parseLong(testValueStr.substring(2), 16);
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeFloat(Float.intBitsToFloat(i.intValue()));
+ break;
+ case "double":
+ Long l1 = Long.parseLong(testValueStr.substring(2, 3), 16) << 60;
+ Long l2 = Long.parseLong(testValueStr.substring(3), 16);
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeDouble(Double.longBitsToDouble(l1 | l2));
+ break;
+ case "decimal32":
+ BigDecimal bd32 = new BigDecimal(testValueStr, MathContext.DECIMAL32);
+ message = session.createObjectMessage();
+ ((ObjectMessage)message).setObject(bd32);
+ break;
+ case "decimal64":
+ BigDecimal bd64 = new BigDecimal(testValueStr, MathContext.DECIMAL64);
+ message = session.createObjectMessage();
+ ((ObjectMessage)message).setObject(bd64);
+ break;
+ case "decimal128":
+ BigDecimal bd128 = new BigDecimal(testValueStr, MathContext.DECIMAL128);
+ message = session.createObjectMessage();
+ ((ObjectMessage)message).setObject(bd128);
+ break;
+ case "char":
+ char c = 0;
+ if (testValueStr.length() == 1) // Single char
+ c = testValueStr.charAt(0);
+ else if (testValueStr.length() == 6) // unicode format
+ c = (char)Integer.parseInt(testValueStr, 16);
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeChar(c);
+ break;
+ case "uuid":
+ UUID uuid = UUID.fromString(testValueStr);
+ message = session.createObjectMessage();
+ ((ObjectMessage)message).setObject(uuid);
+ break;
+ case "binary":
+ message = session.createBytesMessage();
+ byte[] byteArray = testValueStr.getBytes();
+ ((BytesMessage)message).writeBytes(byteArray, 0, byteArray.length);
+ break;
+ case "string":
+ message = session.createTextMessage(testValueStr);
+ break;
+ case "symbol":
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeUTF(testValueStr);
+ break;
+ case "list":
+ break;
+ case "map":
+ break;
+ case "array":
+ break;
+ default:
+ // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
+ connection.close();
+ throw new Exception("AmqpSender: Internal error: unsupported AMQP type \"" + amqpType + "\"");
+ }
+ messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ } else {
+ System.out.println("ERROR: AmqpSender: AMQP type \"" + amqpType + "\" is not supported");
+ connection.close();
+ System.exit(1);
+ }
+
+ connection.close();
+ } catch (Exception exp) {
+ System.out.println("Caught exception, exiting.");
+ exp.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+
+ protected static boolean isSupportedAmqpType(String amqpType) {
+ for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
+ if (amqpType.equals(supportedAmqpType))
+ return true;
+ }
+ return false;
+ }
+
+ private static class MyExceptionListener implements ExceptionListener {
+ @Override
+ public void onException(JMSException exception) {
+ System.out.println("Connection ExceptionListener fired, exiting.");
+ exception.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsReceiverShim.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsReceiverShim.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsReceiverShim.java
new file mode 100644
index 0000000..f567638
--- /dev/null
+++ b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsReceiverShim.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.shim;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.json.Json;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonReader;
+import javax.json.JsonWriter;
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+public class JmsReceiverShim {
+ private static final String USER = "guest";
+ private static final String PASSWORD = "guest";
+ private static final int TIMEOUT = 1000;
+ private static final String[] SUPPORTED_JMS_MESSAGE_TYPES = {"JMS_BYTESMESSAGE_TYPE",
+ "JMS_MAPMESSAGE_TYPE",
+ "JMS_OBJECTMESSAGE_TYPE",
+ "JMS_STREAMMESSAGE_TYPE",
+ "JMS_TEXTMESSAGE_TYPE"};
+
+ // args[0]: Broker URL
+ // args[1]: Queue name
+ // args[2]: JMS message type
+ // args[3]: JSON Test number map
+ public static void main(String[] args) throws Exception {
+ if (args.length < 4) {
+ System.out.println("JmsReceiverShim: Insufficient number of arguments");
+ System.out.println("JmsReceiverShim: Expected arguments: broker_address, queue_name, amqp_type, num_test_values");
+ System.exit(1);
+ }
+ String brokerAddress = "amqp://" + args[0];
+ String queueName = args[1];
+ String jmsMessageType = args[2];
+ if (!isSupportedJmsMessageType(jmsMessageType)) {
+ System.out.println("ERROR: JmsReceiverShim: unknown or unsupported JMS message type \"" + jmsMessageType + "\"");
+ System.exit(1);
+ }
+
+ JsonReader jsonReader = Json.createReader(new StringReader(args[3]));
+ JsonObject numTestValuesMap = jsonReader.readObject();
+ jsonReader.close();
+
+ Connection connection = null;
+
+ try {
+ ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
+
+ connection = factory.createConnection(USER, PASSWORD);
+ connection.setExceptionListener(new MyExceptionListener());
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName);
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ List<String> keyList = new ArrayList<String>(numTestValuesMap.keySet());
+ Collections.sort(keyList);
+
+ Message message = null;
+ JsonObjectBuilder job = Json.createObjectBuilder();
+ for (String key: keyList) {
+ JsonArrayBuilder jab = Json.createArrayBuilder();
+ for (int i=0; i<numTestValuesMap.getJsonNumber(key).intValue(); ++i) {
+ message = messageConsumer.receive(TIMEOUT);
+ if (message == null) break;
+ switch (jmsMessageType) {
+ case "JMS_BYTESMESSAGE_TYPE":
+ switch (key) {
+ case "boolean":
+ jab.add(((BytesMessage)message).readBoolean()?"True":"False");
+ break;
+ case "byte":
+ jab.add(formatByte(((BytesMessage)message).readByte()));
+ break;
+ case "bytes":
+ {
+ byte[] bytesBuff = new byte[65536];
+ int numBytesRead = ((BytesMessage)message).readBytes(bytesBuff);
+ if (numBytesRead >= 0) {
+ jab.add(new String(Arrays.copyOfRange(bytesBuff, 0, numBytesRead)));
+ } else {
+ // NOTE: For this case, an empty byte array has nothing to return
+ jab.add(new String());
+ }
+ }
+ break;
+ case "char":
+ jab.add(formatChar(((BytesMessage)message).readChar()));
+ break;
+ case "double":
+ long l = Double.doubleToRawLongBits(((BytesMessage)message).readDouble());
+ jab.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
+ break;
+ case "float":
+ int i0 = Float.floatToRawIntBits(((BytesMessage)message).readFloat());
+ jab.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
+ break;
+ case "int":
+ jab.add(formatInt(((BytesMessage)message).readInt()));
+ break;
+ case "long":
+ jab.add(formatLong(((BytesMessage)message).readLong()));
+ break;
+ case "object":
+ {
+ byte[] bytesBuff = new byte[65536];
+ int numBytesRead = ((BytesMessage)message).readBytes(bytesBuff);
+ if (numBytesRead >= 0) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(Arrays.copyOfRange(bytesBuff, 0, numBytesRead));
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ Object obj = ois.readObject();
+ jab.add(obj.getClass().getName() + ":" + obj.toString());
+ } else {
+ jab.add("<object error>");
+ }
+ }
+ break;
+ case "short":
+ jab.add(formatShort(((BytesMessage)message).readShort()));
+ break;
+ case "string":
+ jab.add(((BytesMessage)message).readUTF());
+ break;
+ default:
+ throw new Exception("JmsReceiverShim: Unknown subtype for " + jmsMessageType + ": \"" + key + "\"");
+ }
+ break;
+ case "JMS_STREAMMESSAGE_TYPE":
+ switch (key) {
+ case "boolean":
+ jab.add(((StreamMessage)message).readBoolean()?"True":"False");
+ break;
+ case "byte":
+ jab.add(formatByte(((StreamMessage)message).readByte()));
+ break;
+ case "bytes":
+ byte[] bytesBuff = new byte[65536];
+ int numBytesRead = ((StreamMessage)message).readBytes(bytesBuff);
+ if (numBytesRead >= 0) {
+ jab.add(new String(Arrays.copyOfRange(bytesBuff, 0, numBytesRead)));
+ } else {
+ System.out.println("StreamMessage.readBytes() returned " + numBytesRead);
+ jab.add("<bytes error>");
+ }
+ break;
+ case "char":
+ jab.add(formatChar(((StreamMessage)message).readChar()));
+ break;
+ case "double":
+ long l = Double.doubleToRawLongBits(((StreamMessage)message).readDouble());
+ jab.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
+ break;
+ case "float":
+ int i0 = Float.floatToRawIntBits(((StreamMessage)message).readFloat());
+ jab.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
+ break;
+ case "int":
+ jab.add(formatInt(((StreamMessage)message).readInt()));
+ break;
+ case "long":
+ jab.add(formatLong(((StreamMessage)message).readLong()));
+ break;
+ case "object":
+ Object obj = ((StreamMessage)message).readObject();
+ jab.add(obj.getClass().getName() + ":" + obj.toString());
+ break;
+ case "short":
+ jab.add(formatShort(((StreamMessage)message).readShort()));
+ break;
+ case "string":
+ jab.add(((StreamMessage)message).readString());
+ break;
+ default:
+ throw new Exception("JmsReceiverShim: Unknown subtype for " + jmsMessageType + ": \"" + key + "\"");
+ }
+ break;
+ case "JMS_MAPMESSAGE_TYPE":
+ String name = String.format("%s%03d", key, i);
+ switch (key) {
+ case "boolean":
+ jab.add(((MapMessage)message).getBoolean(name)?"True":"False");
+ break;
+ case "byte":
+ jab.add(formatByte(((MapMessage)message).getByte(name)));
+ break;
+ case "bytes":
+ jab.add(new String(((MapMessage)message).getBytes(name)));
+ break;
+ case "char":
+ jab.add(formatChar(((MapMessage)message).getChar(name)));
+ break;
+ case "double":
+ long l = Double.doubleToRawLongBits(((MapMessage)message).getDouble(name));
+ jab.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
+ break;
+ case "float":
+ int i0 = Float.floatToRawIntBits(((MapMessage)message).getFloat(name));
+ jab.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
+ break;
+ case "int":
+ jab.add(formatInt(((MapMessage)message).getInt(name)));
+ break;
+ case "long":
+ jab.add(formatLong(((MapMessage)message).getLong(name)));
+ break;
+ case "object":
+ Object obj = ((MapMessage)message).getObject(name);
+ jab.add(obj.getClass().getName() + ":" + obj.toString());
+ break;
+ case "short":
+ jab.add(formatShort(((MapMessage)message).getShort(name)));
+ break;
+ case "string":
+ jab.add(((MapMessage)message).getString(name));
+ break;
+ default:
+ throw new Exception("JmsReceiverShim: Unknown subtype for " + jmsMessageType + ": \"" + key + "\"");
+ }
+ break;
+ case "JMS_OBJECTMESSAGE_TYPE":
+ jab.add(((ObjectMessage)message).getObject().toString());
+ break;
+ case "JMS_TEXTMESSAGE_TYPE":
+ jab.add(((TextMessage)message).getText());
+ break;
+ default:
+ connection.close();
+ throw new Exception("JmsReceiverShim: Internal error: unknown or unsupported JMS message type \"" + jmsMessageType + "\"");
+ }
+ }
+ job.add(key, jab);
+ }
+ connection.close();
+
+ System.out.println(jmsMessageType);
+ StringWriter out = new StringWriter();
+ JsonWriter jsonWriter = Json.createWriter(out);
+ jsonWriter.writeObject(job.build());
+ jsonWriter.close();
+ System.out.println(out.toString());
+ } catch (Exception exp) {
+ if (connection != null)
+ connection.close();
+ System.out.println("Caught exception, exiting.");
+ exp.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+
+ protected static String formatByte(byte b) {
+ boolean neg = false;
+ if (b < 0) {
+ neg = true;
+ b = (byte)-b;
+ }
+ return String.format("%s0x%x", neg?"-":"", b);
+ }
+
+ protected static String formatChar(char c) {
+ if (Character.isLetterOrDigit(c)) {
+ return String.format("%c", c);
+ }
+ char[] ca = {c};
+ return new String(ca);
+ }
+
+ protected static String formatInt(int i) {
+ boolean neg = false;
+ if (i < 0) {
+ neg = true;
+ i = -i;
+ }
+ return String.format("%s0x%x", neg?"-":"", i);
+ }
+
+ protected static String formatLong(long l) {
+ boolean neg = false;
+ if (l < 0) {
+ neg = true;
+ l = -l;
+ }
+ return String.format("%s0x%x", neg?"-":"", l);
+ }
+
+ protected static String formatShort(int s) {
+ boolean neg = false;
+ if (s < 0) {
+ neg = true;
+ s = -s;
+ }
+ return String.format("%s0x%x", neg?"-":"", s);
+ }
+
+ protected static boolean isSupportedJmsMessageType(String jmsMessageType) {
+ for (String supportedJmsMessageType: SUPPORTED_JMS_MESSAGE_TYPES) {
+ if (jmsMessageType.equals(supportedJmsMessageType))
+ return true;
+ }
+ return false;
+ }
+
+ private static class MyExceptionListener implements ExceptionListener {
+ @Override
+ public void onException(JMSException exception) {
+ System.out.println("Connection ExceptionListener fired, exiting.");
+ exception.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsSenderShim.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsSenderShim.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsSenderShim.java
new file mode 100644
index 0000000..e22be0a
--- /dev/null
+++ b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsSenderShim.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.shim;
+
+import java.io.Serializable;
+import java.io.StringReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+public class JmsSenderShim {
+ private static final String USER = "guest";
+ private static final String PASSWORD = "guest";
+ private static final String[] SUPPORTED_JMS_MESSAGE_TYPES = {"JMS_BYTESMESSAGE_TYPE",
+ "JMS_MAPMESSAGE_TYPE",
+ "JMS_OBJECTMESSAGE_TYPE",
+ "JMS_STREAMMESSAGE_TYPE",
+ "JMS_TEXTMESSAGE_TYPE"};
+
+ // args[0]: Broker URL
+ // args[1]: Queue name
+ // args[2]: JMS message type
+ // args[3]: JSON Test value map
+ public static void main(String[] args) throws Exception {
+ if (args.length < 4) {
+ System.out.println("JmsSenderShim: Insufficient number of arguments");
+ System.out.println("JmsSenderShim: Expected arguments: broker_address, queue_name, amqp_type, test_val, test_val, ...");
+ System.exit(1);
+ }
+ String brokerAddress = "amqp://" + args[0];
+ String queueName = args[1];
+ String jmsMessageType = args[2];
+ if (!isSupportedJmsMessageType(jmsMessageType)) {
+ System.out.println("ERROR: JmsReceiver: unknown or unsupported JMS message type \"" + jmsMessageType + "\"");
+ System.exit(1);
+ }
+
+ JsonReader jsonReader = Json.createReader(new StringReader(args[3]));
+ JsonObject testValuesMap = jsonReader.readObject();
+ jsonReader.close();
+
+ try {
+ ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
+
+ Connection connection = factory.createConnection();
+ connection.setExceptionListener(new MyExceptionListener());
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName);
+
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ Message message = null;
+ List<String> keyList = new ArrayList<String>(testValuesMap.keySet());
+ Collections.sort(keyList);
+ for (String key: keyList) {
+ JsonArray testValues = testValuesMap.getJsonArray(key);
+ for (int i=0; i<testValues.size(); ++i) {
+ String testValue = testValues.getJsonString(i).getString();
+ switch (jmsMessageType) {
+ case "JMS_BYTESMESSAGE_TYPE":
+ message = createBytesMessage(session, key, testValue);
+ break;
+ case "JMS_MAPMESSAGE_TYPE":
+ message = createMapMessage(session, key, testValue, i);
+ break;
+ case "JMS_OBJECTMESSAGE_TYPE":
+ message = createObjectMessage(session, key, testValue);
+ break;
+ case "JMS_STREAMMESSAGE_TYPE":
+ message = createStreamMessage(session, key, testValue);
+ break;
+ case "JMS_TEXTMESSAGE_TYPE":
+ message = createTextMessage(session, testValue);
+ break;
+ default:
+ throw new Exception("Internal exception: Unexpected JMS message type \"" + jmsMessageType + "\"");
+ }
+ messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ connection.close();
+ } catch (Exception exp) {
+ System.out.println("Caught exception, exiting.");
+ exp.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+
+ protected static BytesMessage createBytesMessage(Session session, String testValueType, String testValue) throws Exception, JMSException {
+ BytesMessage message = session.createBytesMessage();
+ switch (testValueType) {
+ case "boolean":
+ message.writeBoolean(Boolean.parseBoolean(testValue));
+ break;
+ case "byte":
+ message.writeByte(Byte.decode(testValue));
+ break;
+ case "bytes":
+ message.writeBytes(testValue.getBytes());
+ break;
+ case "char":
+ if (testValue.length() == 1) { // Char format: "X" or "\xNN"
+ message.writeChar(testValue.charAt(0));
+ } else {
+ throw new Exception("JmsSenderShim.createBytesMessage() Malformed char string: \"" + testValue + "\" of length " + testValue.length());
+ }
+ break;
+ case "double":
+ Long l1 = Long.parseLong(testValue.substring(2, 3), 16) << 60;
+ Long l2 = Long.parseLong(testValue.substring(3), 16);
+ message.writeDouble(Double.longBitsToDouble(l1 | l2));
+ break;
+ case "float":
+ Long i = Long.parseLong(testValue.substring(2), 16);
+ message.writeFloat(Float.intBitsToFloat(i.intValue()));
+ break;
+ case "int":
+ message.writeInt(Integer.decode(testValue));
+ break;
+ case "long":
+ message.writeLong(Long.decode(testValue));
+ break;
+ case "object":
+ Object obj = (Object)createObject(testValue);
+ message.writeObject(obj);
+ break;
+ case "short":
+ message.writeShort(Short.decode(testValue));
+ break;
+ case "string":
+ message.writeUTF(testValue);
+ break;
+ default:
+ throw new Exception("Internal exception: Unexpected JMS message sub-type \"" + testValueType + "\"");
+ }
+ return message;
+ }
+
+ protected static MapMessage createMapMessage(Session session, String testValueType, String testValue, int testValueNum) throws Exception, JMSException {
+ MapMessage message = session.createMapMessage();
+ String name = String.format("%s%03d", testValueType, testValueNum);
+ switch (testValueType) {
+ case "boolean":
+ message.setBoolean(name, Boolean.parseBoolean(testValue));
+ break;
+ case "byte":
+ message.setByte(name, Byte.decode(testValue));
+ break;
+ case "bytes":
+ message.setBytes(name, testValue.getBytes());
+ break;
+ case "char":
+ if (testValue.length() == 1) { // Char format: "X"
+ message.setChar(name, testValue.charAt(0));
+ } else if (testValue.length() == 6) { // Char format: "\xNNNN"
+ message.setChar(name, (char)Integer.parseInt(testValue.substring(2), 16));
+ } else {
+ throw new Exception("JmsSenderShim.createMapMessage() Malformed char string: \"" + testValue + "\"");
+ }
+ break;
+ case "double":
+ Long l1 = Long.parseLong(testValue.substring(2, 3), 16) << 60;
+ Long l2 = Long.parseLong(testValue.substring(3), 16);
+ message.setDouble(name, Double.longBitsToDouble(l1 | l2));
+ break;
+ case "float":
+ Long i = Long.parseLong(testValue.substring(2), 16);
+ message.setFloat(name, Float.intBitsToFloat(i.intValue()));
+ break;
+ case "int":
+ message.setInt(name, Integer.decode(testValue));
+ break;
+ case "long":
+ message.setLong(name, Long.decode(testValue));
+ break;
+ case "object":
+ Object obj = (Object)createObject(testValue);
+ message.setObject(name, obj);
+ break;
+ case "short":
+ message.setShort(name, Short.decode(testValue));
+ break;
+ case "string":
+ message.setString(name, testValue);
+ break;
+ default:
+ throw new Exception("Internal exception: Unexpected JMS message sub-type \"" + testValueType + "\"");
+ }
+ return message;
+ }
+
+ protected static ObjectMessage createObjectMessage(Session session, String className, String testValue) throws Exception, JMSException {
+ Serializable obj = createJavaObject(className, testValue);
+ if (obj == null) {
+ // TODO: Handle error here
+ System.out.println("createObjectMessage: obj == null");
+ return null;
+ }
+ ObjectMessage message = session.createObjectMessage();
+ message.setObject(obj);
+ return message;
+ }
+
+ protected static StreamMessage createStreamMessage(Session session, String testValueType, String testValue) throws Exception, JMSException {
+ StreamMessage message = session.createStreamMessage();
+ switch (testValueType) {
+ case "boolean":
+ message.writeBoolean(Boolean.parseBoolean(testValue));
+ break;
+ case "byte":
+ message.writeByte(Byte.decode(testValue));
+ break;
+ case "bytes":
+ message.writeBytes(testValue.getBytes());
+ break;
+ case "char":
+ if (testValue.length() == 1) { // Char format: "X"
+ message.writeChar(testValue.charAt(0));
+ } else if (testValue.length() == 6) { // Char format: "\xNNNN"
+ message.writeChar((char)Integer.parseInt(testValue.substring(2), 16));
+ } else {
+ throw new Exception("JmsSenderShim.createStreamMessage() Malformed char string: \"" + testValue + "\"");
+ }
+ break;
+ case "double":
+ Long l1 = Long.parseLong(testValue.substring(2, 3), 16) << 60;
+ Long l2 = Long.parseLong(testValue.substring(3), 16);
+ message.writeDouble(Double.longBitsToDouble(l1 | l2));
+ break;
+ case "float":
+ Long i = Long.parseLong(testValue.substring(2), 16);
+ message.writeFloat(Float.intBitsToFloat(i.intValue()));
+ break;
+ case "int":
+ message.writeInt(Integer.decode(testValue));
+ break;
+ case "long":
+ message.writeLong(Long.decode(testValue));
+ break;
+ case "object":
+ Object obj = (Object)createObject(testValue);
+ message.writeObject(obj);
+ break;
+ case "short":
+ message.writeShort(Short.decode(testValue));
+ break;
+ case "string":
+ message.writeString(testValue);
+ break;
+ default:
+ throw new Exception("Internal exception: Unexpected JMS message sub-type \"" + testValueType + "\"");
+ }
+ return message;
+ }
+
+ protected static Serializable createJavaObject(String className, String testValue) throws Exception {
+ Serializable obj = null;
+ try {
+ Class<?> c = Class.forName(className);
+ if (className.compareTo("java.lang.Character") == 0) {
+ Constructor ctor = c.getConstructor(char.class);
+ if (testValue.length() == 1) {
+ // Use first character of string
+ obj = (Serializable)ctor.newInstance(testValue.charAt(0));
+ } else if (testValue.length() == 4 || testValue.length() == 6) {
+ // Format '\xNN' or '\xNNNN'
+ obj = (Serializable)ctor.newInstance((char)Integer.parseInt(testValue.substring(2), 16));
+ } else {
+ throw new Exception("JmsSenderShim.createStreamMessage() Malformed char string: \"" + testValue + "\"");
+ }
+ } else {
+ // Use string constructor
+ Constructor ctor = c.getConstructor(String.class);
+ obj = (Serializable)ctor.newInstance(testValue);
+ }
+ }
+ catch (ClassNotFoundException e) {
+ e.printStackTrace(System.out);
+ }
+ catch (NoSuchMethodException e) {
+ e.printStackTrace(System.out);
+ }
+ catch (InstantiationException e) {
+ e.printStackTrace(System.out);
+ }
+ catch (IllegalAccessException e) {
+ e.printStackTrace(System.out);
+ }
+ catch (InvocationTargetException e) {
+ e.printStackTrace(System.out);
+ }
+ return obj;
+ }
+
+ // value has format "classname:ctorstrvalue"
+ protected static Serializable createObject(String value) throws Exception {
+ Serializable obj = null;
+ int colonIndex = value.indexOf(":");
+ if (colonIndex >= 0) {
+ String className = value.substring(0, colonIndex);
+ String testValue = value.substring(colonIndex+1);
+ obj = createJavaObject(className, testValue);
+ } else {
+ throw new Exception("createObject(): Malformed value string");
+ }
+ return obj;
+ }
+
+ protected static TextMessage createTextMessage(Session session, String valueStr) throws JMSException {
+ return session.createTextMessage(valueStr);
+ }
+
+ protected static boolean isSupportedJmsMessageType(String jmsMessageType) {
+ for (String supportedJmsMessageType: SUPPORTED_JMS_MESSAGE_TYPES) {
+ if (jmsMessageType.equals(supportedJmsMessageType))
+ return true;
+ }
+ return false;
+ }
+
+ private static class MyExceptionListener implements ExceptionListener {
+ @Override
+ public void onException(JMSException exception) {
+ System.out.println("Connection ExceptionListener fired, exiting.");
+ exception.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsReceiver.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsReceiver.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsReceiver.java
deleted file mode 100644
index 4710025..0000000
--- a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsReceiver.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.interop_test.shim;
-
-import java.math.BigDecimal;
-import java.util.UUID;
-import java.util.Vector;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import org.apache.qpid.jms.JmsConnectionFactory;
-
-public class ProtonJmsReceiver {
- private static final String USER = "guest";
- private static final String PASSWORD = "guest";
- private static final int TIMEOUT = 1000;
- private static final String[] SUPPORTED_AMQP_TYPES = {"null",
- "boolean",
- "ubyte",
- "ushort",
- "uint",
- "ulong",
- "byte",
- "short",
- "int",
- "long",
- "float",
- "double",
- "decimal32",
- "decimal64",
- "decimal128",
- "char",
- "timestamp",
- "uuid",
- "binary",
- "string",
- "symbol",
- "list",
- "map",
- "array"};
-
- public static void main(String[] args) throws Exception {
- if (args.length < 4) {
- System.out.println("ProtonJmsReceiver: Insufficient number of arguments");
- System.out.println("ProtonJmsReceiver: Expected arguments: broker_address, queue_name, amqp_type, num_test_values");
- System.exit(1);
- }
- String brokerAddress = "amqp://" + args[0];
- String queueName = args[1];
- String amqpType = args[2];
- int numTestValues = Integer.parseInt(args[3]);
- Connection connection = null;
-
- try {
- ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
-
- connection = factory.createConnection(USER, PASSWORD);
- connection.setExceptionListener(new MyExceptionListener());
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = session.createQueue(queueName);
-
- MessageConsumer messageConsumer = session.createConsumer(queue);
-
- Vector<String> outList = new Vector<String>();
- outList.add(amqpType);
- if (isSupportedAmqpType(amqpType)) {
- int actualCount = 0;
- Message message = null;
- for (int i = 1; i <= numTestValues; i++, actualCount++) {
- message = messageConsumer.receive(TIMEOUT);
- if (message == null)
- break;
- switch (amqpType) {
- case "null":
- long bodyLength = ((BytesMessage)message).getBodyLength();
- if (bodyLength == 0L) {
- outList.add("None");
- } else {
- throw new Exception("ProtonJmsReceiver: JMS BytesMessage size error: Expected 0 bytes, read " + bodyLength);
- }
- break;
- case "boolean":
- String bs = String.valueOf(((BytesMessage)message).readBoolean());
- outList.add(Character.toUpperCase(bs.charAt(0)) + bs.substring(1));
- break;
- case "ubyte":
- byte byteValue = ((BytesMessage)message).readByte();
- short ubyteValue = (short)(byteValue & 0xff);
- outList.add(String.valueOf(ubyteValue));
- break;
- case "ushort":
- {
- byte[] byteArray = new byte[2];
- int numBytes = ((BytesMessage)message).readBytes(byteArray);
- if (numBytes != 2) {
- // TODO: numBytes == -1 means no more bytes in stream - add error message for this case?
- throw new Exception("ProtonJmsReceiver: JMS BytesMessage size error: Exptected 2 bytes, read " + numBytes);
- }
- int ushortValue = 0;
- for (int j=0; j<byteArray.length; j++) {
- ushortValue = (ushortValue << 8) + (byteArray[j] & 0xff);
- }
- outList.add(String.valueOf(ushortValue));
- break;
- }
- case "uint":
- {
- byte[] byteArray = new byte[4];
- int numBytes = ((BytesMessage)message).readBytes(byteArray);
- if (numBytes != 4) {
- // TODO: numBytes == -1 means no more bytes in stream - add error message for this case?
- throw new Exception("ProtonJmsReceiver: JMS BytesMessage size error: Exptected 4 bytes, read " + numBytes);
- }
- long uintValue = 0;
- for (int j=0; j<byteArray.length; j++) {
- uintValue = (uintValue << 8) + (byteArray[j] & 0xff);
- }
- outList.add(String.valueOf(uintValue));
- break;
- }
- case "ulong":
- case "timestamp":
- {
- // TODO: Tidy this ugliness up - perhaps use of vector<byte>?
- byte[] byteArray = new byte[8];
- int numBytes = ((BytesMessage)message).readBytes(byteArray);
- if (numBytes != 8) {
- // TODO: numBytes == -1 means no more bytes in stream - add error message for this case?
- throw new Exception("ProtonJmsReceiver: JMS BytesMessage size error: Exptected 8 bytes, read " + numBytes);
- }
- // TODO: shortcut in use here - this byte array should go through a Java type that can represent this as a number - such as BigInteger.
- outList.add(String.format("0x%02x%02x%02x%02x%02x%02x%02x%02x", byteArray[0], byteArray[1],
- byteArray[2], byteArray[3], byteArray[4], byteArray[5], byteArray[6], byteArray[7]));
- break;
- }
- case "byte":
- outList.add(String.valueOf(((BytesMessage)message).readByte()));
- break;
- case "short":
- outList.add(String.valueOf(((BytesMessage)message).readShort()));
- break;
- case "int":
- outList.add(String.valueOf(((BytesMessage)message).readInt()));
- break;
- case "long":
- outList.add(String.valueOf(((BytesMessage)message).readLong()));
- break;
- case "float":
- float f = ((BytesMessage)message).readFloat();
- int i0 = Float.floatToRawIntBits(f);
- outList.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
- break;
- case "double":
- double d = ((BytesMessage)message).readDouble();
- long l = Double.doubleToRawLongBits(d);
- outList.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
- break;
- case "decimal32":
- BigDecimal bd32 = (BigDecimal)((ObjectMessage)message).getObject();
- outList.add(bd32.toString());
- break;
- case "decimal64":
- BigDecimal bd64 = (BigDecimal)((ObjectMessage)message).getObject();
- outList.add(bd64.toString());
- break;
- case "decimal128":
- BigDecimal bd128 = (BigDecimal)((ObjectMessage)message).getObject();
- outList.add(bd128.toString());
- break;
- case "char":
- outList.add(String.format("%c", ((BytesMessage)message).readChar()));
- break;
- case "uuid":
- UUID uuid = (UUID)((ObjectMessage)message).getObject();
- outList.add(uuid.toString());
- break;
- case "binary":
- BytesMessage bm = (BytesMessage)message;
- int msgLen = (int)bm.getBodyLength();
- byte[] ba = new byte[msgLen];
- if (bm.readBytes(ba) == msgLen) {
- outList.add(new String(ba));
- } else {
- // TODO: Raise exception or error here: size mismatch
- }
- break;
- case "string":
- outList.add(((TextMessage)message).getText());
- break;
- case "symbol":
- outList.add(((BytesMessage)message).readUTF());
- break;
- case "list":
- break;
- case "map":
- break;
- case "array":
- break;
- default:
- // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
- connection.close();
- throw new Exception("ProtonJmsReceiver: Internal error: unsupported AMQP type \"" + amqpType + "\"");
- }
- }
- } else {
- System.out.println("ERROR: ProtonJmsReceiver: AMQP type \"" + amqpType + "\" is not supported");
- connection.close();
- System.exit(1);
- }
-
- connection.close();
-
- // No exception, print results
- for (int i=0; i<outList.size(); i++) {
- System.out.println(outList.get(i));
- }
- } catch (Exception exp) {
- if (connection != null)
- connection.close();
- System.out.println("Caught exception, exiting.");
- exp.printStackTrace(System.out);
- System.exit(1);
- }
- }
-
- protected static boolean isSupportedAmqpType(String amqpType) {
- for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
- if (amqpType.equals(supportedAmqpType))
- return true;
- }
- return false;
- }
-
- private static class MyExceptionListener implements ExceptionListener {
- @Override
- public void onException(JMSException exception) {
- System.out.println("Connection ExceptionListener fired, exiting.");
- exception.printStackTrace(System.out);
- System.exit(1);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsSender.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsSender.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsSender.java
deleted file mode 100644
index 3507fbd..0000000
--- a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsSender.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.interop_test.shim;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.math.MathContext;
-import java.util.Arrays;
-import java.util.UUID;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import org.apache.qpid.jms.JmsConnectionFactory;
-
-public class ProtonJmsSender {
- private static final String USER = "guest";
- private static final String PASSWORD = "guest";
- private static final String[] SUPPORTED_AMQP_TYPES = {"null",
- "boolean",
- "ubyte",
- "ushort",
- "uint",
- "ulong",
- "byte",
- "short",
- "int",
- "long",
- "float",
- "double",
- "decimal32",
- "decimal64",
- "decimal128",
- "char",
- "timestamp",
- "uuid",
- "binary",
- "string",
- "symbol",
- "list",
- "map",
- "array"};
-
- public static void main(String[] args) throws Exception {
- if (args.length < 4) {
- System.out.println("ProtonJmsSender: Insufficient number of arguments");
- System.out.println("ProtonJmsSender: Expected arguments: broker_address, queue_name, amqp_type, test_val, test_val, ...");
- System.exit(1);
- }
- String brokerAddress = "amqp://" + args[0];
- String queueName = args[1];
- String amqpType = args[2];
- String[] testValueList = Arrays.copyOfRange(args, 3, args.length); // Use remaining args as test values
-
- try {
- ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
-
- Connection connection = factory.createConnection();
- connection.setExceptionListener(new MyExceptionListener());
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = session.createQueue(queueName);
-
- MessageProducer messageProducer = session.createProducer(queue);
-
- if (isSupportedAmqpType(amqpType)) {
- Message message = null;
- for (String testValueStr : testValueList) {
- switch (amqpType) {
- case "null":
- message = session.createBytesMessage();
- break;
- case "boolean":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeBoolean(Boolean.parseBoolean(testValueStr));
- break;
- case "ubyte":
- {
- byte testValue = (byte)Short.parseShort(testValueStr);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeByte(testValue);
- break;
- }
- case "ushort":
- {
- int testValue = Integer.parseInt(testValueStr);
- byte[] byteArray = new byte[2];
- byteArray[0] = (byte)(testValue >> 8);
- byteArray[1] = (byte)(testValue);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeBytes(byteArray);
- break;
- }
- case "uint":
- {
- long testValue = Long.parseLong(testValueStr);
- byte[] byteArray = new byte[4];
- byteArray[0] = (byte)(testValue >> 24);
- byteArray[1] = (byte)(testValue >> 16);
- byteArray[2] = (byte)(testValue >> 8);
- byteArray[3] = (byte)(testValue);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeBytes(byteArray);
- break;
- }
- case "ulong":
- {
- // TODO: Tidy this ugliness up - perhaps use of vector<byte>?
- BigInteger testValue = new BigInteger(testValueStr);
- byte[] bigIntArray = testValue.toByteArray(); // may be 1 to 9 bytes depending on number
- byte[] byteArray = {0, 0, 0, 0, 0, 0, 0, 0};
- int effectiveBigIntArrayLen = bigIntArray.length > 8 ? 8 : bigIntArray.length; // Cap length at 8
- int bigIntArrayOffs = bigIntArray.length > 8 ? bigIntArray.length - 8 : 0; // Offset when length > 8
- for (int i=0; i<bigIntArray.length && i < 8; i++)
- byteArray[8 - effectiveBigIntArrayLen + i] = bigIntArray[bigIntArrayOffs + i];
- message = session.createBytesMessage();
- ((BytesMessage)message).writeBytes(byteArray);
- break;
- }
- case "byte":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeByte(Byte.parseByte(testValueStr));
- break;
- case "short":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeShort(Short.parseShort(testValueStr));
- break;
- case "int":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeInt(Integer.parseInt(testValueStr));
- break;
- case "long":
- case "timestamp":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeLong(Long.parseLong(testValueStr));
- break;
- case "float":
- Long i = Long.parseLong(testValueStr.substring(2), 16);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeFloat(Float.intBitsToFloat(i.intValue()));
- break;
- case "double":
- Long l1 = Long.parseLong(testValueStr.substring(2, 3), 16) << 60;
- Long l2 = Long.parseLong(testValueStr.substring(3), 16);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeDouble(Double.longBitsToDouble(l1 | l2));
- break;
- case "decimal32":
- BigDecimal bd32 = new BigDecimal(testValueStr, MathContext.DECIMAL32);
- message = session.createObjectMessage();
- ((ObjectMessage)message).setObject(bd32);
- break;
- case "decimal64":
- BigDecimal bd64 = new BigDecimal(testValueStr, MathContext.DECIMAL64);
- message = session.createObjectMessage();
- ((ObjectMessage)message).setObject(bd64);
- break;
- case "decimal128":
- BigDecimal bd128 = new BigDecimal(testValueStr, MathContext.DECIMAL128);
- message = session.createObjectMessage();
- ((ObjectMessage)message).setObject(bd128);
- break;
- case "char":
- char c = 0;
- if (testValueStr.length() == 1) // Single char
- c = testValueStr.charAt(0);
- else if (testValueStr.length() == 6) // unicode format
- c = (char)Integer.parseInt(testValueStr, 16);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeChar(c);
- break;
- case "uuid":
- UUID uuid = UUID.fromString(testValueStr);
- message = session.createObjectMessage();
- ((ObjectMessage)message).setObject(uuid);
- break;
- case "binary":
- message = session.createBytesMessage();
- byte[] byteArray = testValueStr.getBytes();
- ((BytesMessage)message).writeBytes(byteArray, 0, byteArray.length);
- break;
- case "string":
- message = session.createTextMessage(testValueStr);
- break;
- case "symbol":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeUTF(testValueStr);
- break;
- case "list":
- break;
- case "map":
- break;
- case "array":
- break;
- default:
- // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
- connection.close();
- throw new Exception("ProtonJmsSender: Internal error: unsupported AMQP type \"" + amqpType + "\"");
- }
- messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- }
- } else {
- System.out.println("ERROR: ProtonJmsSender: AMQP type \"" + amqpType + "\" is not supported");
- connection.close();
- System.exit(1);
- }
-
- connection.close();
- } catch (Exception exp) {
- System.out.println("Caught exception, exiting.");
- exp.printStackTrace(System.out);
- System.exit(1);
- }
- }
-
- protected static boolean isSupportedAmqpType(String amqpType) {
- for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
- if (amqpType.equals(supportedAmqpType))
- return true;
- }
- return false;
- }
-
- private static class MyExceptionListener implements ExceptionListener {
- @Override
- public void onException(JMSException exception) {
- System.out.println("Connection ExceptionListener fired, exiting.");
- exception.printStackTrace(System.out);
- System.exit(1);
- }
- }
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org