You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2015/08/26 04:51:49 UTC
[1/2] qpid-interop-test git commit: QPIDIT-17: Add JMS test suite
with Qpid-JMS and Proton-Python clients.
Repository: qpid-interop-test
Updated Branches:
refs/heads/master 77eba676f -> ebdacb0dd
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-proton-python/src/amqp-receive
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/amqp-receive b/shims/qpid-proton-python/src/amqp-receive
new file mode 100755
index 0000000..c0cebff
--- /dev/null
+++ b/shims/qpid-proton-python/src/amqp-receive
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Issues:
+# * Capturing errors from client or broker
+
+import sys
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from traceback import format_exc
+from struct import pack, unpack
+
+class Receiver(MessagingHandler):
+    '''
+    Proton handler which receives messages of a single AMQP type and records each body as a test value.
+    url: address to receive from, passed to Container.create_receiver()
+    amqp_type: name of the AMQP type under test (eg 'int'); selects how each message body is recorded
+    expected_num_messages: number of messages after which the receiver link and connection are closed
+    '''
+
+    # AMQP types whose message body is recorded as str(body)
+    STR_TYPES = ('null', 'boolean', 'ubyte', 'ushort', 'uint', 'ulong', 'byte', 'short', 'int', 'long',
+                 'decimal32', 'decimal64', 'decimal128', 'timestamp', 'uuid')
+    # AMQP types whose message body is recorded unchanged
+    RAW_TYPES = ('char', 'binary', 'string', 'symbol', 'list', 'map')
+
+    def __init__(self, url, amqp_type, expected_num_messages):
+        super(Receiver, self).__init__()
+        self.url = url
+        self.amqp_type = amqp_type
+        self.received_value_list = []
+        self.expected = int(expected_num_messages)
+        self.received = 0
+
+    def get_received_value_list(self):
+        '''Return the list of values recorded so far, in order of arrival.'''
+        return self.received_value_list
+
+    def on_start(self, event):
+        '''Create the receiver link once the container has started.'''
+        event.container.create_receiver(self.url)
+
+    def on_message(self, event):
+        '''
+        Record the body of a received message. Floats and doubles are recorded as the hex string of their
+        IEEE-754 bit pattern; other types are recorded as str(body) or unchanged (see STR_TYPES and RAW_TYPES).
+        The receiver link and connection are closed once self.expected messages have been recorded.
+        '''
+        if event.message.id and event.message.id < self.received:
+            return # ignore duplicate message
+        if self.expected != 0 and self.received >= self.expected:
+            return # already have everything that was asked for
+        body = event.message.body
+        if self.amqp_type in self.STR_TYPES:
+            # NOTE(review): a separate 'decimal128' branch (body.encode('hex')) used to follow the double case
+            # but was unreachable because 'decimal128' is matched here first. The effective behaviour is kept;
+            # confirm whether hex encoding was the intent.
+            value = str(body)
+        elif self.amqp_type == 'float':
+            value = '0x%08x' % unpack('!L', pack('!f', body))[0]
+        elif self.amqp_type == 'double':
+            value = '0x%016x' % unpack('!Q', pack('!d', body))[0]
+        elif self.amqp_type in self.RAW_TYPES:
+            value = body
+        else:
+            print 'receive: Unsupported AMQP type "%s"' % self.amqp_type
+            return
+        self.received_value_list.append(value)
+        self.received += 1
+        if self.received == self.expected:
+            event.receiver.close()
+            event.connection.close()
+
+
+# --- main ---
+# Args: 1: Broker address (ip-addr:port)
+# 2: Queue name
+# 3: AMQP type
+# 4: Expected number of test values to receive
+try:
+    # Build the receive address from broker address and queue name, then run until all messages arrive
+    receive_url = '%s/%s' % (sys.argv[1], sys.argv[2])
+    rcv = Receiver(receive_url, sys.argv[3], sys.argv[4])
+    container = Container(rcv)
+    container.run()
+    # Output: the AMQP type on the first line, then one received value per line
+    print sys.argv[3]
+    received_values = rcv.get_received_value_list()
+    for val in received_values:
+        print val
+except KeyboardInterrupt:
+    pass
+except Exception as e:
+    print 'proton-python-receive EXCEPTION:', e
+    print format_exc()
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-proton-python/src/amqp-send
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/amqp-send b/shims/qpid-proton-python/src/amqp-send
new file mode 100755
index 0000000..288243f
--- /dev/null
+++ b/shims/qpid-proton-python/src/amqp-send
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Issues:
+# * Capturing errors from client or broker
+
+import sys
+from ast import literal_eval
+from proton import byte, char, decimal32, decimal64, decimal128, float32, int32, Message, short, symbol, timestamp, ubyte, uint, ulong, ushort
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from shim_utils import StrToObj
+from struct import unpack
+from traceback import format_exc
+from uuid import UUID
+
+class Sender(MessagingHandler):
+    '''
+    Proton handler which sends one message per test value, each body converted to a single AMQP type.
+    url: address to send to, passed to Container.create_sender()
+    amqp_type: name of the AMQP type under test (eg 'int', 'string')
+    test_value_list: list of test values, each given as a string
+    '''
+
+    def __init__(self, url, amqp_type, test_value_list):
+        super(Sender, self).__init__()
+        self.url = url
+        self.amqp_type = amqp_type
+        self.test_value_list = test_value_list
+        self.sent = 0
+        self.confirmed = 0
+        self.total = len(test_value_list)
+
+    def on_start(self, event):
+        '''Create the sender link once the container has started.'''
+        event.container.create_sender(self.url)
+
+    def on_sendable(self, event):
+        '''
+        Send as many outstanding test values as the link credit allows. self.sent is the index of the next
+        value to send, so sending resumes where it stopped when more credit arrives or after on_disconnected()
+        has rewound self.sent. (Previously values were silently skipped when credit ran out part-way through
+        the list, and nothing was ever sent again once self.sent was non-zero.)
+        '''
+        while event.sender.credit and self.sent < self.total:
+            message = self.create_message(self.test_value_list[self.sent])
+            if message is None:
+                # Unsupported AMQP type, nothing useful can be sent
+                event.connection.close()
+                return
+            event.sender.send(message)
+            self.sent += 1
+
+    def create_message(self, test_value):
+        '''
+        Create a message whose body is test_value converted to self.amqp_type and whose id is self.sent+1.
+        test_value: string representation of the value to send
+        returns: the Message, or None (after printing an error) if self.amqp_type is not supported
+        '''
+        # Types whose test value is evaluated as a Python literal and then wrapped in the AMQP type
+        literal_types = {'ubyte': ubyte, 'ushort': ushort, 'uint': uint, 'ulong': ulong,
+                         'byte': byte, 'short': short, 'int': int32, 'long': long,
+                         'decimal32': decimal32, 'decimal64': decimal64, 'decimal128': decimal128,
+                         'timestamp': timestamp}
+        # Types constructed directly from the test value string
+        string_types = {'char': char, 'uuid': UUID, 'binary': bytes, 'string': unicode, 'symbol': symbol}
+        if self.amqp_type == 'null':
+            body = None
+        elif self.amqp_type == 'boolean':
+            body = test_value == 'True'
+        elif self.amqp_type in literal_types:
+            body = literal_types[self.amqp_type](literal_eval(test_value))
+        elif self.amqp_type == 'float':
+            # Test value is the hex bit pattern with a two-character prefix (skipped by [2:])
+            body = float32(unpack('!f', test_value[2:].decode('hex'))[0])
+        elif self.amqp_type == 'double':
+            body = unpack('!d', test_value[2:].decode('hex'))[0]
+        elif self.amqp_type in string_types:
+            body = string_types[self.amqp_type](test_value)
+        elif self.amqp_type == 'list' or self.amqp_type == 'map':
+            # Compound values are parsed from their string form by StrToObj, one character at a time
+            body = StrToObj(iter(list(test_value))).run()
+        else:
+            print 'send: Unsupported AMQP type "%s"' % self.amqp_type
+            return None
+        return Message(id=(self.sent+1), body=body)
+
+    def on_accepted(self, event):
+        '''Count an accepted delivery and close the connection once every message has been accepted.'''
+        self.confirmed += 1
+        if self.confirmed == self.total:
+            event.connection.close()
+
+    def on_disconnected(self, event):
+        '''Rewind the send position to the number of confirmed messages.'''
+        self.sent = self.confirmed
+
+    @staticmethod
+    def _map_string_to_map(str_list):
+        # Placeholder: always returns an empty map and is not called within this class
+        return {}
+
+
+# --- main ---
+# Args: 1: Broker address (ip-addr:port)
+# 2: Queue name
+# 3: AMQP type
+# 4...n: Test value(s) as strings
+try:
+    # Send address is '<broker address>/<queue name>'; every argument after the AMQP type is a test value
+    send_url = '%s/%s' % (sys.argv[1], sys.argv[2])
+    sender = Sender(send_url, sys.argv[3], sys.argv[4:])
+    Container(sender).run()
+except KeyboardInterrupt:
+    pass
+except Exception as e:
+    print 'proton-python-send EXCEPTION:', e
+    print format_exc()
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-proton-python/src/jms-receiver-shim.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/jms-receiver-shim.py b/shims/qpid-proton-python/src/jms-receiver-shim.py
new file mode 100755
index 0000000..9091251
--- /dev/null
+++ b/shims/qpid-proton-python/src/jms-receiver-shim.py
@@ -0,0 +1,234 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from interop_test_errors import InteropTestError
+from json import dumps, loads
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from struct import pack, unpack
+from subprocess import check_output
+from traceback import format_exc
+
+class JmsReceiverShim(MessagingHandler):
+    '''
+    Proton handler which receives messages of one JMS message type and records each body as a test value.
+    url: address to receive from, passed to Container.create_receiver()
+    jms_msg_type: JMS message type under test (eg 'JMS_BYTESMESSAGE_TYPE')
+    expected_msg_map: map of subtype name to the number of messages expected for that subtype. Messages are
+        attributed to subtypes in sorted subtype-name order, each subtype taking its expected number in turn.
+    '''
+
+    def __init__(self, url, jms_msg_type, expected_msg_map):
+        super(JmsReceiverShim, self).__init__()
+        self.url = url
+        self.jms_msg_type = jms_msg_type
+        self.expected_msg_map = expected_msg_map
+        self.subtype_itr = iter(sorted(self.expected_msg_map.keys()))
+        self.expected = self._get_tot_num_messages()
+        self.received = 0
+        self.received_value_map = {}
+        self.current_subtype = None
+        self.current_subtype_msg_list = None
+
+    def get_received_value_map(self):
+        '''Return the map of subtype name to list of received values for each completed subtype.'''
+        return self.received_value_map
+
+    def on_start(self, event):
+        '''Create the receiver link once the container has started.'''
+        event.container.create_receiver(self.url)
+
+    def on_message(self, event):
+        '''
+        Record a received message against the current subtype. When the expected number of messages for the
+        subtype has arrived its list is stored in the received value map and the next message starts the next
+        subtype. The receiver link and connection are closed once self.expected messages have been received.
+        '''
+        if event.message.id and event.message.id < self.received:
+            return # ignore duplicate message
+        if self.expected == 0 or self.received < self.expected:
+            if self.current_subtype is None:
+                self.current_subtype = next(self.subtype_itr)
+                self.current_subtype_msg_list = []
+            self.current_subtype_msg_list.append(self._handle_message(event.message))
+            # int() for consistency with _get_tot_num_messages(), which also converts the map values
+            if len(self.current_subtype_msg_list) >= int(self.expected_msg_map[self.current_subtype]):
+                self.received_value_map[self.current_subtype] = self.current_subtype_msg_list
+                self.current_subtype = None
+                self.current_subtype_msg_list = []
+            self.received += 1
+            if self.received == self.expected:
+                event.receiver.close()
+                event.connection.close()
+
+    def _handle_message(self, message):
+        '''
+        Convert a message to its test value string using the handler for self.jms_msg_type.
+        returns: the value string, or None (after printing an error) if the JMS message type is not supported
+        '''
+        handler_map = {'JMS_BYTESMESSAGE_TYPE': self._receive_jms_bytesmessage,
+                       'JMS_MAPMESSAGE_TYPE': self._receive_jms_mapmessage,
+                       'JMS_OBJECTMESSAGE_TYPE': self._receive_jms_objectmessage,
+                       'JMS_STREAMMESSAGE_TYPE': self._receive_jms_streammessage,
+                       'JMS_TEXTMESSAGE_TYPE': self._receive_jms_textmessage}
+        handler = handler_map.get(self.jms_msg_type)
+        if handler is None:
+            print 'jms-receive: Unsupported JMS message type "%s"' % self.jms_msg_type
+            return None
+        return handler(message)
+
+    def _get_tot_num_messages(self):
+        '''Return the total number of expected messages over all subtypes.'''
+        return sum(int(num_msgs) for num_msgs in self.expected_msg_map.values())
+
+    def _receive_jms_bytesmessage(self, message):
+        '''
+        Decode the binary body of a JMS bytes message according to the current subtype.
+        returns: test value string
+        raises: InteropTestError on a malformed body or an unknown subtype
+        '''
+        assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE'
+        subtype = self.current_subtype
+        body = message.body
+        # struct formats of the integer subtypes, which are returned as hex()
+        int_formats = {'byte': 'b', 'int': '!i', 'long': '!q', 'short': '!h'}
+        if subtype == 'boolean':
+            if body == b'\x00':
+                return 'False'
+            if body == b'\x01':
+                return 'True'
+            raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' %
+                                   str(body))
+        if subtype in int_formats:
+            return hex(unpack(int_formats[subtype], body)[0])
+        if subtype == 'bytes':
+            return str(body)
+        if subtype == 'char':
+            if len(body) == 2: # format 'a' or '\xNN'
+                return str(body[1]) # strip leading '\x00' char
+            raise InteropTestError('Unexpected strring length for type char: %d' % len(body))
+        if subtype == 'double':
+            return '0x%016x' % unpack('!Q', body)[0]
+        if subtype == 'float':
+            return '0x%08x' % unpack('!L', body)[0]
+        if subtype == 'string':
+            # NOTE: first 2 bytes are string length, must be present
+            if len(body) < 2:
+                raise InteropTestError('Malformed string binary: len(\'%s\')=%d' %
+                                       (repr(body), len(body)))
+            str_len = unpack('!H', body[:2])[0]
+            str_body = str(body[2:])
+            if len(str_body) != str_len:
+                raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' %
+                                       (str_len, str_body, len(str_body)))
+            return str_body
+        raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
+                               (self.jms_msg_type, subtype))
+
+    def _value_to_str(self, value):
+        '''
+        Convert a typed value taken from a map or stream message to its test value string according to the
+        current subtype: integers as hex(), float and double as the hex string of their bit pattern, all
+        others as str().
+        raises: InteropTestError on an unknown subtype
+        '''
+        subtype = self.current_subtype
+        if subtype in ('boolean', 'bytes', 'char', 'string'):
+            return str(value)
+        if subtype in ('byte', 'int', 'short'):
+            return hex(value)
+        if subtype == 'long':
+            return hex(int(value))
+        if subtype == 'double':
+            return '0x%016x' % unpack('!Q', pack('!d', value))[0]
+        if subtype == 'float':
+            return '0x%08x' % unpack('!L', pack('!f', value))[0]
+        raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
+                               (self.jms_msg_type, subtype))
+
+    def _receive_jms_mapmessage(self, message):
+        '''
+        Convert the first entry of a JMS map message body to its test value string.
+        raises: InteropTestError if the entry key does not match the current subtype
+        '''
+        assert self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE'
+        key, value = message.body.items()[0]
+        # The key is expected to be the subtype name followed by a three-character suffix
+        if key[:-3] != self.current_subtype:
+            raise InteropTestError('JMS map message key \'%s\' does not match subtype \'%s\'' %
+                                   (key, self.current_subtype))
+        return self._value_to_str(value)
+
+    def _receive_jms_objectmessage(self, message):
+        '''Convert the serialized Java object in a JMS object message body to its test value string.'''
+        assert self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE'
+        return self._get_java_obj(message.body)
+
+    def _get_java_obj(self, java_obj_bytes):
+        '''
+        Take bytes from serialized Java object and construct a Java object, then return its toString() value. The
+        work of 'translating' the bytes to a Java object and obtaining its class and value is done in a Java
+        utility org.apache.qpid.interop_test.obj_util.BytesToJavaObj located in jar JavaObjUtils.jar.
+        java_obj_bytes: raw bytes of the serialized Java object; these are converted here to the hex string
+            (eg 'aced00057372...') which is passed to the utility
+        returns: string containing Java class value as returned by the toString() method
+        raises: InteropTestError if the utility output is not a single '<class name>:<value>' line, or if the
+            class name differs from the current subtype
+        '''
+        java_obj_bytes_str = ''.join(["%02x" % ord(x) for x in java_obj_bytes]).strip()
+        out_str = check_output(['java',
+                                '-cp',
+                                'target/JavaObjUtils.jar',
+                                'org.apache.qpid.interop_test.obj_util.BytesToJavaObj',
+                                java_obj_bytes_str])
+        out_str_list = out_str.split('\n')[:-1] # remove trailing \n
+        # Exactly one line is expected; this also catches empty output, which previously raised IndexError
+        if len(out_str_list) != 1:
+            raise InteropTestError('Unexpected return from JavaObjUtils: %s' % out_str)
+        # find() returns -1 when there is no colon; index() would raise ValueError and bypass the check below
+        colon_index = out_str_list[0].find(':')
+        if colon_index < 0:
+            raise InteropTestError('Unexpected format from JavaObjUtils: %s' % out_str)
+        java_class_name = out_str_list[0][:colon_index]
+        java_class_value_str = out_str_list[0][colon_index+1:]
+        if java_class_name != self.current_subtype:
+            raise InteropTestError('Unexpected class name from JavaObjUtils: expected %s, recieved %s' %
+                                   (self.current_subtype, java_class_name))
+        return java_class_value_str
+
+    def _receive_jms_streammessage(self, message):
+        '''
+        Convert the single item of a JMS stream message body to its test value string.
+        raises: InteropTestError if the body does not contain exactly one item
+        '''
+        assert self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE'
+        # Every message is a list with one item [value]
+        if len(message.body) != 1:
+            raise InteropTestError('JMS stream message: expected 1 item in body, found %d' % len(message.body))
+        return self._value_to_str(message.body[0])
+
+    def _receive_jms_textmessage(self, message):
+        '''Return the body of a JMS text message unchanged.'''
+        assert self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE'
+        return message.body
+
+
+
+
+# --- main ---
+# Args: 1: Broker address (ip-addr:port)
+# 2: Queue name
+# 3: JMS message type
+# 4: JSON string of map containing number of test values to receive for each type/subtype
+#print '#### sys.argv=%s' % sys.argv
+try:
+    # Receive address is '<broker address>/<queue name>'; the last argument is a JSON map of expected counts
+    RECEIVE_URL = '%s/%s' % (sys.argv[1], sys.argv[2])
+    RECEIVER = JmsReceiverShim(RECEIVE_URL, sys.argv[3], loads(sys.argv[4]))
+    Container(RECEIVER).run()
+    # Output: the JMS message type on the first line, then the received value map as JSON
+    print sys.argv[3]
+    RECEIVED_VALUE_MAP = RECEIVER.get_received_value_map()
+    print dumps(RECEIVED_VALUE_MAP)
+except KeyboardInterrupt:
+    pass
+except Exception as exc:
+    print 'jms-receiver-shim EXCEPTION:', exc
+    print format_exc()
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-proton-python/src/jms-sender-shim.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/jms-sender-shim.py b/shims/qpid-proton-python/src/jms-sender-shim.py
new file mode 100755
index 0000000..d78e52b
--- /dev/null
+++ b/shims/qpid-proton-python/src/jms-sender-shim.py
@@ -0,0 +1,241 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from json import loads
+from proton import byte, char, float32, int32, Message, short, symbol
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from interop_test_errors import InteropTestError
+from subprocess import check_output
+from struct import pack, unpack
+from traceback import format_exc
+
+# These values must tie in with the Qpid-JMS client values found in
+# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport
+QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type')
+# Annotation value for each supported JMS message type
+QPID_JMS_TYPE_ANNOTATIONS = {
+    'JMS_OBJECTMESSAGE_TYPE': byte(1),
+    'JMS_MAPMESSAGE_TYPE': byte(2),
+    'JMS_BYTESMESSAGE_TYPE': byte(3),
+    'JMS_STREAMMESSAGE_TYPE': byte(4),
+    'JMS_TEXTMESSAGE_TYPE': byte(5),
+}
+
+def create_annotation(jms_msg_type):
+    '''
+    Return the message annotation map which marks a message as being of the given JMS message type.
+    jms_msg_type: one of the keys of QPID_JMS_TYPE_ANNOTATIONS; any other value raises KeyError
+    '''
+    annotation_value = QPID_JMS_TYPE_ANNOTATIONS[jms_msg_type]
+    return {QPID_JMS_TYPE_ANNOTATION_NAME: annotation_value}
+
+class JmsSenderShim(MessagingHandler):
+    '''
+    Proton handler which sends test values as messages annotated with one JMS message type.
+    url: address to send to, passed to Container.create_sender()
+    jms_msg_type: JMS message type under test (eg 'JMS_BYTESMESSAGE_TYPE')
+    test_value_map: map of subtype name to list of test value strings: {type: [val, val, val], ...}
+    '''
+
+    def __init__(self, url, jms_msg_type, test_value_map):
+        super(JmsSenderShim, self).__init__()
+        self.url = url
+        self.jms_msg_type = jms_msg_type
+        self.test_value_map = test_value_map
+        self.sent = 0
+        self.confirmed = 0
+        self.total = self._get_total_num_msgs()
+        # Flattened send order: subtypes sorted by name, each entry (subtype, value, index within subtype)
+        self.send_list = [(sub_type, test_value, value_num)
+                          for sub_type in sorted(self.test_value_map.keys())
+                          for value_num, test_value in enumerate(self.test_value_map[sub_type])]
+
+    def on_start(self, event):
+        '''Create the sender link once the container has started.'''
+        event.container.create_sender(self.url)
+
+    def on_sendable(self, event):
+        '''
+        Send as many outstanding test values as the link credit allows. self.sent is the index into
+        self.send_list of the next value to send, so sending resumes where it stopped when more credit arrives
+        or after on_disconnected() has rewound self.sent. (Previously values were silently skipped when credit
+        ran out part-way through, and nothing was ever sent again once self.sent was non-zero.)
+        '''
+        while event.sender.credit and self.sent < self.total:
+            sub_type, test_value, value_num = self.send_list[self.sent]
+            message = self._create_message(sub_type, test_value, value_num)
+            if message is None:
+                # Unsupported JMS message type, nothing useful can be sent
+                event.connection.close()
+                return
+            event.sender.send(message)
+            self.sent += 1
+
+    def _get_total_num_msgs(self):
+        '''Return the total number of test values over all subtypes.'''
+        return sum(len(test_values) for test_values in self.test_value_map.values())
+
+    # TODO: Change this to return a list of messages. That way each test can return more than one message
+    def _create_message(self, test_value_type, test_value, value_num):
+        '''
+        Create the message for one test value according to self.jms_msg_type.
+        test_value_type: subtype name of the test value
+        test_value: test value string
+        value_num: index of the value within its subtype, used to name map message entries
+        returns: the Message, or None (after printing an error) if the JMS message type is not supported
+        '''
+        if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE':
+            return self._create_jms_bytesmessage(test_value_type, test_value)
+        if self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE':
+            return self._create_jms_mapmessage(test_value_type, test_value, "%s%03d" % (test_value_type, value_num))
+        if self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE':
+            return self._create_jms_objectmessage('%s:%s' % (test_value_type, test_value))
+        if self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE':
+            return self._create_jms_streammessage(test_value_type, test_value)
+        if self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE':
+            return self._create_jms_textmessage(test_value)
+        print 'jms-send: Unsupported JMS message type "%s"' % self.jms_msg_type
+        return None
+
+    def _create_jms_bytesmessage(self, test_value_type, test_value):
+        '''
+        Create a JMS bytes message whose binary body encodes test_value according to its subtype.
+        raises: InteropTestError on an unknown subtype
+        '''
+        # NOTE: test_value contains all unicode strings u'...' as returned by json
+        body_bytes = None
+        if test_value_type == 'boolean':
+            body_bytes = b'\x01' if test_value == 'True' else b'\x00'
+        elif test_value_type == 'byte':
+            body_bytes = pack('b', int(test_value, 16))
+        elif test_value_type == 'bytes':
+            body_bytes = str(test_value) # remove unicode
+        elif test_value_type == 'char':
+            # JMS expects two-byte chars, ASCII chars can be prefixed with '\x00'
+            body_bytes = '\x00' + str(test_value) # remove unicode
+        elif test_value_type in ('double', 'float'):
+            # Test value is the hex bit pattern with a two-character prefix (skipped by [2:])
+            body_bytes = test_value[2:].decode('hex')
+        elif test_value_type == 'int':
+            body_bytes = pack('!i', int(test_value, 16))
+        elif test_value_type == 'long':
+            body_bytes = pack('!q', long(test_value, 16))
+        elif test_value_type == 'short':
+            # Parse with int() like the other integer subtypes rather than with the proton short wrapper
+            body_bytes = pack('!h', int(test_value, 16))
+        elif test_value_type == 'string':
+            # NOTE: First two bytes must be string length
+            test_value_str = str(test_value) # remove unicode
+            body_bytes = pack('!H', len(test_value_str)) + test_value_str
+        else:
+            raise InteropTestError('JmsSenderShim._create_jms_bytesmessage: Unknown or unsupported subtype "%s"' %
+                                   test_value_type)
+        return Message(id=(self.sent+1),
+                       body=body_bytes,
+                       inferred=True,
+                       content_type='application/octet-stream',
+                       annotations=create_annotation('JMS_BYTESMESSAGE_TYPE'))
+
+    @staticmethod
+    def _to_amqp_value(test_value_type, test_value, caller_name):
+        '''
+        Convert a test value string to the typed value placed in map and stream message bodies.
+        caller_name: name of the calling method, used only in the error message
+        raises: InteropTestError on an unknown subtype
+        '''
+        if test_value_type == 'boolean':
+            return test_value == 'True'
+        if test_value_type == 'byte':
+            return byte(int(test_value, 16))
+        if test_value_type == 'bytes':
+            return str(test_value) # remove unicode
+        if test_value_type == 'char':
+            return char(test_value)
+        if test_value_type == 'double':
+            return unpack('!d', test_value[2:].decode('hex'))[0]
+        if test_value_type == 'float':
+            return float32(unpack('!f', test_value[2:].decode('hex'))[0])
+        if test_value_type == 'int':
+            return int32(int(test_value, 16))
+        if test_value_type == 'long':
+            return int(test_value, 16)
+        if test_value_type == 'short':
+            return short(int(test_value, 16))
+        if test_value_type == 'string':
+            return test_value
+        raise InteropTestError('JmsSenderShim.%s: Unknown or unsupported subtype "%s"' %
+                               (caller_name, test_value_type))
+
+    def _create_jms_mapmessage(self, test_value_type, test_value, name):
+        '''Create a JMS map message whose body is the single-entry map {name: typed value}.'''
+        value = self._to_amqp_value(test_value_type, test_value, '_create_jms_mapmessage')
+        return Message(id=(self.sent+1),
+                       body={name: value},
+                       inferred=False,
+                       annotations=create_annotation('JMS_MAPMESSAGE_TYPE'))
+
+    def _create_jms_objectmessage(self, test_value):
+        '''
+        Create a JMS object message whose body is the serialized Java object described by test_value.
+        test_value: string of the form '<class name>:<value>'
+        '''
+        java_binary = self._get_java_obj_binary(test_value)
+        return Message(id=(self.sent+1),
+                       body=java_binary,
+                       inferred=True,
+                       content_type='application/x-java-serialized-object',
+                       annotations=create_annotation('JMS_OBJECTMESSAGE_TYPE'))
+
+    @staticmethod
+    def _get_java_obj_binary(java_class_str):
+        '''
+        Obtain the bytes of a serialized Java object by running the Java utility
+        org.apache.qpid.interop_test.obj_util.JavaObjToBytes from jar JavaObjUtils.jar. The utility is expected
+        to print java_class_str on its first line and the hex string of the serialized object on its second.
+        returns: the serialized object as a binary string
+        raises: InteropTestError if the output has fewer than two lines or the first line is not java_class_str
+        '''
+        out_str = check_output(['java',
+                                '-cp',
+                                'target/JavaObjUtils.jar',
+                                'org.apache.qpid.interop_test.obj_util.JavaObjToBytes',
+                                java_class_str])
+        out_str_list = out_str.split('\n')[:-1] # remove trailing \n
+        # The length check stops short or empty output from surfacing as a bare IndexError
+        if len(out_str_list) < 2 or out_str_list[0] != java_class_str:
+            raise InteropTestError('JmsSenderShim._get_java_obj_binary(): Call to JavaObjToBytes failed\n%s' % out_str)
+        return out_str_list[1].decode('hex')
+
+    def _create_jms_streammessage(self, test_value_type, test_value):
+        '''Create a JMS stream message whose body is the single-item list [typed value].'''
+        body_list = [self._to_amqp_value(test_value_type, test_value, '_create_jms_streammessage')]
+        return Message(id=(self.sent+1),
+                       body=body_list,
+                       inferred=True,
+                       annotations=create_annotation('JMS_STREAMMESSAGE_TYPE'))
+
+    def _create_jms_textmessage(self, test_value_text):
+        '''Create a JMS text message whose body is the test value as a unicode string.'''
+        return Message(id=(self.sent+1),
+                       body=unicode(test_value_text),
+                       annotations=create_annotation('JMS_TEXTMESSAGE_TYPE'))
+
+    def on_accepted(self, event):
+        '''Count an accepted delivery and close the connection once every message has been accepted.'''
+        self.confirmed += 1
+        if self.confirmed == self.total:
+            event.connection.close()
+
+    def on_disconnected(self, event):
+        '''Rewind the send position to the number of confirmed messages.'''
+        self.sent = self.confirmed
+
+# @staticmethod
+# def _map_string_to_map(str_list):
+# return {}
+
+# --- main ---
+# Args: 1: Broker address (ip-addr:port)
+# 2: Queue name
+# 3: JMS message type
+# 4: Test value(s) as JSON string
+#print '#### sys.argv=%s' % sys.argv
+#print '>>> test_values=%s' % loads(sys.argv[4])
+try:
+    # Send address is '<broker address>/<queue name>'; the last argument is a JSON map of test values
+    SEND_URL = '%s/%s' % (sys.argv[1], sys.argv[2])
+    JMS_MSG_TYPE = sys.argv[3]
+    SENDER = JmsSenderShim(SEND_URL, JMS_MSG_TYPE, loads(sys.argv[4]))
+    Container(SENDER).run()
+except KeyboardInterrupt:
+    pass
+except Exception as exc:
+    print 'jms-sender-shim EXCEPTION:', exc
+    print format_exc()
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-proton-python/src/proton-python-receive
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/proton-python-receive b/shims/qpid-proton-python/src/proton-python-receive
deleted file mode 100755
index 080782d..0000000
--- a/shims/qpid-proton-python/src/proton-python-receive
+++ /dev/null
@@ -1,106 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Issues:
-# * Capturing errors from client or broker
-# * Correct Message body types, use of cproton.Data class?
-
-import sys
-from proton.handlers import MessagingHandler
-from proton.reactor import Container
-from struct import pack, unpack
-
-class ReceiverError(StandardError):
- def __init__(self, error_message):
- super(ReceiverError, self).__init__(error_message)
-
-class Receiver(MessagingHandler):
- def __init__(self, url, amqp_type, expected_num_messages):
- super(Receiver, self).__init__()
- self.url = url
- self.amqp_type = amqp_type
- self.received_value_list = []
- self.expected = int(expected_num_messages)
- self.received = 0
-
- def get_received_value_list(self):
- return self.received_value_list
-
- def on_start(self, event):
- event.container.create_receiver(self.url)
-
- def on_message(self, event):
- if event.message.id and event.message.id < self.received:
- return # ignore duplicate message
- if self.expected == 0 or self.received < self.expected:
- if self.amqp_type == 'null' or \
- self.amqp_type == 'boolean' or \
- self.amqp_type == 'ubyte' or \
- self.amqp_type == 'ushort' or \
- self.amqp_type == 'uint' or \
- self.amqp_type == 'ulong' or \
- self.amqp_type == 'byte' or \
- self.amqp_type == 'short' or \
- self.amqp_type == 'int' or \
- self.amqp_type == 'long' or \
- self.amqp_type == 'decimal32' or \
- self.amqp_type == 'decimal64' or \
- self.amqp_type == 'decimal128' or \
- self.amqp_type == 'timestamp' or \
- self.amqp_type == 'uuid':
- self.received_value_list.append(str(event.message.body))
- elif self.amqp_type == 'float':
- self.received_value_list.append('0x%08x' % unpack('!L', pack('!f', event.message.body))[0])
- elif self.amqp_type == 'double':
- self.received_value_list.append('0x%016x' % unpack('!Q', pack('!d', event.message.body))[0])
- elif self.amqp_type == 'decimal128':
- self.received_value_list.append(event.message.body.encode('hex'))
- elif self.amqp_type == 'char' or \
- self.amqp_type == 'binary' or \
- self.amqp_type == 'string' or \
- self.amqp_type == 'symbol':
- self.received_value_list.append(event.message.body)
- elif self.amqp_type == 'list' or \
- self.amqp_type == 'map':
- self.received_value_list.append(event.message.body)
- else:
- print 'proton-python-receive: Unsupported AMQP type "%s"' % self.amqp_type
- return
- self.received += 1
- if self.received == self.expected:
- event.receiver.close()
- event.connection.close()
-
-# --- main ---
-# Args: 1: Broker address (ip-addr:port)
-# 2: Queue name
-# 3: AMQP type
-# 4: Expected number of test values to receive
-try:
- rcv = Receiver('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], sys.argv[4])
- Container(rcv).run()
- print sys.argv[3]
- for val in rcv.get_received_value_list():
- print val
-except KeyboardInterrupt:
- pass
-except Exception as e:
- print 'proton-python-receive EXCEPTION:', e
-
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-proton-python/src/proton-python-send
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/proton-python-send b/shims/qpid-proton-python/src/proton-python-send
deleted file mode 100755
index eb1c52c..0000000
--- a/shims/qpid-proton-python/src/proton-python-send
+++ /dev/null
@@ -1,136 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Issues:
-# * Capturing errors from client or broker
-# * Correct Message body types, use of cproton.Data class?
-
-import sys
-from ast import literal_eval
-from proton import byte, char, decimal32, decimal64, decimal128, float32, int32, Message, short, symbol, timestamp, ubyte, uint, ulong, ushort
-from proton.handlers import MessagingHandler
-from proton.reactor import Container
-from shim_utils import StrToObj
-from struct import unpack
-from traceback import format_exc
-from uuid import UUID
-
-class Sender(MessagingHandler):
- def __init__(self, url, amqp_type, test_value_list):
- super(Sender, self).__init__()
- self.url = url
- self.amqp_type = amqp_type
- self.test_value_list = test_value_list
- self.sent = 0
- self.confirmed = 0
- self.total = len(test_value_list)
-
- def on_start(self, event):
- event.container.create_sender(self.url)
-
- def on_sendable(self, event):
- if self.sent == 0:
- for test_value in self.test_value_list:
- if event.sender.credit:
- message = self.create_message(test_value)
- if message is not None:
- event.sender.send(message)
- self.sent += 1
- else:
- event.connection.close()
- return
-
- def create_message(self, test_value):
- # Non-string types using literal_eval
- if self.amqp_type == 'null':
- return Message(id=(self.sent+1), body=None)
- elif self.amqp_type == 'boolean':
- return Message(id=(self.sent+1), body=True if test_value == 'True' else False)
- elif self.amqp_type == 'ubyte':
- return Message(id=(self.sent+1), body=ubyte(literal_eval(test_value)))
- elif self.amqp_type == 'ushort':
- return Message(id=(self.sent+1), body=ushort(literal_eval(test_value)))
- elif self.amqp_type == 'uint':
- return Message(id=(self.sent+1), body=uint(literal_eval(test_value)))
- elif self.amqp_type == 'ulong':
- return Message(id=(self.sent+1), body=ulong(literal_eval(test_value)))
- elif self.amqp_type == 'byte':
- return Message(id=(self.sent+1), body=byte(literal_eval(test_value)))
- elif self.amqp_type == 'short':
- return Message(id=(self.sent+1), body=short(literal_eval(test_value)))
- elif self.amqp_type == 'int':
- return Message(id=(self.sent+1), body=int32(literal_eval(test_value)))
- elif self.amqp_type == 'long':
- return Message(id=(self.sent+1), body=long(literal_eval(test_value)))
- elif self.amqp_type == 'float':
- return Message(id=(self.sent+1), body=float32(unpack('!f', test_value[2:].decode('hex'))[0]))
- elif self.amqp_type == 'double':
- return Message(id=(self.sent+1), body=unpack('!d', test_value[2:].decode('hex'))[0])
- elif self.amqp_type == 'decimal32':
- return Message(id=(self.sent+1), body=decimal32(literal_eval(test_value)))
- elif self.amqp_type == 'decimal64':
- return Message(id=(self.sent+1), body=decimal64(literal_eval(test_value)))
- elif self.amqp_type == 'decimal128':
- return Message(id=(self.sent+1), body=decimal128(literal_eval(test_value)))
- elif self.amqp_type == 'char':
- return Message(id=(self.sent+1), body=char(test_value))
- elif self.amqp_type == 'timestamp':
- return Message(id=(self.sent+1), body=timestamp(literal_eval(test_value)))
- elif self.amqp_type == 'uuid':
- return Message(id=(self.sent+1), body=UUID(test_value))
- elif self.amqp_type == 'binary':
- return Message(id=(self.sent+1), body=bytes(test_value))
- elif self.amqp_type == 'string':
- return Message(id=(self.sent+1), body=unicode(test_value))
- elif self.amqp_type == 'symbol':
- return Message(id=(self.sent+1), body=symbol(test_value))
- elif self.amqp_type == 'list':
- return Message(id=(self.sent+1), body=StrToObj(list(test_value).__iter__()).run())
- elif self.amqp_type == 'map':
- return Message(id=(self.sent+1), body=StrToObj(list(test_value).__iter__()).run())
- else:
- print 'proton-python-send: Unsupported AMQP type "%s"' % self.amqp_type
- return None
-
- def on_accepted(self, event):
- self.confirmed += 1
- if self.confirmed == self.total:
- event.connection.close()
-
- def on_disconnected(self, event):
- self.sent = self.confirmed
-
- @staticmethod
- def _map_string_to_map(str_list):
- return {}
-
-# --- main ---
-# Args: 1: Broker address (ip-addr:port)
-# 2: Queue name
-# 3: AMQP type
-# 4...n: Test values as strings
-try:
- Container(Sender('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], sys.argv[4:])).run()
-except KeyboardInterrupt:
- pass
-except Exception as e:
- print 'proton-python-send EXCEPTION:', e
- print format_exc()
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/src/main/java/org/apache/qpid/interop_test/obj_util/BytesToJavaObj.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/qpid/interop_test/obj_util/BytesToJavaObj.java b/src/main/java/org/apache/qpid/interop_test/obj_util/BytesToJavaObj.java
new file mode 100644
index 0000000..8c461ce
--- /dev/null
+++ b/src/main/java/org/apache/qpid/interop_test/obj_util/BytesToJavaObj.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.obj_util;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+
+/**
+ * Utility which turns a hex string containing a Java-serialized object back into
+ * the object and reports it as "<class name>:<toString() value>".
+ */
+public class BytesToJavaObj {
+    String hexObjStr = null;
+    Serializable obj = null;
+
+    /**
+     * @param hexObjStr hex representation (two hex digits per byte) of a serialized Java object
+     */
+    public BytesToJavaObj(String hexObjStr) {
+        this.hexObjStr = hexObjStr;
+    }
+
+    /**
+     * Decode the hex string and deserialize the object it contains.
+     *
+     * @return "<class name>:<toString() value>" of the deserialized object, or "<null>" if the
+     *         bytes could not be deserialized (the stack trace is printed to stdout in that case)
+     * @throws IllegalArgumentException if the hex string is malformed
+     */
+    public String run() {
+        byte[] bytes = hexStrToByteArray(this.hexObjStr);
+        this.obj = byteArrayToObject(bytes);
+        if (this.obj != null) {
+            return this.obj.getClass().getName() + ":" + this.obj.toString();
+        }
+        return "<null>";
+    }
+
+    /**
+     * Deserialize a single object from the supplied bytes.
+     *
+     * @return the deserialized object, or null if deserialization failed
+     */
+    protected Serializable byteArrayToObject(byte[] bytes) {
+        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        ObjectInput in = null;
+        try {
+            in = new ObjectInputStream(bis);
+            return (Serializable) in.readObject();
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace(System.out);
+        } catch (IOException e) {
+            e.printStackTrace(System.out);
+        } finally {
+            try {
+                // 'in' is still null if the ObjectInputStream constructor threw (e.g. bad stream
+                // header); closing it unconditionally would raise a NullPointerException here.
+                if (in != null) {
+                    in.close();
+                }
+            } catch (IOException e) {} // ignore
+            try {
+                bis.close();
+            } catch (IOException e) {} // ignore
+        }
+        return null;
+    }
+
+    /**
+     * Convert a string of hex digits into the bytes it represents.
+     *
+     * @throws IllegalArgumentException if the string has an odd length or contains a non-hex character
+     */
+    protected byte[] hexStrToByteArray(String hexStr) {
+        int len = hexStr.length();
+        if (len % 2 != 0) {
+            throw new IllegalArgumentException("Hex string has odd length (" + len + "): \"" + hexStr + "\"");
+        }
+        byte[] data = new byte[len / 2];
+        for (int i = 0; i < len; i += 2) {
+            int hi = Character.digit(hexStr.charAt(i), 16);
+            int lo = Character.digit(hexStr.charAt(i + 1), 16);
+            // Character.digit() returns -1 for a non-hex character; reject rather than emit garbage
+            if (hi < 0 || lo < 0) {
+                throw new IllegalArgumentException("Hex string contains non-hex character at index " +
+                                                   (hi < 0 ? i : i + 1) + ": \"" + hexStr + "\"");
+            }
+            data[i / 2] = (byte) ((hi << 4) + lo);
+        }
+        return data;
+    }
+
+    // ========= main ==========
+
+    public static void main(String[] args) {
+        if (args.length != 1) {
+            System.out.println("BytesToJavaObj: Incorrect argument count");
+            System.out.println("BytesToJavaObj: Expected argument: \"<java_serialized_obj_str_hex>\"");
+            System.exit(1);
+        }
+        BytesToJavaObj btjo = new BytesToJavaObj(args[0]);
+        try {
+            System.out.println(btjo.run());
+        } catch (IllegalArgumentException e) {
+            System.out.println("BytesToJavaObj: " + e.getMessage());
+            System.exit(1);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/src/main/java/org/apache/qpid/interop_test/obj_util/JavaObjToBytes.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/qpid/interop_test/obj_util/JavaObjToBytes.java b/src/main/java/org/apache/qpid/interop_test/obj_util/JavaObjToBytes.java
new file mode 100644
index 0000000..2bfbde0
--- /dev/null
+++ b/src/main/java/org/apache/qpid/interop_test/obj_util/JavaObjToBytes.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.obj_util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+//import java.util.Arrays;
+
+public class JavaObjToBytes {
+ String javaClassName = null;
+ String ctorArgStr = null;
+ Serializable obj = null;
+
+ public JavaObjToBytes(String javaClassName, String ctorArgStr) {
+ this.javaClassName = javaClassName;
+ this.ctorArgStr = ctorArgStr;
+ }
+
+ public byte[] run() {
+ createJavaObject();
+ return serializeJavaOjbect();
+ }
+
+ protected void createJavaObject() {
+ try {
+ Class<?> c = Class.forName(this.javaClassName);
+ if (this.javaClassName.compareTo("java.lang.Character") == 0) {
+ Constructor ctor = c.getConstructor(char.class);
+ if (this.ctorArgStr.length() == 1) {
+ // Use first character of string
+ obj = (Serializable)ctor.newInstance(this.ctorArgStr.charAt(0));
+ } else if (this.ctorArgStr.length() == 4 || this.ctorArgStr.length() == 6) {
+ // Format '\xNN' or '\xNNNN'
+ obj = (Serializable)ctor.newInstance((char)Integer.parseInt(this.ctorArgStr.substring(2), 16));
+ } else {
+ throw new Exception("JavaObjToBytes.createJavaObject() Malformed char string: \"" + this.ctorArgStr + "\"");
+ }
+ } else {
+ // Use string constructor
+ Constructor ctor = c.getConstructor(String.class);
+ obj = (Serializable)ctor.newInstance(this.ctorArgStr);
+ }
+ }
+ catch (ClassNotFoundException e) {
+ e.printStackTrace(System.out);
+ }
+ catch (NoSuchMethodException e) {
+ e.printStackTrace(System.out);
+ }
+ catch (InstantiationException e) {
+ e.printStackTrace(System.out);
+ }
+ catch (IllegalAccessException e) {
+ e.printStackTrace(System.out);
+ }
+ catch (InvocationTargetException e) {
+ e.printStackTrace(System.out);
+ }
+ catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ protected byte[] serializeJavaOjbect() {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutput out = null;
+ try {
+ out = new ObjectOutputStream(bos);
+ out.writeObject(this.obj);
+ return bos.toByteArray();
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ } finally {
+ try {
+ if (out != null) {
+ out.close();
+ }
+ } catch (IOException e) {} // ignore
+ try {
+ bos.close();
+ } catch (IOException e) {} // ignore
+ }
+ return null;
+ }
+
+ // ========= main ==========
+
+ public static void main(String[] args) {
+ if (args.length != 1) {
+ System.out.println("JavaObjToBytes: Incorrect argument count");
+ System.out.println("JavaObjToBytes: Expected argument: \"<java_class_name>:<ctor_arg_str>\"");
+ System.exit(1);
+ }
+ int colonIndex = args[0].indexOf(":");
+ if (colonIndex < 0) {
+ System.out.println("Error: Incorect argument format: " + args[0]);
+ System.exit(-1);
+ }
+ String javaClassName = args[0].substring(0, colonIndex);
+ String ctorArgStr = args[0].substring(colonIndex+1);
+ JavaObjToBytes jotb = new JavaObjToBytes(javaClassName, ctorArgStr);
+ byte[] bytes = jotb.run();
+ System.out.println(args[0]);
+ for (byte b: bytes) {
+ System.out.print(String.format("%02x", b));
+ }
+ System.out.println();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/src/py/qpid-interop-test/__init__.py
----------------------------------------------------------------------
diff --git a/src/py/qpid-interop-test/__init__.py b/src/py/qpid-interop-test/__init__.py
index 7cecede..404056b 100644
--- a/src/py/qpid-interop-test/__init__.py
+++ b/src/py/qpid-interop-test/__init__.py
@@ -19,4 +19,5 @@
import shim_utils
import types
+import jms
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/src/py/qpid-interop-test/interop_test_errors.py
----------------------------------------------------------------------
diff --git a/src/py/qpid-interop-test/interop_test_errors.py b/src/py/qpid-interop-test/interop_test_errors.py
new file mode 100644
index 0000000..6be8959
--- /dev/null
+++ b/src/py/qpid-interop-test/interop_test_errors.py
@@ -0,0 +1,29 @@
+"""
+Module containing Error classes for interop testing
+"""
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class InteropTestError(StandardError):
+ """
+ Generic simple error class for use in interop tests.
+
+ Derives from StandardError, which exists only in Python 2.
+ """
+ def __init__(self, error_message):
+ """Create the error; error_message is passed unchanged to the StandardError constructor."""
+ super(InteropTestError, self).__init__(error_message)
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/src/py/qpid-interop-test/jms/jms_message_tests.py
----------------------------------------------------------------------
diff --git a/src/py/qpid-interop-test/jms/jms_message_tests.py b/src/py/qpid-interop-test/jms/jms_message_tests.py
new file mode 100755
index 0000000..a8b9fba
--- /dev/null
+++ b/src/py/qpid-interop-test/jms/jms_message_tests.py
@@ -0,0 +1,367 @@
+#!/usr/bin/env python
+
+"""
+Module to test JMS message types across different APIs
+"""
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import argparse
+import unittest
+
+from itertools import product
+from json import dumps, loads
+from os import getenv, path
+from subprocess import check_output, CalledProcessError
+
+# TODO - propose a sensible default when installation details are worked out
+QPID_INTEROP_TEST_HOME = getenv('QPID_INTEROP_TEST_HOME')
+
+class JmsMessageTypes(object):
+ """
+ Class which contains all the described JMS message types and the test values to be used in testing.
+ """
+
+ # The TYPE_SUBMAP defines test values for JMS message types that allow typed message content. Note that the
+ # types defined here are understood to be *Java* types and the stringified values are to be interpreted
+ # as the appropriate Java type by the send shim.
+ # Floating point values ('double', 'float') are given as the hex representation of their IEEE 754 bit pattern.
+ TYPE_SUBMAP = {
+ 'boolean': ['True', 'False'],
+ 'byte': ['-0x80', '-0x1', '0x0', '0x7f'],
+ 'bytes': [b'', b'12345', b'Hello, world', b'\\x01\\x02\\x03\\x04\\x05abcde\\x80\\x81\\xfe\\xff'],
+ #b'The quick brown fox jumped over the lazy dog 0123456789.' * 100],
+ 'char': ['a', 'Z', '\x01', '\x7f'],
+ # NOTE(review): unlike the 'float' list below, this list has no entries for the largest negative
+ # denormalized (0x800fffffffffffff) and smallest positive normalized (0x0010000000000000) numbers -
+ # confirm whether that is intentional.
+ 'double': ['0x0000000000000000', # 0.0
+ '0x8000000000000000', # -0.0
+ '0x400921fb54442eea', # pi (3.14159265359) positive decimal
+ '0xc005bf0a8b145fcf', # -e (-2.71828182846) negative decimal
+ '0x0000000000000001', # Smallest positive denormalized number
+ '0x8000000000000001', # Smallest negative denormalized number
+ '0x000fffffffffffff', # Largest positive denormalized number
+ '0x8010000000000000', # Smallest negative normalized number
+ '0x7fefffffffffffff', # Largest positive normalized number
+ '0xffefffffffffffff', # Largest negative normalized number
+ '0x7ff0000000000000', # +Infinity
+ '0xfff0000000000000', # -Infinity
+ '0x7ff8000000000000'], # +NaN
+ 'float': ['0x00000000', # 0.0
+ '0x80000000', # -0.0
+ '0x40490fdb', # pi (3.14159265359) positive decimal
+ '0xc02df854', # -e (-2.71828182846) negative decimal
+ '0x00000001', # Smallest positive denormalized number
+ '0x80000001', # Smallest negative denormalized number
+ '0x007fffff', # Largest positive denormalized number
+ '0x807fffff', # Largest negative denormalized number
+ '0x00800000', # Smallest positive normalized number
+ '0x80800000', # Smallest negative normalized number
+ '0x7f7fffff', # Largest positive normalized number
+ '0xff7fffff', # Largest negative normalized number
+ '0x7f800000', # +Infinity
+ '0xff800000', # -Infinity
+ '0x7fc00000'], # +NaN
+ 'int': ['-0x80000000', '-0x81', '-0x80', '-0x1', '0x0', '0x7f', '0x80', '0x7fffffff'],
+ 'long': ['-0x8000000000000000', '-0x81', '-0x80', '-0x1', '0x0', '0x7f', '0x80', '0x7fffffffffffffff'],
+ 'short': ['-0x8000', '-0x1', '0x0', '0x7fff'],
+ 'string': ['',
+ 'Hello, world',
+ '"Hello, world"',
+ "Charlie's \"peach\"",
+ 'Charlie\'s "peach"'],
+ #'The quick brown fox jumped over the lazy dog 0123456789.' * 100]
+ }
+ # TYPE_MAP maps each supported JMS message type to a map of {sub-type name: [test value strings]}.
+ # The bytes, map and stream message types all share the same TYPE_SUBMAP object.
+ TYPE_MAP = {
+ 'JMS_BYTESMESSAGE_TYPE': TYPE_SUBMAP,
+ 'JMS_MAPMESSAGE_TYPE': TYPE_SUBMAP,
+ 'JMS_OBJECTMESSAGE_TYPE': {
+ 'java.lang.Boolean': ['true', 'false'],
+ 'java.lang.Byte': ['-128', '0', '127'],
+ 'java.lang.Character': [u'a', u'Z'],
+ 'java.lang.Double': ['0.0', '3.141592654', '-2.71828182846'],
+ 'java.lang.Float': ['0.0', '3.14159', '-2.71828'],
+ 'java.lang.Integer': ['-2147483648', '-129', '-128', '-1', '0', '127', '128', '2147483647'],
+ 'java.lang.Long' : ['-9223372036854775808', '-129', '-128', '-1', '0', '127', '128', '9223372036854775807'],
+ 'java.lang.Short': ['-32768', '-129', '-128', '-1', '0', '127', '128', '32767'],
+ 'java.lang.String': [u'',
+ u'Hello, world',
+ u'"Hello, world"',
+ u"Charlie's \"peach\"",
+ u'Charlie\'s "peach"']
+ },
+ 'JMS_STREAMMESSAGE_TYPE': TYPE_SUBMAP,
+ 'JMS_TEXTMESSAGE_TYPE': {'text': ['',
+ 'Hello, world',
+ '"Hello, world"',
+ "Charlie's \"peach\"",
+ 'Charlie\'s "peach"'],}
+ #'The quick brown fox jumped over the lazy dog 0123456789.' * 100]}
+ }
+
+ @staticmethod
+ def get_type_list():
+ """Return a list of JMS message types which this test suite supports (the keys of TYPE_MAP)"""
+ return JmsMessageTypes.TYPE_MAP.keys()
+
+ @staticmethod
+ def get_test_value_map(jms_messagae_type):
+ """
+ Return the map of test values (sub-type name -> list of test value strings) to use when testing the
+ supplied JMS message type, or None if the type is not a key of TYPE_MAP.
+ """
+ if jms_messagae_type not in JmsMessageTypes.TYPE_MAP.keys():
+ return None
+ return JmsMessageTypes.TYPE_MAP[jms_messagae_type]
+
+
+class JmsMessageTypeTestCase(unittest.TestCase):
+ """
+ Abstract base class for JMS message type test cases
+ """
+
+ def run_test(self, broker_addr, jms_message_type, test_values, send_shim, receive_shim):
+ """
+ Run this test by invoking the shim send method to send the test values, followed by the shim receive method
+ to receive the values. Finally, compare the sent values with the received values.
+
+ test_values is a map of sub-type name to a list of test values; it is passed to the send shim as JSON.
+ The test fails if test_values is empty, if the send shim returns any text, or if the receive shim returns
+ a string rather than a decoded value map equal to test_values.
+ """
+ if len(test_values) > 0:
+ # One queue per (message type, sender, receiver) combination
+ queue_name = 'qpid-interop.jms_message_type_tests.%s.%s.%s' % (jms_message_type, send_shim.NAME,
+ receive_shim.NAME)
+ json_test_values_str = dumps(test_values)
+ send_error_text = send_shim.send(broker_addr, queue_name, jms_message_type, json_test_values_str)
+ # Any text returned by the send shim is treated as an error report
+ if len(send_error_text) > 0:
+ self.fail('Send shim \'%s\':\n%s' % (send_shim.NAME, send_error_text))
+ # The receive shim is given the number of values to expect for each sub-type, not the values themselves
+ num_test_values = {}
+ for index in test_values.keys():
+ num_test_values[index] = len(test_values[index])
+ json_test_num_values_str = dumps(num_test_values)
+ receive_text = receive_shim.receive(broker_addr, queue_name, jms_message_type, json_test_num_values_str)
+ # A str result is error text from the receive shim; anything else is the decoded received values
+ if isinstance(receive_text, str):
+ self.fail(receive_text)
+ else:
+ self.assertEqual(receive_text, test_values, msg='\n sent:%s\n\n received:%s' % \
+ (test_values, receive_text))
+ else:
+ self.fail('Type %s has no test values' % jms_message_type)
+
+
+def create_testcase_class(broker_addr, jms_message_type, test_values, shim_product):
+ """
+ Class factory function which creates new subclasses to JmsMessageTypeTestCase.
+
+ The new class carries broker_addr, jms_message_type and test_values as class attributes and receives one
+ test method for every (send_shim, receive_shim) pair yielded by shim_product.
+ """
+
+ def __repr__(self):
+ """Print the class name"""
+ return self.__class__.__name__
+
+ def add_test_method(cls, send_shim, receive_shim):
+ """Function which creates a new test method in class cls"""
+
+ def inner_test_method(self):
+ self.run_test(self.broker_addr, self.jms_message_type, self.test_values, send_shim, receive_shim)
+
+ # [4:-5] drops the first 4 and last 5 characters of the type name, i.e. the 'JMS_' prefix and '_TYPE'
+ # suffix of the JmsMessageTypes.TYPE_MAP keys
+ inner_test_method.__name__ = 'test_%s_%s->%s' % (jms_message_type[4:-5], send_shim.NAME, receive_shim.NAME)
+ #inner_test_method.__doc__ = 'JMS message type \'%s\' interop test: %s -> %s' % \
+ # (jms_message_type, send_shim.NAME, receive_shim.NAME)
+ setattr(cls, inner_test_method.__name__, inner_test_method)
+
+ # Class name is the title-cased type name without prefix/suffix, followed by 'TestCase'
+ class_name = jms_message_type[4:-5].title() + 'TestCase'
+ class_dict = {'__name__': class_name,
+ '__repr__': __repr__,
+ '__doc__': 'Test case for JMS message type \'%s\'' % jms_message_type,
+ 'jms_message_type': jms_message_type,
+ 'broker_addr': broker_addr,
+ 'test_values': test_values}
+ new_class = type(class_name, (JmsMessageTypeTestCase,), class_dict)
+ for send_shim, receive_shim in shim_product:
+ add_test_method(new_class, send_shim, receive_shim)
+ return new_class
+
+class Shim(object):
+ """
+ Abstract shim class, parent of all shims.
+ """
+ NAME = None
+ SEND = None
+ RECEIVE = None
+ USE_SHELL = False
+
+ def send(self, broker_addr, queue_name, jms_message_type, json_test_values_str):
+ """
+ Send the values of type jms_message_type in json_test_values_str to queue queue_name.
+ Return output (if any) from stdout.
+ """
+ arg_list = []
+ arg_list.extend(self.SEND)
+ arg_list.extend([broker_addr, queue_name, jms_message_type])
+ arg_list.append(json_test_values_str)
+
+ try:
+ #print '\n>>>', arg_list # DEBUG - useful to see command-line sent to shim
+ return check_output(arg_list, shell=self.USE_SHELL)
+ except CalledProcessError as exc:
+ return str(exc) + '\n\nOutput:\n' + exc.output
+ except Exception as exc:
+ return str(exc)
+
+
+ def receive(self, broker_addr, queue_name, jms_message_type, json_test_num_values_str):
+ """
+ Receive json_test_num_values_str messages containing type jms_message_type from queue queue_name.
+ If the first line returned from stdout is the AMQP type, then the rest is assumed to be the returned
+ test value list. Otherwise error output is assumed.
+ """
+ output = ''
+ try:
+ arg_list = []
+ arg_list.extend(self.RECEIVE)
+ arg_list.extend([broker_addr, queue_name, jms_message_type])
+ arg_list.append(json_test_num_values_str)
+ #print '\n>>>', arg_list # DEBUG - useful to see command-line sent to shim
+ output = check_output(arg_list)
+ #print '<<<', output # DEBUG- useful to see text received from shim
+ str_tvl = output.split('\n')[:-1] # remove trailing \n
+ if str_tvl[0] == jms_message_type:
+ return loads(str_tvl[1])
+ else:
+ return output # return error string
+ except CalledProcessError as exc:
+ return str(exc) + '\n\n' + exc.output
+ except Exception as exc:
+ return str(exc)
+
+
+class ProtonPythonShim(Shim):
+ """
+ Shim for qpid-proton Python client
+ """
+ NAME = 'ProtonPython'
+ # Shim scripts are located relative to QPID_INTEROP_TEST_HOME (read from the environment variable of the
+ # same name, with no default, so the variable must be set before this module is loaded)
+ SHIM_LOC = path.join(QPID_INTEROP_TEST_HOME, 'shims', 'qpid-proton-python', 'src')
+ # The scripts are invoked directly as the command
+ SEND = [path.join(SHIM_LOC, 'jms-sender-shim.py')]
+ RECEIVE = [path.join(SHIM_LOC, 'jms-receiver-shim.py')]
+
+
+class QpidJmsShim(Shim):
+ """
+ Shim for qpid-jms JMS client
+ """
+ NAME = 'QpidJms'
+
+ # Installed qpid versions
+ QPID_JMS_VER = '0.4.0-SNAPSHOT'
+ QPID_PROTON_J_VER = '0.10-SNAPSHOT'
+
+ # Classpath components
+ QPID_INTEROP_TEST_SHIM_JAR = path.join(QPID_INTEROP_TEST_HOME, 'shims', 'qpid-jms', 'target', 'qpid-jms-shim.jar')
+ MAVEN_REPO_PATH = path.join(getenv('HOME'), '.m2', 'repository')
+ JMS_API_JAR = path.join(MAVEN_REPO_PATH, 'org', 'apache', 'geronimo', 'specs', 'geronimo-jms_1.1_spec', '1.1.1',
+ 'geronimo-jms_1.1_spec-1.1.1.jar')
+ JMS_IMPL_JAR = path.join(MAVEN_REPO_PATH, 'org', 'apache', 'qpid', 'qpid-jms-client', QPID_JMS_VER,
+ 'qpid-jms-client-' + QPID_JMS_VER + '.jar')
+ JSON_API_JAR = path.join(QPID_INTEROP_TEST_HOME, 'jars', 'javax.json-api-1.0.jar')
+ JSON_IMPL_JAR = path.join(QPID_INTEROP_TEST_HOME, 'jars', 'javax.json-1.0.4.jar')
+ LOGGER_API_JAR = path.join(MAVEN_REPO_PATH, 'org', 'slf4j', 'slf4j-api', '1.5.6', 'slf4j-api-1.5.6.jar')
+ LOGGER_IMPL_JAR = path.join(QPID_INTEROP_TEST_HOME, 'jars', 'slf4j-nop-1.5.6.jar')
+ PROTON_J_JAR = path.join(MAVEN_REPO_PATH, 'org', 'apache', 'qpid', 'proton-j', QPID_PROTON_J_VER,
+ 'proton-j-' + QPID_PROTON_J_VER + '.jar')
+ NETTY_JAR = path.join(MAVEN_REPO_PATH, 'io', 'netty', 'netty-all', '4.0.17.Final', 'netty-all-4.0.17.Final.jar')
+
+ CLASSPATH = ':'.join([QPID_INTEROP_TEST_SHIM_JAR,
+ JMS_API_JAR,
+ JMS_IMPL_JAR,
+ JSON_API_JAR,
+ JSON_IMPL_JAR,
+ LOGGER_API_JAR,
+ LOGGER_IMPL_JAR,
+ PROTON_J_JAR,
+ NETTY_JAR])
+ JAVA_HOME = getenv('JAVA_HOME', '/usr/bin') # Default only works in Linux
+ JAVA_EXEC = path.join(JAVA_HOME, 'java')
+ SEND = [JAVA_EXEC, '-cp', CLASSPATH, 'org.apache.qpid.interop_test.shim.JmsSenderShim']
+ RECEIVE = [JAVA_EXEC, '-cp', CLASSPATH, 'org.apache.qpid.interop_test.shim.JmsReceiverShim']
+
+
+# SHIM_MAP contains an instance of each client language shim that is to be tested as a part of this test. For
+# every shim in this list, a test is dynamically constructed which tests it against itself as well as every
+# other shim in the list.
+#
+# As new shims are added, add them into this map to have them included in the test cases.
+#SHIM_MAP = {ProtonPythonShim.NAME: ProtonPythonShim()}
+#SHIM_MAP = {QpidJmsShim.NAME: QpidJmsShim()}
+SHIM_MAP = {ProtonPythonShim.NAME: ProtonPythonShim(), QpidJmsShim.NAME: QpidJmsShim()}
+
+
+# TODO: Complete the test options to give fine control over running tests
+class TestOptions(object):
+ """
+ Class controlling command-line arguments used to control the test.
+ """
+ def __init__(self):
+ parser = argparse.ArgumentParser(description='Qpid-interop AMQP client interoparability test suite '
+ 'for JMS message types')
+ parser.add_argument('--broker', action='store', default='localhost:5672', metavar='BROKER:PORT',
+ help='Broker against which to run test suite.')
+# test_group = parser.add_mutually_exclusive_group()
+# test_group.add_argument('--include-test', action='append', metavar='TEST-NAME',
+# help='Name of test to include')
+# test_group.add_argument('--exclude-test', action='append', metavar='TEST-NAME',
+# help='Name of test to exclude')
+# type_group = test_group.add_mutually_exclusive_group()
+# type_group.add_argument('--include-type', action='append', metavar='AMQP-TYPE',
+# help='Name of AMQP type to include. Supported types:\n%s' %
+# sorted(JmsMessageTypes.TYPE_MAP.keys()))
+ parser.add_argument('--exclude-type', action='append', metavar='JMS-MESSAGE-TYPE',
+ help='Name of JMS message type to exclude. Supported types:\n%s' %
+ sorted(JmsMessageTypes.TYPE_MAP.keys()))
+# shim_group = test_group.add_mutually_exclusive_group()
+# shim_group.add_argument('--include-shim', action='append', metavar='SHIM-NAME',
+# help='Name of shim to include. Supported shims:\n%s' % SHIM_NAMES)
+ parser.add_argument('--exclude-shim', action='append', metavar='SHIM-NAME',
+ help='Name of shim to exclude. Supported shims:\n%s' % sorted(SHIM_MAP.keys()))
+ self.args = parser.parse_args()
+
+
+#--- Main program start ---
+
+if __name__ == '__main__':
+
+ ARGS = TestOptions().args
+ #print 'ARGS:', ARGS # DEBUG
+
+ # TEST_CASE_CLASSES is a list that collects all the test classes that are constructed. One class is constructed
+ # per AMQP type used as the key in map JmsMessageTypes.TYPE_MAP.
+ TEST_CASE_CLASSES = []
+
+ # TEST_SUITE is the final suite of tests that will be run and which contains all the dynamically created
+ # type classes, each of which contains a test for the combinations of client shims
+ TEST_SUITE = unittest.TestSuite()
+
+ # Remove shims excluded from the command-line
+ if ARGS.exclude_shim is not None:
+ for shim in ARGS.exclude_shim:
+ SHIM_MAP.pop(shim)
+ # Create test classes dynamically
+ for at in sorted(JmsMessageTypes.get_type_list()):
+ if ARGS.exclude_type is None or at not in ARGS.exclude_type:
+ test_case_class = create_testcase_class(ARGS.broker,
+ at,
+ JmsMessageTypes.get_test_value_map(at),
+ product(SHIM_MAP.values(), repeat=2))
+ TEST_CASE_CLASSES.append(test_case_class)
+ TEST_SUITE.addTest(unittest.makeSuite(test_case_class))
+
+ # Finally, run all the dynamically created tests
+ unittest.TextTestRunner(verbosity=2).run(TEST_SUITE)
+
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/src/py/qpid-interop-test/shim_utils.py
----------------------------------------------------------------------
diff --git a/src/py/qpid-interop-test/shim_utils.py b/src/py/qpid-interop-test/shim_utils.py
index ba70654..14d3a96 100755
--- a/src/py/qpid-interop-test/shim_utils.py
+++ b/src/py/qpid-interop-test/shim_utils.py
@@ -180,7 +180,7 @@ class StrToStr(StrToObj):
def _process_char(self, char):
"""
This function processes a python string type, and continues
- consuming characters until another valid delimiter character
+ consuming characters until another matching delimiter character
('\'' or '"') is encountered. A delimiter character that is
preceded by an escape character ('\\') is excluded.
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/src/py/qpid-interop-test/shim_utils.pyc
----------------------------------------------------------------------
diff --git a/src/py/qpid-interop-test/shim_utils.pyc b/src/py/qpid-interop-test/shim_utils.pyc
deleted file mode 100644
index 49b13d2..0000000
Binary files a/src/py/qpid-interop-test/shim_utils.pyc and /dev/null differ
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/src/py/qpid-interop-test/types/simple_type_tests.py
----------------------------------------------------------------------
diff --git a/src/py/qpid-interop-test/types/simple_type_tests.py b/src/py/qpid-interop-test/types/simple_type_tests.py
index 48e1195..3457d13 100755
--- a/src/py/qpid-interop-test/types/simple_type_tests.py
+++ b/src/py/qpid-interop-test/types/simple_type_tests.py
@@ -27,6 +27,7 @@ import argparse
import unittest
from ast import literal_eval
+from interop_test_errors import InteropTestError
from itertools import product
from os import getenv, path
from proton import char, int32, symbol, timestamp, ulong
@@ -37,14 +38,6 @@ from uuid import UUID, uuid4
QPID_INTEROP_TEST_HOME = getenv('QPID_INTEROP_TEST_HOME') # TODO - propose a sensible default when installation details are worked out
-class SimpleTypeTestError(StandardError):
- """
- Error class for use in simpe AMQP type tests
- """
- def __init__(self, error_message):
- super(SimpleTypeTestError, self).__init__(error_message)
-
-
class AmqpPrimitiveTypes(object):
"""
Class which contains all the described AMQP primitive types and the test values to be used in testing.
@@ -88,7 +81,7 @@ class AmqpPrimitiveTypes(object):
'0x000fffffffffffff', # Largest positive denormalized number
'0x800fffffffffffff', # Largest negative denormalized number
'0x0010000000000000', # Smallest positive normalized number
- '0x8010000000000000', # Smallest positive normalized number
+ '0x8010000000000000', # Smallest negative normalized number
'0x7fefffffffffffff', # Largest positive normalized number
'0xffefffffffffffff', # Largest negative normalized number
'0x7ff0000000000000', # +Infinity
@@ -97,8 +90,8 @@ class AmqpPrimitiveTypes(object):
'0xfff8000000000000'], # -NaN
'decimal32': [0, 100, -1000.001, 3.14159, 1.234e+56],
'decimal64': [0, 100, -1000.001, 3.14159, 1.234e+56],
-# 'decimal128': [0, 100, -1000.001, 3.14159, 1.234e+56], # Hangs python shim, ok in jms shim
-# 'char': [u'a', u'Z', u'\u0001', u'\u007f'], # Hangs python shim, ok in jms shim
+ 'decimal128': [0, 100, -1000.001, 3.14159, 1.234e+56], # Hangs python shim, ok in jms shim
+ 'char': [u'a', u'Z', u'\u0001', u'\u007f'], # Hangs python shim, ok in jms shim
# timestamp must be in milliseconds since the unix epoch
'timestamp': [0, int(mktime((2000, 1, 1, 0, 0, 0, 5, 1, 0))*1000), int(time()*1000)],
'uuid': [UUID(int=0x0), UUID('00010203-0405-0607-0809-0a0b0c0d0e0f'), uuid4()],
@@ -108,13 +101,13 @@ class AmqpPrimitiveTypes(object):
'string': [u'', u'Hello, world!', u'"Hello, world!"', u"Charlie's peach",
u'The quick brown fox jumped over the lazy dog 0123456789.' * 1000],
'symbol': ['', 'myDomain.123', 'domain.0123456789.' * 1000],
-# 'list': [[],
-# [1, -2, 3.14],
-# [u'a', u'b', u'c'],
-# [ulong(12345), timestamp(int(time()*1000)), int32(-25), uuid4(), symbol('a.b.c')],
-# [[], None, [1,2,3], {1:'one', 2:'two', 3:'three', 4:True, 5:False, 6:None}, True, False, char(u'5')],
-# [[],[[],[[],[],[]],[]],[]],
-# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] * 1000]#,
+ 'list': [[],
+ [1, -2, 3.14],
+ [u'a', u'b', u'c'],
+ [ulong(12345), timestamp(int(time()*1000)), int32(-25), uuid4(), symbol('a.b.c')],
+ [[], None, [1,2,3], {1:'one', 2:'two', 3:'three', 4:True, 5:False, 6:None}, True, False, char(u'5')],
+ [[],[[],[[],[],[]],[]],[]],
+ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] * 1000],
#'map': [{}, {1:u'one', 2:u'two'}, {None:None, 1:1, '2':'2', True:False, False:True}]#, # TODO: Bug in handling maps
#'array': [[], [1,2,3], ['Hello', 'world']] # TODO: Not yet implemented
}
@@ -272,7 +265,8 @@ class Shim(object):
amqp_type == 'map':
received_test_value_list.append(StrToObj(list(stv).__iter__()).run())
else:
- raise SimpleTypeTestError('ERROR: Shim.receive(): AMQP type \'%s\' not implemented' % amqp_type)
+# raise SimpleTypeTestError('ERROR: Shim.receive(): AMQP type \'%s\' not implemented' % amqp_type)
+ raise InteropTestError('ERROR: Shim.receive(): AMQP type \'%s\' not implemented' % amqp_type)
return received_test_value_list
else:
return output # return error string
@@ -288,8 +282,8 @@ class ProtonPythonShim(Shim):
"""
NAME = 'ProtonPython'
SHIM_LOC = path.join(QPID_INTEROP_TEST_HOME, 'shims', 'qpid-proton-python', 'src')
- SEND = [path.join(SHIM_LOC, 'proton-python-send')]
- RECEIVE = [path.join(SHIM_LOC, 'proton-python-receive')]
+ SEND = [path.join(SHIM_LOC, 'amqp-send')]
+ RECEIVE = [path.join(SHIM_LOC, 'amqp-receive')]
class QpidJmsShim(Shim):
@@ -318,8 +312,8 @@ class QpidJmsShim(Shim):
CLASSPATH = ':'.join([QPID_INTEROP_TEST_SHIM_JAR, JMS_API_JAR, JMS_IMPL_JAR, LOGGER_API_JAR, LOGGER_IMPL_JAR, PROTON_J_JAR, NETTY_JAR])
JAVA_HOME = getenv('JAVA_HOME', '/usr/bin') # Default only works in Linux
JAVA_EXEC = path.join(JAVA_HOME, 'java')
- SEND = [JAVA_EXEC, '-cp', CLASSPATH, 'org.apache.qpid.interop_test.shim.ProtonJmsSender']
- RECEIVE = [JAVA_EXEC, '-cp', CLASSPATH, 'org.apache.qpid.interop_test.shim.ProtonJmsReceiver']
+ SEND = [JAVA_EXEC, '-cp', CLASSPATH, 'org.apache.qpid.interop_test.shim.AmqpSender']
+ RECEIVE = [JAVA_EXEC, '-cp', CLASSPATH, 'org.apache.qpid.interop_test.shim.AmqpReceiver']
# SHIM_MAP contains an instance of each client language shim that is to be tested as a part of this test. For
@@ -336,7 +330,7 @@ SHIM_MAP = {ProtonPythonShim.NAME: ProtonPythonShim()}
class TestOptions(object):
def __init__(self):
- parser = argparse.ArgumentParser(description='Qpid-interop AMQP client interoparability test suite')
+ parser = argparse.ArgumentParser(description='Qpid-interop AMQP client interoparability test suite for AMQP simple types')
parser.add_argument('--broker', action='store', default='localhost:5672', metavar='BROKER:PORT',
help='Broker against which to run test suite.')
# test_group = parser.add_mutually_exclusive_group()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-interop-test git commit: QPIDIT-17: Add JMS test suite
with Qpid-JMS and Proton-Python clients.
Posted by kp...@apache.org.
QPIDIT-17: Add JMS test suite with Qpid-JMS and Proton-Python clients.
Project: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/commit/ebdacb0d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/tree/ebdacb0d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/diff/ebdacb0d
Branch: refs/heads/master
Commit: ebdacb0dd9063d73b49b681d39f7450fa05b391b
Parents: 77eba67
Author: Kim van der Riet <kp...@apache.org>
Authored: Tue Aug 25 22:48:34 2015 -0400
Committer: Kim van der Riet <kp...@apache.org>
Committed: Tue Aug 25 22:48:34 2015 -0400
----------------------------------------------------------------------
jars/javax.json-1.0.4.jar | Bin 0 -> 85147 bytes
jars/javax.json-api-1.0.jar | Bin 0 -> 19754 bytes
java-build.sh | 15 +
java-clean.sh | 5 +
shims/qpid-jms/java-build.sh | 7 +-
.../qpid/interop_test/shim/AmqpReceiver.java | 271 ++++++++++++++
.../qpid/interop_test/shim/AmqpSender.java | 260 +++++++++++++
.../qpid/interop_test/shim/JmsReceiverShim.java | 349 ++++++++++++++++++
.../qpid/interop_test/shim/JmsSenderShim.java | 368 +++++++++++++++++++
.../interop_test/shim/ProtonJmsReceiver.java | 269 --------------
.../qpid/interop_test/shim/ProtonJmsSender.java | 258 -------------
shims/qpid-proton-python/src/amqp-receive | 102 +++++
shims/qpid-proton-python/src/amqp-send | 135 +++++++
.../qpid-proton-python/src/jms-receiver-shim.py | 234 ++++++++++++
shims/qpid-proton-python/src/jms-sender-shim.py | 241 ++++++++++++
.../src/proton-python-receive | 106 ------
shims/qpid-proton-python/src/proton-python-send | 136 -------
.../interop_test/obj_util/BytesToJavaObj.java | 83 +++++
.../interop_test/obj_util/JavaObjToBytes.java | 129 +++++++
src/py/qpid-interop-test/__init__.py | 1 +
src/py/qpid-interop-test/interop_test_errors.py | 29 ++
.../qpid-interop-test/jms/jms_message_tests.py | 367 ++++++++++++++++++
src/py/qpid-interop-test/shim_utils.py | 2 +-
src/py/qpid-interop-test/shim_utils.pyc | Bin 14712 -> 0 bytes
.../types/simple_type_tests.py | 42 +--
25 files changed, 2612 insertions(+), 797 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/jars/javax.json-1.0.4.jar
----------------------------------------------------------------------
diff --git a/jars/javax.json-1.0.4.jar b/jars/javax.json-1.0.4.jar
new file mode 100644
index 0000000..09967d8
Binary files /dev/null and b/jars/javax.json-1.0.4.jar differ
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/jars/javax.json-api-1.0.jar
----------------------------------------------------------------------
diff --git a/jars/javax.json-api-1.0.jar b/jars/javax.json-api-1.0.jar
new file mode 100644
index 0000000..d276c79
Binary files /dev/null and b/jars/javax.json-api-1.0.jar differ
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/java-build.sh
----------------------------------------------------------------------
diff --git a/java-build.sh b/java-build.sh
new file mode 100755
index 0000000..27de828
--- /dev/null
+++ b/java-build.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+# JARS
+#JMS_API=${HOME}/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar:${HOME}/.m2/repository/org/apache/qpid/qpid-jms-client/0.4.0-SNAPSHOT/qpid-jms-client-0.4.0-SNAPSHOT.jar
+#JSON_API=../../jars/javax.json-api-1.0.jar
+#CLASSPATH=${JMS_API}:${JSON_API}
+CLASSPATH=
+
+BASEPATH=org/apache/qpid/interop_test/obj_util
+SRCPATH=src/main/java/${BASEPATH}
+TARGETPATH=target
+
+mkdir -p ${TARGETPATH}/classes
+javac -cp ${CLASSPATH} -Xlint:unchecked -d ${TARGETPATH}/classes ${SRCPATH}/JavaObjToBytes.java ${SRCPATH}/BytesToJavaObj.java
+jar -cf ${TARGETPATH}/JavaObjUtils.jar -C ${TARGETPATH}/classes ${BASEPATH}/JavaObjToBytes.class -C ${TARGETPATH}/classes ${BASEPATH}/BytesToJavaObj.class
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/java-clean.sh
----------------------------------------------------------------------
diff --git a/java-clean.sh b/java-clean.sh
new file mode 100755
index 0000000..7fc8be1
--- /dev/null
+++ b/java-clean.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+#
+# Removes the build products created by java-build.sh: the jar files in
+# ${TARGETPATH} and everything below ${TARGETPATH}/classes.
+
+TARGETPATH=target
+
+# ${VAR:?} aborts instead of expanding to "/*.jar" and "/classes/*" should
+# TARGETPATH ever be empty; the globs stay outside the quotes so they still expand.
+rm -rf -- "${TARGETPATH:?}"/*.jar "${TARGETPATH:?}"/classes/*
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/java-build.sh
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/java-build.sh b/shims/qpid-jms/java-build.sh
index 78f5bfa..6afe72f 100755
--- a/shims/qpid-jms/java-build.sh
+++ b/shims/qpid-jms/java-build.sh
@@ -2,12 +2,13 @@
# JARS
JMS_API=${HOME}/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar:${HOME}/.m2/repository/org/apache/qpid/qpid-jms-client/0.4.0-SNAPSHOT/qpid-jms-client-0.4.0-SNAPSHOT.jar
-CLASSPATH=${JMS_API}
+JSON_API=../../jars/javax.json-api-1.0.jar
+CLASSPATH=${JMS_API}:${JSON_API}
BASEPATH=org/apache/qpid/interop_test/shim
SRCPATH=src/main/java/${BASEPATH}
TARGETPATH=target
mkdir -p ${TARGETPATH}/classes
-javac -cp ${CLASSPATH} -d ${TARGETPATH}/classes ${SRCPATH}/ProtonJmsSender.java ${SRCPATH}/ProtonJmsReceiver.java
-jar -cf ${TARGETPATH}/qpid-jms-shim.jar -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsSender.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsSender\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsSender\$MyExceptionListener.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsReceiver.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsReceiver\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/ProtonJmsReceiver\$MyExceptionListener.class
+javac -cp ${CLASSPATH} -Xlint:unchecked -d ${TARGETPATH}/classes ${SRCPATH}/AmqpSender.java ${SRCPATH}/AmqpReceiver.java ${SRCPATH}/JmsSenderShim.java ${SRCPATH}/JmsReceiverShim.java
+jar -cf ${TARGETPATH}/qpid-jms-shim.jar -C ${TARGETPATH}/classes ${BASEPATH}/AmqpSender.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpSender\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpSender\$MyExceptionListener.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpReceiver.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpReceiver\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/AmqpReceiver\$MyExceptionListener.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsSenderShim.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsSenderShim\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsSenderShim\$MyExceptionListener.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsReceiverShim.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsReceiverShim\$1.class -C ${TARGETPATH}/classes ${BASEPATH}/JmsReceiverShim\$MyExceptionListener.class
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpReceiver.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpReceiver.java
new file mode 100644
index 0000000..cf3ad81
--- /dev/null
+++ b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpReceiver.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.shim;
+
+import java.math.BigDecimal;
+import java.util.UUID;
+import java.util.Vector;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+/**
+ * Receiver shim: connects to a broker, consumes up to a given number of
+ * messages from a queue, decodes each message body according to the AMQP
+ * type named on the command line, and prints the AMQP type followed by one
+ * decoded value per line on stdout.
+ *
+ * Arguments: broker_address, queue_name, amqp_type, num_test_values.
+ * Any error is reported on stdout and the process exits with status 1.
+ */
+public class AmqpReceiver {
+    private static final String USER = "guest";
+    private static final String PASSWORD = "guest";
+    // Passed to MessageConsumer.receive(); the JMS API defines this timeout in milliseconds.
+    private static final int TIMEOUT = 1000;
+    // Must be kept in step with the case labels of the switch statement in main().
+    private static final String[] SUPPORTED_AMQP_TYPES = {"null",
+                                                          "boolean",
+                                                          "ubyte",
+                                                          "ushort",
+                                                          "uint",
+                                                          "ulong",
+                                                          "byte",
+                                                          "short",
+                                                          "int",
+                                                          "long",
+                                                          "float",
+                                                          "double",
+                                                          "decimal32",
+                                                          "decimal64",
+                                                          "decimal128",
+                                                          "char",
+                                                          "timestamp",
+                                                          "uuid",
+                                                          "binary",
+                                                          "string",
+                                                          "symbol",
+                                                          "list",
+                                                          "map",
+                                                          "array"};
+
+    /**
+     * Program entry point; see the class comment for the expected arguments.
+     *
+     * @param args broker_address, queue_name, amqp_type, num_test_values
+     * @throws Exception if num_test_values is not an integer, or if closing
+     *         the connection fails while another error is being handled
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 4) {
+            System.out.println("AmqpReceiver: Insufficient number of arguments");
+            System.out.println("AmqpReceiver: Expected arguments: broker_address, queue_name, amqp_type, num_test_values");
+            System.exit(1);
+        }
+        String brokerAddress = "amqp://" + args[0];
+        String queueName = args[1];
+        String amqpType = args[2];
+        int numTestValues = Integer.parseInt(args[3]);
+        Connection connection = null;
+
+        try {
+            ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
+
+            connection = factory.createConnection(USER, PASSWORD);
+            connection.setExceptionListener(new MyExceptionListener());
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Queue queue = session.createQueue(queueName);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            // First output line is the AMQP type, followed by one line per received value
+            Vector<String> outList = new Vector<String>();
+            outList.add(amqpType);
+            if (isSupportedAmqpType(amqpType)) {
+                Message message = null;
+                for (int i = 1; i <= numTestValues; i++) {
+                    message = messageConsumer.receive(TIMEOUT);
+                    if (message == null)
+                        break; // Timed out: report only what has been received so far
+                    switch (amqpType) {
+                    case "null":
+                        long bodyLength = ((BytesMessage)message).getBodyLength();
+                        if (bodyLength == 0L) {
+                            outList.add("None");
+                        } else {
+                            throw new Exception("AmqpReceiver: JMS BytesMessage size error: Expected 0 bytes, read " + bodyLength);
+                        }
+                        break;
+                    case "boolean":
+                        // Capitalize the first letter: "true" -> "True", "false" -> "False"
+                        String bs = String.valueOf(((BytesMessage)message).readBoolean());
+                        outList.add(Character.toUpperCase(bs.charAt(0)) + bs.substring(1));
+                        break;
+                    case "ubyte":
+                        // Java has no unsigned byte: widen and mask to obtain 0..255
+                        byte byteValue = ((BytesMessage)message).readByte();
+                        short ubyteValue = (short)(byteValue & 0xff);
+                        outList.add(String.valueOf(ubyteValue));
+                        break;
+                    case "ushort":
+                    {
+                        // Assemble the unsigned value from 2 bytes, most significant byte first
+                        byte[] byteArray = readExactly((BytesMessage)message, 2);
+                        int ushortValue = 0;
+                        for (int j=0; j<byteArray.length; j++) {
+                            ushortValue = (ushortValue << 8) + (byteArray[j] & 0xff);
+                        }
+                        outList.add(String.valueOf(ushortValue));
+                        break;
+                    }
+                    case "uint":
+                    {
+                        // Assemble the unsigned value from 4 bytes, most significant byte first
+                        byte[] byteArray = readExactly((BytesMessage)message, 4);
+                        long uintValue = 0;
+                        for (int j=0; j<byteArray.length; j++) {
+                            uintValue = (uintValue << 8) + (byteArray[j] & 0xff);
+                        }
+                        outList.add(String.valueOf(uintValue));
+                        break;
+                    }
+                    case "ulong":
+                    case "timestamp":
+                    {
+                        // TODO: shortcut in use here - this byte array should go through a Java type that can represent this as a number - such as BigInteger.
+                        byte[] byteArray = readExactly((BytesMessage)message, 8);
+                        outList.add(String.format("0x%02x%02x%02x%02x%02x%02x%02x%02x", byteArray[0], byteArray[1],
+                                byteArray[2], byteArray[3], byteArray[4], byteArray[5], byteArray[6], byteArray[7]));
+                        break;
+                    }
+                    case "byte":
+                        outList.add(String.valueOf(((BytesMessage)message).readByte()));
+                        break;
+                    case "short":
+                        outList.add(String.valueOf(((BytesMessage)message).readShort()));
+                        break;
+                    case "int":
+                        outList.add(String.valueOf(((BytesMessage)message).readInt()));
+                        break;
+                    case "long":
+                        outList.add(String.valueOf(((BytesMessage)message).readLong()));
+                        break;
+                    case "float":
+                        // Printed as the raw bit pattern: "0x" + 8 zero-padded hex digits
+                        float f = ((BytesMessage)message).readFloat();
+                        int i0 = Float.floatToRawIntBits(f);
+                        outList.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
+                        break;
+                    case "double":
+                        // Printed as the raw bit pattern: "0x" + 16 zero-padded hex digits
+                        double d = ((BytesMessage)message).readDouble();
+                        long l = Double.doubleToRawLongBits(d);
+                        outList.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
+                        break;
+                    case "decimal32":
+                        BigDecimal bd32 = (BigDecimal)((ObjectMessage)message).getObject();
+                        outList.add(bd32.toString());
+                        break;
+                    case "decimal64":
+                        BigDecimal bd64 = (BigDecimal)((ObjectMessage)message).getObject();
+                        outList.add(bd64.toString());
+                        break;
+                    case "decimal128":
+                        BigDecimal bd128 = (BigDecimal)((ObjectMessage)message).getObject();
+                        outList.add(bd128.toString());
+                        break;
+                    case "char":
+                        outList.add(String.format("%c", ((BytesMessage)message).readChar()));
+                        break;
+                    case "uuid":
+                        UUID uuid = (UUID)((ObjectMessage)message).getObject();
+                        outList.add(uuid.toString());
+                        break;
+                    case "binary":
+                        BytesMessage bm = (BytesMessage)message;
+                        int msgLen = (int)bm.getBodyLength();
+                        if (msgLen == 0) {
+                            // An empty body is a valid value; do not depend on what readBytes()
+                            // returns for a zero-length array.
+                            outList.add("");
+                        } else {
+                            // A short read raises an error rather than silently dropping the value.
+                            // NOTE(review): decodes with the platform default charset - confirm this
+                            // matches the encoding used by the sending side.
+                            outList.add(new String(readExactly(bm, msgLen)));
+                        }
+                        break;
+                    case "string":
+                        outList.add(((TextMessage)message).getText());
+                        break;
+                    case "symbol":
+                        outList.add(((BytesMessage)message).readUTF());
+                        break;
+                    case "list":
+                        // Not yet decoded: the message is consumed but no value is printed
+                        break;
+                    case "map":
+                        // Not yet decoded: the message is consumed but no value is printed
+                        break;
+                    case "array":
+                        // Not yet decoded: the message is consumed but no value is printed
+                        break;
+                    default:
+                        // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
+                        connection.close();
+                        throw new Exception("AmqpReceiver: Internal error: unsupported AMQP type \"" + amqpType + "\"");
+                    }
+                }
+            } else {
+                System.out.println("ERROR: AmqpReceiver: AMQP type \"" + amqpType + "\" is not supported");
+                connection.close();
+                System.exit(1);
+            }
+
+            connection.close();
+
+            // No exception, print results
+            for (int i=0; i<outList.size(); i++) {
+                System.out.println(outList.get(i));
+            }
+        } catch (Exception exp) {
+            if (connection != null)
+                connection.close();
+            System.out.println("Caught exception, exiting.");
+            exp.printStackTrace(System.out);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Reads exactly {@code size} bytes from the body of a bytes message.
+     *
+     * @param message message to read from
+     * @param size number of bytes that must be available
+     * @return a new array holding the {@code size} bytes read
+     * @throws Exception if readBytes() reports any count other than {@code size}
+     *         (including -1 for end of stream), or if the JMS read itself fails
+     */
+    private static byte[] readExactly(BytesMessage message, int size) throws Exception {
+        byte[] byteArray = new byte[size];
+        int numBytes = message.readBytes(byteArray);
+        if (numBytes != size) {
+            throw new Exception("AmqpReceiver: JMS BytesMessage size error: Expected " + size + " bytes, read " + numBytes);
+        }
+        return byteArray;
+    }
+
+    /**
+     * Checks a type name against SUPPORTED_AMQP_TYPES.
+     *
+     * @param amqpType AMQP type name as given on the command line
+     * @return true if the name is listed in SUPPORTED_AMQP_TYPES, false otherwise
+     */
+    protected static boolean isSupportedAmqpType(String amqpType) {
+        for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
+            if (amqpType.equals(supportedAmqpType))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Connection exception listener: prints the stack trace on stdout and
+     * terminates the process with status 1.
+     */
+    private static class MyExceptionListener implements ExceptionListener {
+        @Override
+        public void onException(JMSException exception) {
+            System.out.println("Connection ExceptionListener fired, exiting.");
+            exception.printStackTrace(System.out);
+            System.exit(1);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpSender.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpSender.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpSender.java
new file mode 100644
index 0000000..3fc5a90
--- /dev/null
+++ b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/AmqpSender.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.shim;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.MathContext;
+import java.util.Arrays;
+import java.util.UUID;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+/**
+ * Sender shim: connects to a broker and sends one JMS message per test value
+ * to a queue, encoding each value according to the AMQP type named on the
+ * command line.
+ *
+ * Arguments: broker_address, queue_name, amqp_type, test_val, test_val, ...
+ * Any error is reported on stdout and the process exits with status 1.
+ */
+public class AmqpSender {
+    // NOTE(review): USER and PASSWORD are not used below - the connection is created
+    // without credentials, whereas AmqpReceiver passes them. Confirm which is intended.
+    private static final String USER = "guest";
+    private static final String PASSWORD = "guest";
+    // Must be kept in step with the case labels of the switch statement in main().
+    private static final String[] SUPPORTED_AMQP_TYPES = {"null",
+                                                          "boolean",
+                                                          "ubyte",
+                                                          "ushort",
+                                                          "uint",
+                                                          "ulong",
+                                                          "byte",
+                                                          "short",
+                                                          "int",
+                                                          "long",
+                                                          "float",
+                                                          "double",
+                                                          "decimal32",
+                                                          "decimal64",
+                                                          "decimal128",
+                                                          "char",
+                                                          "timestamp",
+                                                          "uuid",
+                                                          "binary",
+                                                          "string",
+                                                          "symbol",
+                                                          "list",
+                                                          "map",
+                                                          "array"};
+
+    /**
+     * Program entry point; see the class comment for the expected arguments.
+     *
+     * @param args broker_address, queue_name, amqp_type, followed by one or more test values
+     * @throws Exception if closing the connection fails while another error is being handled
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 4) {
+            System.out.println("AmqpSender: Insufficient number of arguments");
+            System.out.println("AmqpSender: Expected arguments: broker_address, queue_name, amqp_type, test_val, test_val, ...");
+            System.exit(1);
+        }
+        String brokerAddress = "amqp://" + args[0];
+        String queueName = args[1];
+        String amqpType = args[2];
+        String[] testValueList = Arrays.copyOfRange(args, 3, args.length); // Use remaining args as test values
+        // Declared outside the try block so that the catch block can close it
+        Connection connection = null;
+
+        try {
+            ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
+
+            connection = factory.createConnection();
+            connection.setExceptionListener(new MyExceptionListener());
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Queue queue = session.createQueue(queueName);
+
+            MessageProducer messageProducer = session.createProducer(queue);
+
+            if (isSupportedAmqpType(amqpType)) {
+                for (String testValueStr : testValueList) {
+                    // Fresh for every value so a case that builds no message can never resend a previous one
+                    Message message = null;
+                    switch (amqpType) {
+                    case "null":
+                        // An empty bytes message represents the null value
+                        message = session.createBytesMessage();
+                        break;
+                    case "boolean":
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeBoolean(Boolean.parseBoolean(testValueStr));
+                        break;
+                    case "ubyte":
+                    {
+                        // Parse as short so that 128..255 are accepted, then keep the low 8 bits
+                        byte testValue = (byte)Short.parseShort(testValueStr);
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeByte(testValue);
+                        break;
+                    }
+                    case "ushort":
+                    {
+                        // Written as 2 bytes, most significant byte first
+                        int testValue = Integer.parseInt(testValueStr);
+                        byte[] byteArray = new byte[2];
+                        byteArray[0] = (byte)(testValue >> 8);
+                        byteArray[1] = (byte)(testValue);
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeBytes(byteArray);
+                        break;
+                    }
+                    case "uint":
+                    {
+                        // Written as 4 bytes, most significant byte first
+                        long testValue = Long.parseLong(testValueStr);
+                        byte[] byteArray = new byte[4];
+                        byteArray[0] = (byte)(testValue >> 24);
+                        byteArray[1] = (byte)(testValue >> 16);
+                        byteArray[2] = (byte)(testValue >> 8);
+                        byteArray[3] = (byte)(testValue);
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeBytes(byteArray);
+                        break;
+                    }
+                    case "ulong":
+                    {
+                        // TODO: Tidy this ugliness up - perhaps use of vector<byte>?
+                        // Right-align the low-order (at most 8) bytes of the value in an 8-byte array
+                        BigInteger testValue = new BigInteger(testValueStr);
+                        byte[] bigIntArray = testValue.toByteArray(); // may be 1 to 9 bytes depending on number
+                        byte[] byteArray = {0, 0, 0, 0, 0, 0, 0, 0};
+                        int effectiveBigIntArrayLen = bigIntArray.length > 8 ? 8 : bigIntArray.length; // Cap length at 8
+                        int bigIntArrayOffs = bigIntArray.length > 8 ? bigIntArray.length - 8 : 0; // Offset when length > 8
+                        for (int k=0; k<bigIntArray.length && k < 8; k++)
+                            byteArray[8 - effectiveBigIntArrayLen + k] = bigIntArray[bigIntArrayOffs + k];
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeBytes(byteArray);
+                        break;
+                    }
+                    case "byte":
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeByte(Byte.parseByte(testValueStr));
+                        break;
+                    case "short":
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeShort(Short.parseShort(testValueStr));
+                        break;
+                    case "int":
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeInt(Integer.parseInt(testValueStr));
+                        break;
+                    case "long":
+                    case "timestamp":
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeLong(Long.parseLong(testValueStr));
+                        break;
+                    case "float":
+                        // Value is a two-character prefix followed by hex digits of the bit pattern.
+                        // Parsed as long because 8 hex digits can exceed the positive int range.
+                        Long i = Long.parseLong(testValueStr.substring(2), 16);
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeFloat(Float.intBitsToFloat(i.intValue()));
+                        break;
+                    case "double":
+                        // The first hex digit is parsed separately and shifted into the top 4 bits,
+                        // because 16 hex digits can exceed the range accepted by Long.parseLong().
+                        Long l1 = Long.parseLong(testValueStr.substring(2, 3), 16) << 60;
+                        Long l2 = Long.parseLong(testValueStr.substring(3), 16);
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeDouble(Double.longBitsToDouble(l1 | l2));
+                        break;
+                    case "decimal32":
+                        BigDecimal bd32 = new BigDecimal(testValueStr, MathContext.DECIMAL32);
+                        message = session.createObjectMessage();
+                        ((ObjectMessage)message).setObject(bd32);
+                        break;
+                    case "decimal64":
+                        BigDecimal bd64 = new BigDecimal(testValueStr, MathContext.DECIMAL64);
+                        message = session.createObjectMessage();
+                        ((ObjectMessage)message).setObject(bd64);
+                        break;
+                    case "decimal128":
+                        BigDecimal bd128 = new BigDecimal(testValueStr, MathContext.DECIMAL128);
+                        message = session.createObjectMessage();
+                        ((ObjectMessage)message).setObject(bd128);
+                        break;
+                    case "char":
+                        char c = 0; // Sent unchanged when the value has neither of the two lengths handled below
+                        if (testValueStr.length() == 1) // Single char
+                            c = testValueStr.charAt(0);
+                        else if (testValueStr.length() == 6) // unicode format
+                            // Parse only the last four characters as hex digits. Parsing the whole
+                            // string fails with NumberFormatException on any non-hex prefix.
+                            // NOTE(review): presumably the prefix is "\\u" or "0x" - confirm against the caller.
+                            c = (char)Integer.parseInt(testValueStr.substring(2), 16);
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeChar(c);
+                        break;
+                    case "uuid":
+                        UUID uuid = UUID.fromString(testValueStr);
+                        message = session.createObjectMessage();
+                        ((ObjectMessage)message).setObject(uuid);
+                        break;
+                    case "binary":
+                        // NOTE(review): encodes with the platform default charset - confirm this
+                        // matches the decoding used by the receiving side.
+                        message = session.createBytesMessage();
+                        byte[] byteArray = testValueStr.getBytes();
+                        ((BytesMessage)message).writeBytes(byteArray, 0, byteArray.length);
+                        break;
+                    case "string":
+                        message = session.createTextMessage(testValueStr);
+                        break;
+                    case "symbol":
+                        message = session.createBytesMessage();
+                        ((BytesMessage)message).writeUTF(testValueStr);
+                        break;
+                    case "list":
+                    case "map":
+                    case "array":
+                        // No message is built for these types yet. Fail with a clear message
+                        // instead of handing a null message to send().
+                        connection.close();
+                        throw new Exception("AmqpSender: AMQP type \"" + amqpType + "\" is not yet implemented");
+                    default:
+                        // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
+                        connection.close();
+                        throw new Exception("AmqpSender: Internal error: unsupported AMQP type \"" + amqpType + "\"");
+                    }
+                    messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+                }
+            } else {
+                System.out.println("ERROR: AmqpSender: AMQP type \"" + amqpType + "\" is not supported");
+                connection.close();
+                System.exit(1);
+            }
+
+            connection.close();
+        } catch (Exception exp) {
+            // Release the connection on the error path too, as AmqpReceiver does
+            if (connection != null)
+                connection.close();
+            System.out.println("Caught exception, exiting.");
+            exp.printStackTrace(System.out);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Checks a type name against SUPPORTED_AMQP_TYPES.
+     *
+     * @param amqpType AMQP type name as given on the command line
+     * @return true if the name is listed in SUPPORTED_AMQP_TYPES, false otherwise
+     */
+    protected static boolean isSupportedAmqpType(String amqpType) {
+        for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
+            if (amqpType.equals(supportedAmqpType))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Connection exception listener: prints the stack trace on stdout and
+     * terminates the process with status 1.
+     */
+    private static class MyExceptionListener implements ExceptionListener {
+        @Override
+        public void onException(JMSException exception) {
+            System.out.println("Connection ExceptionListener fired, exiting.");
+            exception.printStackTrace(System.out);
+            System.exit(1);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsReceiverShim.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsReceiverShim.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsReceiverShim.java
new file mode 100644
index 0000000..f567638
--- /dev/null
+++ b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsReceiverShim.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.shim;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.json.Json;
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonReader;
+import javax.json.JsonWriter;
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+public class JmsReceiverShim {
+ private static final String USER = "guest";
+ private static final String PASSWORD = "guest";
+ private static final int TIMEOUT = 1000;
+ private static final String[] SUPPORTED_JMS_MESSAGE_TYPES = {"JMS_BYTESMESSAGE_TYPE",
+ "JMS_MAPMESSAGE_TYPE",
+ "JMS_OBJECTMESSAGE_TYPE",
+ "JMS_STREAMMESSAGE_TYPE",
+ "JMS_TEXTMESSAGE_TYPE"};
+
+ // Receiver shim entry point: consumes test messages from a queue and prints
+ // the decoded values to stdout as JSON.
+ //
+ // Connects to "amqp://" + args[0] as USER/PASSWORD and consumes from queue
+ // args[1]. args[3] is a JSON object mapping each value sub-type (e.g. "int")
+ // to the number of messages to receive for it; keys are processed in sorted
+ // order. Each message is decoded according to the JMS message type in args[2]
+ // and the current key, and the result is appended as a string to that key's
+ // JSON array.
+ //
+ // On success prints the JMS message type on one line followed by the JSON
+ // object of received values. On any exception the connection is closed, the
+ // stack trace is printed to stdout and the JVM exits with status 1.
+ //
+ // args[0]: Broker URL
+ // args[1]: Queue name
+ // args[2]: JMS message type
+ // args[3]: JSON Test number map
+ public static void main(String[] args) throws Exception {
+ if (args.length < 4) {
+ System.out.println("JmsReceiverShim: Insufficient number of arguments");
+ // NOTE(review): this usage text says "amqp_type, num_test_values", but the
+ // code reads args[2] as a JMS message type and args[3] as a JSON map.
+ System.out.println("JmsReceiverShim: Expected arguments: broker_address, queue_name, amqp_type, num_test_values");
+ System.exit(1);
+ }
+ String brokerAddress = "amqp://" + args[0];
+ String queueName = args[1];
+ String jmsMessageType = args[2];
+ if (!isSupportedJmsMessageType(jmsMessageType)) {
+ System.out.println("ERROR: JmsReceiverShim: unknown or unsupported JMS message type \"" + jmsMessageType + "\"");
+ System.exit(1);
+ }
+
+ // Parse the {sub-type: message count} map supplied on the command line.
+ JsonReader jsonReader = Json.createReader(new StringReader(args[3]));
+ JsonObject numTestValuesMap = jsonReader.readObject();
+ jsonReader.close();
+
+ // Declared outside the try block so the catch handler can close it.
+ Connection connection = null;
+
+ try {
+ ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
+
+ connection = factory.createConnection(USER, PASSWORD);
+ connection.setExceptionListener(new MyExceptionListener());
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName);
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ // Keys are handled in sorted order (the sender shim sorts its keys too).
+ List<String> keyList = new ArrayList<String>(numTestValuesMap.keySet());
+ Collections.sort(keyList);
+
+ Message message = null;
+ JsonObjectBuilder job = Json.createObjectBuilder();
+ for (String key: keyList) {
+ JsonArrayBuilder jab = Json.createArrayBuilder();
+ for (int i=0; i<numTestValuesMap.getJsonNumber(key).intValue(); ++i) {
+ message = messageConsumer.receive(TIMEOUT);
+ // No message within TIMEOUT: stop receiving for this key, so its array
+ // ends up with fewer entries than requested.
+ if (message == null) break;
+ switch (jmsMessageType) {
+ case "JMS_BYTESMESSAGE_TYPE":
+ switch (key) {
+ case "boolean":
+ jab.add(((BytesMessage)message).readBoolean()?"True":"False");
+ break;
+ case "byte":
+ jab.add(formatByte(((BytesMessage)message).readByte()));
+ break;
+ case "bytes":
+ {
+ byte[] bytesBuff = new byte[65536];
+ int numBytesRead = ((BytesMessage)message).readBytes(bytesBuff);
+ if (numBytesRead >= 0) {
+ // NOTE(review): bytes are decoded using the platform default charset.
+ jab.add(new String(Arrays.copyOfRange(bytesBuff, 0, numBytesRead)));
+ } else {
+ // NOTE: For this case, an empty byte array has nothing to return
+ jab.add(new String());
+ }
+ }
+ break;
+ case "char":
+ jab.add(formatChar(((BytesMessage)message).readChar()));
+ break;
+ case "double":
+ // Emit the raw bit pattern as "0x" plus 16 zero-padded hex digits.
+ long l = Double.doubleToRawLongBits(((BytesMessage)message).readDouble());
+ jab.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
+ break;
+ case "float":
+ // Emit the raw bit pattern as "0x" plus 8 zero-padded hex digits.
+ int i0 = Float.floatToRawIntBits(((BytesMessage)message).readFloat());
+ jab.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
+ break;
+ case "int":
+ jab.add(formatInt(((BytesMessage)message).readInt()));
+ break;
+ case "long":
+ jab.add(formatLong(((BytesMessage)message).readLong()));
+ break;
+ case "object":
+ {
+ byte[] bytesBuff = new byte[65536];
+ int numBytesRead = ((BytesMessage)message).readBytes(bytesBuff);
+ if (numBytesRead >= 0) {
+ // The body is treated as a Java-serialized object and reported as
+ // "<class name>:<toString()>".
+ ByteArrayInputStream bais = new ByteArrayInputStream(Arrays.copyOfRange(bytesBuff, 0, numBytesRead));
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ Object obj = ois.readObject();
+ jab.add(obj.getClass().getName() + ":" + obj.toString());
+ } else {
+ jab.add("<object error>");
+ }
+ }
+ break;
+ case "short":
+ jab.add(formatShort(((BytesMessage)message).readShort()));
+ break;
+ case "string":
+ jab.add(((BytesMessage)message).readUTF());
+ break;
+ default:
+ throw new Exception("JmsReceiverShim: Unknown subtype for " + jmsMessageType + ": \"" + key + "\"");
+ }
+ break;
+ case "JMS_STREAMMESSAGE_TYPE":
+ switch (key) {
+ case "boolean":
+ jab.add(((StreamMessage)message).readBoolean()?"True":"False");
+ break;
+ case "byte":
+ jab.add(formatByte(((StreamMessage)message).readByte()));
+ break;
+ case "bytes":
+ byte[] bytesBuff = new byte[65536];
+ int numBytesRead = ((StreamMessage)message).readBytes(bytesBuff);
+ if (numBytesRead >= 0) {
+ // NOTE(review): bytes are decoded using the platform default charset.
+ jab.add(new String(Arrays.copyOfRange(bytesBuff, 0, numBytesRead)));
+ } else {
+ System.out.println("StreamMessage.readBytes() returned " + numBytesRead);
+ jab.add("<bytes error>");
+ }
+ break;
+ case "char":
+ jab.add(formatChar(((StreamMessage)message).readChar()));
+ break;
+ case "double":
+ long l = Double.doubleToRawLongBits(((StreamMessage)message).readDouble());
+ jab.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
+ break;
+ case "float":
+ int i0 = Float.floatToRawIntBits(((StreamMessage)message).readFloat());
+ jab.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
+ break;
+ case "int":
+ jab.add(formatInt(((StreamMessage)message).readInt()));
+ break;
+ case "long":
+ jab.add(formatLong(((StreamMessage)message).readLong()));
+ break;
+ case "object":
+ // NOTE(review): obj is dereferenced without a null check.
+ Object obj = ((StreamMessage)message).readObject();
+ jab.add(obj.getClass().getName() + ":" + obj.toString());
+ break;
+ case "short":
+ jab.add(formatShort(((StreamMessage)message).readShort()));
+ break;
+ case "string":
+ jab.add(((StreamMessage)message).readString());
+ break;
+ default:
+ throw new Exception("JmsReceiverShim: Unknown subtype for " + jmsMessageType + ": \"" + key + "\"");
+ }
+ break;
+ case "JMS_MAPMESSAGE_TYPE":
+ // Map entry name is the key followed by the zero-padded (3 digit) index
+ // of this message within the key's sequence.
+ String name = String.format("%s%03d", key, i);
+ switch (key) {
+ case "boolean":
+ jab.add(((MapMessage)message).getBoolean(name)?"True":"False");
+ break;
+ case "byte":
+ jab.add(formatByte(((MapMessage)message).getByte(name)));
+ break;
+ case "bytes":
+ // NOTE(review): bytes are decoded using the platform default charset.
+ jab.add(new String(((MapMessage)message).getBytes(name)));
+ break;
+ case "char":
+ jab.add(formatChar(((MapMessage)message).getChar(name)));
+ break;
+ case "double":
+ long l = Double.doubleToRawLongBits(((MapMessage)message).getDouble(name));
+ jab.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
+ break;
+ case "float":
+ int i0 = Float.floatToRawIntBits(((MapMessage)message).getFloat(name));
+ jab.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
+ break;
+ case "int":
+ jab.add(formatInt(((MapMessage)message).getInt(name)));
+ break;
+ case "long":
+ jab.add(formatLong(((MapMessage)message).getLong(name)));
+ break;
+ case "object":
+ // NOTE(review): obj is dereferenced without a null check.
+ Object obj = ((MapMessage)message).getObject(name);
+ jab.add(obj.getClass().getName() + ":" + obj.toString());
+ break;
+ case "short":
+ jab.add(formatShort(((MapMessage)message).getShort(name)));
+ break;
+ case "string":
+ jab.add(((MapMessage)message).getString(name));
+ break;
+ default:
+ throw new Exception("JmsReceiverShim: Unknown subtype for " + jmsMessageType + ": \"" + key + "\"");
+ }
+ break;
+ case "JMS_OBJECTMESSAGE_TYPE":
+ jab.add(((ObjectMessage)message).getObject().toString());
+ break;
+ case "JMS_TEXTMESSAGE_TYPE":
+ jab.add(((TextMessage)message).getText());
+ break;
+ default:
+ connection.close();
+ throw new Exception("JmsReceiverShim: Internal error: unknown or unsupported JMS message type \"" + jmsMessageType + "\"");
+ }
+ }
+ job.add(key, jab);
+ }
+ connection.close();
+
+ // Output: the message type on its own line, then the JSON object of results.
+ System.out.println(jmsMessageType);
+ StringWriter out = new StringWriter();
+ JsonWriter jsonWriter = Json.createWriter(out);
+ jsonWriter.writeObject(job.build());
+ jsonWriter.close();
+ System.out.println(out.toString());
+ } catch (Exception exp) {
+ // NOTE(review): close() runs before the stack trace is printed; if close()
+ // itself throws, that exception propagates out of main instead.
+ if (connection != null)
+ connection.close();
+ System.out.println("Caught exception, exiting.");
+ exp.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+
+ /**
+  * Formats a byte as a signed hexadecimal literal: an optional leading "-",
+  * then "0x", then the lower-case hex digits of the magnitude.
+  *
+  * @param b value to format
+  * @return e.g. "0x7f" for 127 and "-0x80" for -128
+  */
+ protected static String formatByte(byte b) {
+     if (b >= 0) {
+         return String.format("0x%x", b);
+     }
+     // The negation is cast back to byte so it is formatted with byte width.
+     return String.format("-0x%x", (byte)-b);
+ }
+
+ /**
+  * Converts a char to a one-character string.
+  *
+  * Letters and digits go through String.format("%c"); every other char is
+  * wrapped directly in a String. Both paths currently produce the same
+  * single-character result.
+  *
+  * @param c character to convert
+  * @return a string containing only c
+  */
+ protected static String formatChar(char c) {
+     return Character.isLetterOrDigit(c)
+             ? String.format("%c", c)
+             : new String(new char[] {c});
+ }
+
+ /**
+  * Formats an int as a signed hexadecimal literal: an optional leading "-",
+  * then "0x", then the lower-case hex digits of the magnitude.
+  *
+  * @param i value to format
+  * @return e.g. "0xff" for 255 and "-0xff" for -255
+  */
+ protected static String formatInt(int i) {
+     String sign = "";
+     int magnitude = i;
+     if (i < 0) {
+         sign = "-";
+         magnitude = -i;
+     }
+     return sign + "0x" + Integer.toHexString(magnitude);
+ }
+
+ protected static String formatLong(long l) {
+ boolean neg = false;
+ if (l < 0) {
+ neg = true;
+ l = -l;
+ }
+ return String.format("%s0x%x", neg?"-":"", l);
+ }
+
+ /**
+  * Formats a short-range value (passed as int) as a signed hexadecimal
+  * literal: an optional leading "-", then "0x", then the lower-case hex
+  * digits of the magnitude.
+  *
+  * @param s value to format
+  * @return e.g. "0x7fff" for 32767 and "-0x8000" for -32768
+  */
+ protected static String formatShort(int s) {
+     return (s < 0)
+             ? String.format("-0x%x", -s)
+             : String.format("0x%x", s);
+ }
+
+ /**
+  * Reports whether a JMS message type name is listed in
+  * SUPPORTED_JMS_MESSAGE_TYPES.
+  *
+  * @param jmsMessageType message type name, e.g. "JMS_TEXTMESSAGE_TYPE"
+  * @return true if an equal entry is found, false otherwise
+  */
+ protected static boolean isSupportedJmsMessageType(String jmsMessageType) {
+     for (int idx = 0; idx < SUPPORTED_JMS_MESSAGE_TYPES.length; idx++) {
+         if (jmsMessageType.equals(SUPPORTED_JMS_MESSAGE_TYPES[idx])) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+ * Exception listener registered on the JMS connection. When fired it
+ * prints a notice and the exception's stack trace to stdout, then
+ * terminates the JVM with exit status 1.
+ */
+ private static class MyExceptionListener implements ExceptionListener {
+ @Override
+ public void onException(JMSException exception) {
+ System.out.println("Connection ExceptionListener fired, exiting.");
+ exception.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsSenderShim.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsSenderShim.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsSenderShim.java
new file mode 100644
index 0000000..e22be0a
--- /dev/null
+++ b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/JmsSenderShim.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.interop_test.shim;
+
+import java.io.Serializable;
+import java.io.StringReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+public class JmsSenderShim {
+ private static final String USER = "guest";
+ private static final String PASSWORD = "guest";
+ private static final String[] SUPPORTED_JMS_MESSAGE_TYPES = {"JMS_BYTESMESSAGE_TYPE",
+ "JMS_MAPMESSAGE_TYPE",
+ "JMS_OBJECTMESSAGE_TYPE",
+ "JMS_STREAMMESSAGE_TYPE",
+ "JMS_TEXTMESSAGE_TYPE"};
+
+ // args[0]: Broker URL
+ // args[1]: Queue name
+ // args[2]: JMS message type
+ // args[3]: JSON Test value map
+ public static void main(String[] args) throws Exception {
+ if (args.length < 4) {
+ System.out.println("JmsSenderShim: Insufficient number of arguments");
+ System.out.println("JmsSenderShim: Expected arguments: broker_address, queue_name, amqp_type, test_val, test_val, ...");
+ System.exit(1);
+ }
+ String brokerAddress = "amqp://" + args[0];
+ String queueName = args[1];
+ String jmsMessageType = args[2];
+ if (!isSupportedJmsMessageType(jmsMessageType)) {
+ System.out.println("ERROR: JmsReceiver: unknown or unsupported JMS message type \"" + jmsMessageType + "\"");
+ System.exit(1);
+ }
+
+ JsonReader jsonReader = Json.createReader(new StringReader(args[3]));
+ JsonObject testValuesMap = jsonReader.readObject();
+ jsonReader.close();
+
+ try {
+ ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
+
+ Connection connection = factory.createConnection();
+ connection.setExceptionListener(new MyExceptionListener());
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName);
+
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ Message message = null;
+ List<String> keyList = new ArrayList<String>(testValuesMap.keySet());
+ Collections.sort(keyList);
+ for (String key: keyList) {
+ JsonArray testValues = testValuesMap.getJsonArray(key);
+ for (int i=0; i<testValues.size(); ++i) {
+ String testValue = testValues.getJsonString(i).getString();
+ switch (jmsMessageType) {
+ case "JMS_BYTESMESSAGE_TYPE":
+ message = createBytesMessage(session, key, testValue);
+ break;
+ case "JMS_MAPMESSAGE_TYPE":
+ message = createMapMessage(session, key, testValue, i);
+ break;
+ case "JMS_OBJECTMESSAGE_TYPE":
+ message = createObjectMessage(session, key, testValue);
+ break;
+ case "JMS_STREAMMESSAGE_TYPE":
+ message = createStreamMessage(session, key, testValue);
+ break;
+ case "JMS_TEXTMESSAGE_TYPE":
+ message = createTextMessage(session, testValue);
+ break;
+ default:
+ throw new Exception("Internal exception: Unexpected JMS message type \"" + jmsMessageType + "\"");
+ }
+ messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ connection.close();
+ } catch (Exception exp) {
+ System.out.println("Caught exception, exiting.");
+ exp.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+
+ /**
+  * Builds a BytesMessage whose body is a single value of the given sub-type.
+  *
+  * @param session       session used to create the message
+  * @param testValueType value sub-type: "boolean", "byte", "bytes", "char",
+  *                      "double", "float", "int", "long", "object", "short"
+  *                      or "string"
+  * @param testValue     string encoding of the value; "double" and "float"
+  *                      are "0x" followed by the hex digits of the raw bit
+  *                      pattern, "char" is either a single character or the
+  *                      6-character form "\xNNNN", "object" is
+  *                      "classname:ctorstrvalue"
+  * @return the populated message
+  * @throws Exception if the sub-type is unknown or the value is malformed
+  * @throws JMSException if the JMS provider rejects an operation
+  */
+ protected static BytesMessage createBytesMessage(Session session, String testValueType, String testValue) throws Exception, JMSException {
+     BytesMessage message = session.createBytesMessage();
+     switch (testValueType) {
+     case "boolean":
+         message.writeBoolean(Boolean.parseBoolean(testValue));
+         break;
+     case "byte":
+         message.writeByte(Byte.decode(testValue));
+         break;
+     case "bytes":
+         // NOTE(review): encoded with the platform default charset.
+         message.writeBytes(testValue.getBytes());
+         break;
+     case "char":
+         if (testValue.length() == 1) { // Char format: "X"
+             message.writeChar(testValue.charAt(0));
+         } else if (testValue.length() == 6) { // Char format: "\xNNNN", as in createMapMessage/createStreamMessage
+             message.writeChar((char)Integer.parseInt(testValue.substring(2), 16));
+         } else {
+             throw new Exception("JmsSenderShim.createBytesMessage() Malformed char string: \"" + testValue + "\" of length " + testValue.length());
+         }
+         break;
+     case "double":
+         // The first hex digit is parsed separately and shifted into the top
+         // nibble so that bit patterns with the sign bit set do not overflow
+         // Long.parseLong. Assumes 16 hex digits after "0x".
+         Long l1 = Long.parseLong(testValue.substring(2, 3), 16) << 60;
+         Long l2 = Long.parseLong(testValue.substring(3), 16);
+         message.writeDouble(Double.longBitsToDouble(l1 | l2));
+         break;
+     case "float":
+         // Parsed as a long so that 8-digit patterns with the sign bit set fit.
+         Long i = Long.parseLong(testValue.substring(2), 16);
+         message.writeFloat(Float.intBitsToFloat(i.intValue()));
+         break;
+     case "int":
+         message.writeInt(Integer.decode(testValue));
+         break;
+     case "long":
+         message.writeLong(Long.decode(testValue));
+         break;
+     case "object":
+         Object obj = (Object)createObject(testValue);
+         message.writeObject(obj);
+         break;
+     case "short":
+         message.writeShort(Short.decode(testValue));
+         break;
+     case "string":
+         message.writeUTF(testValue);
+         break;
+     default:
+         throw new Exception("Internal exception: Unexpected JMS message sub-type \"" + testValueType + "\"");
+     }
+     return message;
+ }
+
+ protected static MapMessage createMapMessage(Session session, String testValueType, String testValue, int testValueNum) throws Exception, JMSException {
+ MapMessage message = session.createMapMessage();
+ String name = String.format("%s%03d", testValueType, testValueNum);
+ switch (testValueType) {
+ case "boolean":
+ message.setBoolean(name, Boolean.parseBoolean(testValue));
+ break;
+ case "byte":
+ message.setByte(name, Byte.decode(testValue));
+ break;
+ case "bytes":
+ message.setBytes(name, testValue.getBytes());
+ break;
+ case "char":
+ if (testValue.length() == 1) { // Char format: "X"
+ message.setChar(name, testValue.charAt(0));
+ } else if (testValue.length() == 6) { // Char format: "\xNNNN"
+ message.setChar(name, (char)Integer.parseInt(testValue.substring(2), 16));
+ } else {
+ throw new Exception("JmsSenderShim.createMapMessage() Malformed char string: \"" + testValue + "\"");
+ }
+ break;
+ case "double":
+ Long l1 = Long.parseLong(testValue.substring(2, 3), 16) << 60;
+ Long l2 = Long.parseLong(testValue.substring(3), 16);
+ message.setDouble(name, Double.longBitsToDouble(l1 | l2));
+ break;
+ case "float":
+ Long i = Long.parseLong(testValue.substring(2), 16);
+ message.setFloat(name, Float.intBitsToFloat(i.intValue()));
+ break;
+ case "int":
+ message.setInt(name, Integer.decode(testValue));
+ break;
+ case "long":
+ message.setLong(name, Long.decode(testValue));
+ break;
+ case "object":
+ Object obj = (Object)createObject(testValue);
+ message.setObject(name, obj);
+ break;
+ case "short":
+ message.setShort(name, Short.decode(testValue));
+ break;
+ case "string":
+ message.setString(name, testValue);
+ break;
+ default:
+ throw new Exception("Internal exception: Unexpected JMS message sub-type \"" + testValueType + "\"");
+ }
+ return message;
+ }
+
+ /**
+  * Builds an ObjectMessage carrying an instance of the named class that is
+  * constructed from the test value string via createJavaObject().
+  *
+  * @param session   session used to create the message
+  * @param className fully-qualified name of the class to instantiate
+  * @param testValue string passed to the object's constructor
+  * @return the populated message (never null)
+  * @throws Exception if the object could not be created
+  * @throws JMSException if the JMS provider rejects an operation
+  */
+ protected static ObjectMessage createObjectMessage(Session session, String className, String testValue) throws Exception, JMSException {
+     Serializable obj = createJavaObject(className, testValue);
+     if (obj == null) {
+         // Fail here with a clear cause rather than handing a null message
+         // back to the caller, which would pass it to MessageProducer.send().
+         throw new Exception("JmsSenderShim.createObjectMessage() Unable to create object of class \"" + className + "\" from value \"" + testValue + "\"");
+     }
+     ObjectMessage message = session.createObjectMessage();
+     message.setObject(obj);
+     return message;
+ }
+
+ protected static StreamMessage createStreamMessage(Session session, String testValueType, String testValue) throws Exception, JMSException {
+ StreamMessage message = session.createStreamMessage();
+ switch (testValueType) {
+ case "boolean":
+ message.writeBoolean(Boolean.parseBoolean(testValue));
+ break;
+ case "byte":
+ message.writeByte(Byte.decode(testValue));
+ break;
+ case "bytes":
+ message.writeBytes(testValue.getBytes());
+ break;
+ case "char":
+ if (testValue.length() == 1) { // Char format: "X"
+ message.writeChar(testValue.charAt(0));
+ } else if (testValue.length() == 6) { // Char format: "\xNNNN"
+ message.writeChar((char)Integer.parseInt(testValue.substring(2), 16));
+ } else {
+ throw new Exception("JmsSenderShim.createStreamMessage() Malformed char string: \"" + testValue + "\"");
+ }
+ break;
+ case "double":
+ Long l1 = Long.parseLong(testValue.substring(2, 3), 16) << 60;
+ Long l2 = Long.parseLong(testValue.substring(3), 16);
+ message.writeDouble(Double.longBitsToDouble(l1 | l2));
+ break;
+ case "float":
+ Long i = Long.parseLong(testValue.substring(2), 16);
+ message.writeFloat(Float.intBitsToFloat(i.intValue()));
+ break;
+ case "int":
+ message.writeInt(Integer.decode(testValue));
+ break;
+ case "long":
+ message.writeLong(Long.decode(testValue));
+ break;
+ case "object":
+ Object obj = (Object)createObject(testValue);
+ message.writeObject(obj);
+ break;
+ case "short":
+ message.writeShort(Short.decode(testValue));
+ break;
+ case "string":
+ message.writeString(testValue);
+ break;
+ default:
+ throw new Exception("Internal exception: Unexpected JMS message sub-type \"" + testValueType + "\"");
+ }
+ return message;
+ }
+
+ /**
+  * Reflectively instantiates the named class from a string value.
+  *
+  * java.lang.Character is built through its (char) constructor from either
+  * a single character or an escaped form '\xNN' / '\xNNNN'; every other
+  * class is built through its (String) constructor.
+  *
+  * @param className fully-qualified class name
+  * @param testValue string encoding of the value
+  * @return the new object, or null if the class, constructor or
+  *         instantiation failed reflectively (the stack trace is printed
+  *         to stdout in that case)
+  * @throws Exception if a Character value is neither 1, 4 nor 6 characters
+  *         long
+  */
+ protected static Serializable createJavaObject(String className, String testValue) throws Exception {
+     Serializable obj = null;
+     try {
+         Class<?> c = Class.forName(className);
+         if (className.equals("java.lang.Character")) {
+             Constructor<?> ctor = c.getConstructor(char.class);
+             if (testValue.length() == 1) {
+                 // Use first character of string
+                 obj = (Serializable)ctor.newInstance(testValue.charAt(0));
+             } else if (testValue.length() == 4 || testValue.length() == 6) {
+                 // Format '\xNN' or '\xNNNN'
+                 obj = (Serializable)ctor.newInstance((char)Integer.parseInt(testValue.substring(2), 16));
+             } else {
+                 throw new Exception("JmsSenderShim.createJavaObject() Malformed char string: \"" + testValue + "\"");
+             }
+         } else {
+             // Use string constructor
+             Constructor<?> ctor = c.getConstructor(String.class);
+             obj = (Serializable)ctor.newInstance(testValue);
+         }
+     } catch (ReflectiveOperationException e) {
+         // Common supertype of ClassNotFoundException, NoSuchMethodException,
+         // InstantiationException, IllegalAccessException and
+         // InvocationTargetException; the caller sees the failure as null.
+         e.printStackTrace(System.out);
+     }
+     return obj;
+ }
+
+ // value has format "classname:ctorstrvalue"
+ protected static Serializable createObject(String value) throws Exception {
+ Serializable obj = null;
+ int colonIndex = value.indexOf(":");
+ if (colonIndex >= 0) {
+ String className = value.substring(0, colonIndex);
+ String testValue = value.substring(colonIndex+1);
+ obj = createJavaObject(className, testValue);
+ } else {
+ throw new Exception("createObject(): Malformed value string");
+ }
+ return obj;
+ }
+
+ /**
+ * Builds a TextMessage whose text is the given value string.
+ *
+ * @param session session used to create the message
+ * @param valueStr text to carry in the message
+ * @return the message returned by session.createTextMessage(valueStr)
+ * @throws JMSException if the session fails to create the message
+ */
+ protected static TextMessage createTextMessage(Session session, String valueStr) throws JMSException {
+ return session.createTextMessage(valueStr);
+ }
+
+ /**
+  * Reports whether a JMS message type name is listed in
+  * SUPPORTED_JMS_MESSAGE_TYPES.
+  *
+  * @param jmsMessageType message type name, e.g. "JMS_TEXTMESSAGE_TYPE"
+  * @return true if an equal entry is found, false otherwise
+  */
+ protected static boolean isSupportedJmsMessageType(String jmsMessageType) {
+     boolean supported = false;
+     for (String candidate : SUPPORTED_JMS_MESSAGE_TYPES) {
+         if (jmsMessageType.equals(candidate)) {
+             supported = true;
+             break;
+         }
+     }
+     return supported;
+ }
+
+ /**
+ * Exception listener registered on the JMS connection. When fired it
+ * prints a notice and the exception's stack trace to stdout, then
+ * terminates the JVM with exit status 1.
+ */
+ private static class MyExceptionListener implements ExceptionListener {
+ @Override
+ public void onException(JMSException exception) {
+ System.out.println("Connection ExceptionListener fired, exiting.");
+ exception.printStackTrace(System.out);
+ System.exit(1);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsReceiver.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsReceiver.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsReceiver.java
deleted file mode 100644
index 4710025..0000000
--- a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsReceiver.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.interop_test.shim;
-
-import java.math.BigDecimal;
-import java.util.UUID;
-import java.util.Vector;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import org.apache.qpid.jms.JmsConnectionFactory;
-
-public class ProtonJmsReceiver {
- private static final String USER = "guest";
- private static final String PASSWORD = "guest";
- private static final int TIMEOUT = 1000;
- private static final String[] SUPPORTED_AMQP_TYPES = {"null",
- "boolean",
- "ubyte",
- "ushort",
- "uint",
- "ulong",
- "byte",
- "short",
- "int",
- "long",
- "float",
- "double",
- "decimal32",
- "decimal64",
- "decimal128",
- "char",
- "timestamp",
- "uuid",
- "binary",
- "string",
- "symbol",
- "list",
- "map",
- "array"};
-
- public static void main(String[] args) throws Exception {
- if (args.length < 4) {
- System.out.println("ProtonJmsReceiver: Insufficient number of arguments");
- System.out.println("ProtonJmsReceiver: Expected arguments: broker_address, queue_name, amqp_type, num_test_values");
- System.exit(1);
- }
- String brokerAddress = "amqp://" + args[0];
- String queueName = args[1];
- String amqpType = args[2];
- int numTestValues = Integer.parseInt(args[3]);
- Connection connection = null;
-
- try {
- ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
-
- connection = factory.createConnection(USER, PASSWORD);
- connection.setExceptionListener(new MyExceptionListener());
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = session.createQueue(queueName);
-
- MessageConsumer messageConsumer = session.createConsumer(queue);
-
- Vector<String> outList = new Vector<String>();
- outList.add(amqpType);
- if (isSupportedAmqpType(amqpType)) {
- int actualCount = 0;
- Message message = null;
- for (int i = 1; i <= numTestValues; i++, actualCount++) {
- message = messageConsumer.receive(TIMEOUT);
- if (message == null)
- break;
- switch (amqpType) {
- case "null":
- long bodyLength = ((BytesMessage)message).getBodyLength();
- if (bodyLength == 0L) {
- outList.add("None");
- } else {
- throw new Exception("ProtonJmsReceiver: JMS BytesMessage size error: Expected 0 bytes, read " + bodyLength);
- }
- break;
- case "boolean":
- String bs = String.valueOf(((BytesMessage)message).readBoolean());
- outList.add(Character.toUpperCase(bs.charAt(0)) + bs.substring(1));
- break;
- case "ubyte":
- byte byteValue = ((BytesMessage)message).readByte();
- short ubyteValue = (short)(byteValue & 0xff);
- outList.add(String.valueOf(ubyteValue));
- break;
- case "ushort":
- {
- byte[] byteArray = new byte[2];
- int numBytes = ((BytesMessage)message).readBytes(byteArray);
- if (numBytes != 2) {
- // TODO: numBytes == -1 means no more bytes in stream - add error message for this case?
- throw new Exception("ProtonJmsReceiver: JMS BytesMessage size error: Exptected 2 bytes, read " + numBytes);
- }
- int ushortValue = 0;
- for (int j=0; j<byteArray.length; j++) {
- ushortValue = (ushortValue << 8) + (byteArray[j] & 0xff);
- }
- outList.add(String.valueOf(ushortValue));
- break;
- }
- case "uint":
- {
- byte[] byteArray = new byte[4];
- int numBytes = ((BytesMessage)message).readBytes(byteArray);
- if (numBytes != 4) {
- // TODO: numBytes == -1 means no more bytes in stream - add error message for this case?
- throw new Exception("ProtonJmsReceiver: JMS BytesMessage size error: Exptected 4 bytes, read " + numBytes);
- }
- long uintValue = 0;
- for (int j=0; j<byteArray.length; j++) {
- uintValue = (uintValue << 8) + (byteArray[j] & 0xff);
- }
- outList.add(String.valueOf(uintValue));
- break;
- }
- case "ulong":
- case "timestamp":
- {
- // TODO: Tidy this ugliness up - perhaps use of vector<byte>?
- byte[] byteArray = new byte[8];
- int numBytes = ((BytesMessage)message).readBytes(byteArray);
- if (numBytes != 8) {
- // TODO: numBytes == -1 means no more bytes in stream - add error message for this case?
- throw new Exception("ProtonJmsReceiver: JMS BytesMessage size error: Exptected 8 bytes, read " + numBytes);
- }
- // TODO: shortcut in use here - this byte array should go through a Java type that can represent this as a number - such as BigInteger.
- outList.add(String.format("0x%02x%02x%02x%02x%02x%02x%02x%02x", byteArray[0], byteArray[1],
- byteArray[2], byteArray[3], byteArray[4], byteArray[5], byteArray[6], byteArray[7]));
- break;
- }
- case "byte":
- outList.add(String.valueOf(((BytesMessage)message).readByte()));
- break;
- case "short":
- outList.add(String.valueOf(((BytesMessage)message).readShort()));
- break;
- case "int":
- outList.add(String.valueOf(((BytesMessage)message).readInt()));
- break;
- case "long":
- outList.add(String.valueOf(((BytesMessage)message).readLong()));
- break;
- case "float":
- float f = ((BytesMessage)message).readFloat();
- int i0 = Float.floatToRawIntBits(f);
- outList.add(String.format("0x%8s", Integer.toHexString(i0)).replace(' ', '0'));
- break;
- case "double":
- double d = ((BytesMessage)message).readDouble();
- long l = Double.doubleToRawLongBits(d);
- outList.add(String.format("0x%16s", Long.toHexString(l)).replace(' ', '0'));
- break;
- case "decimal32":
- BigDecimal bd32 = (BigDecimal)((ObjectMessage)message).getObject();
- outList.add(bd32.toString());
- break;
- case "decimal64":
- BigDecimal bd64 = (BigDecimal)((ObjectMessage)message).getObject();
- outList.add(bd64.toString());
- break;
- case "decimal128":
- BigDecimal bd128 = (BigDecimal)((ObjectMessage)message).getObject();
- outList.add(bd128.toString());
- break;
- case "char":
- outList.add(String.format("%c", ((BytesMessage)message).readChar()));
- break;
- case "uuid":
- UUID uuid = (UUID)((ObjectMessage)message).getObject();
- outList.add(uuid.toString());
- break;
- case "binary":
- BytesMessage bm = (BytesMessage)message;
- int msgLen = (int)bm.getBodyLength();
- byte[] ba = new byte[msgLen];
- if (bm.readBytes(ba) == msgLen) {
- outList.add(new String(ba));
- } else {
- // TODO: Raise exception or error here: size mismatch
- }
- break;
- case "string":
- outList.add(((TextMessage)message).getText());
- break;
- case "symbol":
- outList.add(((BytesMessage)message).readUTF());
- break;
- case "list":
- break;
- case "map":
- break;
- case "array":
- break;
- default:
- // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
- connection.close();
- throw new Exception("ProtonJmsReceiver: Internal error: unsupported AMQP type \"" + amqpType + "\"");
- }
- }
- } else {
- System.out.println("ERROR: ProtonJmsReceiver: AMQP type \"" + amqpType + "\" is not supported");
- connection.close();
- System.exit(1);
- }
-
- connection.close();
-
- // No exception, print results
- for (int i=0; i<outList.size(); i++) {
- System.out.println(outList.get(i));
- }
- } catch (Exception exp) {
- if (connection != null)
- connection.close();
- System.out.println("Caught exception, exiting.");
- exp.printStackTrace(System.out);
- System.exit(1);
- }
- }
-
- protected static boolean isSupportedAmqpType(String amqpType) {
- for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
- if (amqpType.equals(supportedAmqpType))
- return true;
- }
- return false;
- }
-
- private static class MyExceptionListener implements ExceptionListener {
- @Override
- public void onException(JMSException exception) {
- System.out.println("Connection ExceptionListener fired, exiting.");
- exception.printStackTrace(System.out);
- System.exit(1);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/ebdacb0d/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsSender.java
----------------------------------------------------------------------
diff --git a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsSender.java b/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsSender.java
deleted file mode 100644
index 3507fbd..0000000
--- a/shims/qpid-jms/src/main/java/org/apache/qpid/interop_test/shim/ProtonJmsSender.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.interop_test.shim;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.math.MathContext;
-import java.util.Arrays;
-import java.util.UUID;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import org.apache.qpid.jms.JmsConnectionFactory;
-
-public class ProtonJmsSender {
- private static final String USER = "guest";
- private static final String PASSWORD = "guest";
- private static final String[] SUPPORTED_AMQP_TYPES = {"null",
- "boolean",
- "ubyte",
- "ushort",
- "uint",
- "ulong",
- "byte",
- "short",
- "int",
- "long",
- "float",
- "double",
- "decimal32",
- "decimal64",
- "decimal128",
- "char",
- "timestamp",
- "uuid",
- "binary",
- "string",
- "symbol",
- "list",
- "map",
- "array"};
-
- public static void main(String[] args) throws Exception {
- if (args.length < 4) {
- System.out.println("ProtonJmsSender: Insufficient number of arguments");
- System.out.println("ProtonJmsSender: Expected arguments: broker_address, queue_name, amqp_type, test_val, test_val, ...");
- System.exit(1);
- }
- String brokerAddress = "amqp://" + args[0];
- String queueName = args[1];
- String amqpType = args[2];
- String[] testValueList = Arrays.copyOfRange(args, 3, args.length); // Use remaining args as test values
-
- try {
- ConnectionFactory factory = (ConnectionFactory)new JmsConnectionFactory(brokerAddress);
-
- Connection connection = factory.createConnection();
- connection.setExceptionListener(new MyExceptionListener());
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Queue queue = session.createQueue(queueName);
-
- MessageProducer messageProducer = session.createProducer(queue);
-
- if (isSupportedAmqpType(amqpType)) {
- Message message = null;
- for (String testValueStr : testValueList) {
- switch (amqpType) {
- case "null":
- message = session.createBytesMessage();
- break;
- case "boolean":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeBoolean(Boolean.parseBoolean(testValueStr));
- break;
- case "ubyte":
- {
- byte testValue = (byte)Short.parseShort(testValueStr);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeByte(testValue);
- break;
- }
- case "ushort":
- {
- int testValue = Integer.parseInt(testValueStr);
- byte[] byteArray = new byte[2];
- byteArray[0] = (byte)(testValue >> 8);
- byteArray[1] = (byte)(testValue);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeBytes(byteArray);
- break;
- }
- case "uint":
- {
- long testValue = Long.parseLong(testValueStr);
- byte[] byteArray = new byte[4];
- byteArray[0] = (byte)(testValue >> 24);
- byteArray[1] = (byte)(testValue >> 16);
- byteArray[2] = (byte)(testValue >> 8);
- byteArray[3] = (byte)(testValue);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeBytes(byteArray);
- break;
- }
- case "ulong":
- {
- // TODO: Tidy this ugliness up - perhaps use of vector<byte>?
- BigInteger testValue = new BigInteger(testValueStr);
- byte[] bigIntArray = testValue.toByteArray(); // may be 1 to 9 bytes depending on number
- byte[] byteArray = {0, 0, 0, 0, 0, 0, 0, 0};
- int effectiveBigIntArrayLen = bigIntArray.length > 8 ? 8 : bigIntArray.length; // Cap length at 8
- int bigIntArrayOffs = bigIntArray.length > 8 ? bigIntArray.length - 8 : 0; // Offset when length > 8
- for (int i=0; i<bigIntArray.length && i < 8; i++)
- byteArray[8 - effectiveBigIntArrayLen + i] = bigIntArray[bigIntArrayOffs + i];
- message = session.createBytesMessage();
- ((BytesMessage)message).writeBytes(byteArray);
- break;
- }
- case "byte":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeByte(Byte.parseByte(testValueStr));
- break;
- case "short":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeShort(Short.parseShort(testValueStr));
- break;
- case "int":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeInt(Integer.parseInt(testValueStr));
- break;
- case "long":
- case "timestamp":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeLong(Long.parseLong(testValueStr));
- break;
- case "float":
- Long i = Long.parseLong(testValueStr.substring(2), 16);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeFloat(Float.intBitsToFloat(i.intValue()));
- break;
- case "double":
- Long l1 = Long.parseLong(testValueStr.substring(2, 3), 16) << 60;
- Long l2 = Long.parseLong(testValueStr.substring(3), 16);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeDouble(Double.longBitsToDouble(l1 | l2));
- break;
- case "decimal32":
- BigDecimal bd32 = new BigDecimal(testValueStr, MathContext.DECIMAL32);
- message = session.createObjectMessage();
- ((ObjectMessage)message).setObject(bd32);
- break;
- case "decimal64":
- BigDecimal bd64 = new BigDecimal(testValueStr, MathContext.DECIMAL64);
- message = session.createObjectMessage();
- ((ObjectMessage)message).setObject(bd64);
- break;
- case "decimal128":
- BigDecimal bd128 = new BigDecimal(testValueStr, MathContext.DECIMAL128);
- message = session.createObjectMessage();
- ((ObjectMessage)message).setObject(bd128);
- break;
- case "char":
- char c = 0;
- if (testValueStr.length() == 1) // Single char
- c = testValueStr.charAt(0);
- else if (testValueStr.length() == 6) // unicode format
- c = (char)Integer.parseInt(testValueStr, 16);
- message = session.createBytesMessage();
- ((BytesMessage)message).writeChar(c);
- break;
- case "uuid":
- UUID uuid = UUID.fromString(testValueStr);
- message = session.createObjectMessage();
- ((ObjectMessage)message).setObject(uuid);
- break;
- case "binary":
- message = session.createBytesMessage();
- byte[] byteArray = testValueStr.getBytes();
- ((BytesMessage)message).writeBytes(byteArray, 0, byteArray.length);
- break;
- case "string":
- message = session.createTextMessage(testValueStr);
- break;
- case "symbol":
- message = session.createBytesMessage();
- ((BytesMessage)message).writeUTF(testValueStr);
- break;
- case "list":
- break;
- case "map":
- break;
- case "array":
- break;
- default:
- // Internal error, should never happen if SUPPORTED_AMQP_TYPES matches this case stmt
- connection.close();
- throw new Exception("ProtonJmsSender: Internal error: unsupported AMQP type \"" + amqpType + "\"");
- }
- messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
- }
- } else {
- System.out.println("ERROR: ProtonJmsSender: AMQP type \"" + amqpType + "\" is not supported");
- connection.close();
- System.exit(1);
- }
-
- connection.close();
- } catch (Exception exp) {
- System.out.println("Caught exception, exiting.");
- exp.printStackTrace(System.out);
- System.exit(1);
- }
- }
-
- protected static boolean isSupportedAmqpType(String amqpType) {
- for (String supportedAmqpType: SUPPORTED_AMQP_TYPES) {
- if (amqpType.equals(supportedAmqpType))
- return true;
- }
- return false;
- }
-
- private static class MyExceptionListener implements ExceptionListener {
- @Override
- public void onException(JMSException exception) {
- System.out.println("Connection ExceptionListener fired, exiting.");
- exception.printStackTrace(System.out);
- System.exit(1);
- }
- }
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org