You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2016/10/04 19:17:29 UTC
[3/8] qpid-interop-test git commit: QPIDIT-41: Re-organization of
project directory structure
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/JmsSenderShim.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/JmsSenderShim.py b/shims/qpid-proton-python/src/JmsSenderShim.py
deleted file mode 100755
index 5a1108a..0000000
--- a/shims/qpid-proton-python/src/JmsSenderShim.py
+++ /dev/null
@@ -1,390 +0,0 @@
-#!/usr/bin/env python
-
-"""
-JMS sender shim for qpid-interop-test
-"""
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import sys
-from json import loads
-from proton import byte, char, float32, int32, Message, short, symbol
-from proton.handlers import MessagingHandler
-from proton.reactor import Container
-from interop_test_errors import InteropTestError
-from subprocess import check_output
-from struct import pack, unpack
-from traceback import format_exc
-
-# These values must tie in with the Qpid-JMS client values found in
-# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport
-QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type')
-QPID_JMS_TYPE_ANNOTATIONS = {
- 'JMS_MESSAGE_TYPE': byte(0),
- 'JMS_BYTESMESSAGE_TYPE': byte(3),
- 'JMS_MAPMESSAGE_TYPE': byte(2),
- 'JMS_OBJECTMESSAGE_TYPE': byte(1),
- 'JMS_STREAMMESSAGE_TYPE': byte(4),
- 'JMS_TEXTMESSAGE_TYPE': byte(5)
- }
-def create_annotation(jms_msg_type):
- """Function which creates a message annotation for JMS message type as used by the Qpid JMS client"""
- return {QPID_JMS_TYPE_ANNOTATION_NAME: QPID_JMS_TYPE_ANNOTATIONS[jms_msg_type]}
-
-class JmsSenderShim(MessagingHandler):
- """
- This shim sends JMS messages of a particular JMS message type according to the test parameters list. This list
- contains three maps:
- 0: The test value map, which contains test value types as keys, and lists of values of that type;
- 1. The test headers map, which contains the JMS headers as keys and a submap conatining types and values;
- 2. The test proprties map, which contains the name of the properties as keys, and a submap containing types
- and values
- This shim takes the combinations of the above map and creates test cases, each of which sends a single message
- with (or without) JMS headers and properties.
- """
- def __init__(self, broker_ip_addr, queue_name, jms_msg_type, test_parameters_list):
- super(JmsSenderShim, self).__init__()
- self.broker_ip_addr = broker_ip_addr
- self.queue_name = queue_name
- self.jms_msg_type = jms_msg_type
- self.test_value_map = test_parameters_list[0]
- self.test_headers_map = test_parameters_list[1]
- self.test_properties_map = test_parameters_list[2]
- self.sent = 0
- self.confirmed = 0
- self.total = self._get_total_num_msgs()
-
- def on_start(self, event):
- """Event callback for when the client starts"""
- event.container.create_sender('%s/%s' % (self.broker_ip_addr, self.queue_name))
-
- def on_sendable(self, event):
- """Event callback for when send credit is received, allowing the sending of messages"""
- if self.sent == 0:
- # These types expect a test_values Python string representation of a map: '{type:[val, val, val], ...}'
- for sub_type in sorted(self.test_value_map.keys()):
- if self._send_test_values(event, sub_type, self.test_value_map[sub_type]):
- return
-
- def on_connection_error(self, event):
- print 'JmsSenderShim.on_connection_error'
-
- def on_session_error(self, event):
- print 'JmsSenderShim.on_session_error'
-
- def on_link_error(self, event):
- print 'JmsSenderShim.on_link_error'
-
- def on_accepted(self, event):
- """Event callback for when a sent message is accepted by the broker"""
- self.confirmed += 1
- if self.confirmed == self.total:
- event.connection.close()
-
- def on_disconnected(self, event):
- """Event callback for when the broker disconnects with the client"""
- self.sent = self.confirmed
-
- def _get_total_num_msgs(self):
- """
- Calculates the total number of messages to be sent based on the message parameters received on the command-line
- """
- total = 0
- for key in self.test_value_map.keys():
- total += len(self.test_value_map[key])
- return total
-
- def _send_test_values(self, event, test_value_type, test_values):
- """Method which loops through recieved parameters and sends the corresponding messages"""
- value_num = 0
- for test_value in test_values:
- if event.sender.credit:
- message = self._create_message(test_value_type, test_value, value_num)
- # TODO: set message to address
- if message is not None:
- self._add_jms_message_headers(message)
- self._add_jms_message_properties(message)
- event.sender.send(message)
- self.sent += 1
- value_num += 1
- else:
- event.connection.close()
- return True
- return False
-
- # TODO: Change this to return a list of messages. That way each test can return more than one message
- def _create_message(self, test_value_type, test_value, value_num):
- """Create a single message of the appropriate JMS message type"""
- if self.jms_msg_type == 'JMS_MESSAGE_TYPE':
- return self._create_jms_message(test_value_type, test_value)
- elif self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE':
- return self._create_jms_bytesmessage(test_value_type, test_value)
- elif self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE':
- return self._create_jms_mapmessage(test_value_type, test_value, "%s%03d" % (test_value_type, value_num))
- elif self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE':
- return self._create_jms_objectmessage('%s:%s' % (test_value_type, test_value))
- elif self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE':
- return self._create_jms_streammessage(test_value_type, test_value)
- elif self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE':
- return self._create_jms_textmessage(test_value)
- else:
- print 'jms-send: Unsupported JMS message type "%s"' % self.jms_msg_type
- return None
-
- def _create_jms_message(self, test_value_type, test_value):
- """Create a JMS message type (without message body)"""
- if test_value_type != 'none':
- raise InteropTestError('JmsSenderShim._create_jms_message: Unknown or unsupported subtype "%s"' %
- test_value_type)
- if test_value is not None:
- raise InteropTestError('JmsSenderShim._create_jms_message: Invalid value "%s" for subtype "%s"' %
- (test_value, test_value_type))
- return Message(id=(self.sent+1),
- content_type='application/octet-stream',
- annotations=create_annotation('JMS_MESSAGE_TYPE'))
-
- def _create_jms_bytesmessage(self, test_value_type, test_value):
- """Create a JMS bytes message"""
- # NOTE: test_value contains all unicode strings u'...' as returned by json
- body_bytes = None
- if test_value_type == 'boolean':
- body_bytes = b'\x01' if test_value == 'True' else b'\x00'
- elif test_value_type == 'byte':
- body_bytes = pack('b', int(test_value, 16))
- elif test_value_type == 'bytes':
- body_bytes = str(test_value) # remove unicode
- elif test_value_type == 'char':
- # JMS expects two-byte chars, ASCII chars can be prefixed with '\x00'
- body_bytes = '\x00' + str(test_value) # remove unicode
- elif test_value_type == 'double' or test_value_type == 'float':
- body_bytes = test_value[2:].decode('hex')
- elif test_value_type == 'int':
- body_bytes = pack('!i', int(test_value, 16))
- elif test_value_type == 'long':
- body_bytes = pack('!q', long(test_value, 16))
- elif test_value_type == 'short':
- body_bytes = pack('!h', short(test_value, 16))
- elif test_value_type == 'string':
- # NOTE: First two bytes must be string length
- test_value_str = str(test_value) # remove unicode
- body_bytes = pack('!H', len(test_value_str)) + test_value_str
- else:
- raise InteropTestError('JmsSenderShim._create_jms_bytesmessage: Unknown or unsupported subtype "%s"' %
- test_value_type)
- return Message(id=(self.sent+1),
- body=body_bytes,
- inferred=True,
- content_type='application/octet-stream',
- annotations=create_annotation('JMS_BYTESMESSAGE_TYPE'))
-
- def _create_jms_mapmessage(self, test_value_type, test_value, name):
- """Create a JMS map message"""
- if test_value_type == 'boolean':
- value = test_value == 'True'
- elif test_value_type == 'byte':
- value = byte(int(test_value, 16))
- elif test_value_type == 'bytes':
- value = str(test_value) # remove unicode
- elif test_value_type == 'char':
- value = char(test_value)
- elif test_value_type == 'double':
- value = unpack('!d', test_value[2:].decode('hex'))[0]
- elif test_value_type == 'float':
- value = float32(unpack('!f', test_value[2:].decode('hex'))[0])
- elif test_value_type == 'int':
- value = int32(int(test_value, 16))
- elif test_value_type == 'long':
- value = long(test_value, 16)
- elif test_value_type == 'short':
- value = short(int(test_value, 16))
- elif test_value_type == 'string':
- value = test_value
- else:
- raise InteropTestError('JmsSenderShim._create_jms_mapmessage: Unknown or unsupported subtype "%s"' %
- test_value_type)
- return Message(id=(self.sent+1),
- body={name: value},
- inferred=False,
- annotations=create_annotation('JMS_MAPMESSAGE_TYPE'))
-
- def _create_jms_objectmessage(self, test_value):
- """Create a JMS object message"""
- java_binary = self._s_get_java_obj_binary(test_value)
- return Message(id=(self.sent+1),
- body=java_binary,
- inferred=True,
- content_type='application/x-java-serialized-object',
- annotations=create_annotation('JMS_OBJECTMESSAGE_TYPE'))
-
- @staticmethod
- def _s_get_java_obj_binary(java_class_str):
- """Call external utility to create Java object and stringify it, returning the string representation"""
- out_str = check_output(['java',
- '-cp',
- 'target/JavaObjUtils.jar',
- 'org.apache.qpid.interop_test.obj_util.JavaObjToBytes',
- java_class_str])
- out_str_list = out_str.split('\n')[:-1] # remove trailing \n
- if out_str_list[0] != java_class_str:
- raise InteropTestError('JmsSenderShim._s_get_java_obj_binary(): Call to JavaObjToBytes failed\n%s' %
- out_str)
- return out_str_list[1].decode('hex')
-
- def _create_jms_streammessage(self, test_value_type, test_value):
- """Create a JMS stream message"""
- if test_value_type == 'boolean':
- body_list = [test_value == 'True']
- elif test_value_type == 'byte':
- body_list = [byte(int(test_value, 16))]
- elif test_value_type == 'bytes':
- body_list = [str(test_value)]
- elif test_value_type == 'char':
- body_list = [char(test_value)]
- elif test_value_type == 'double':
- body_list = [unpack('!d', test_value[2:].decode('hex'))[0]]
- elif test_value_type == 'float':
- body_list = [float32(unpack('!f', test_value[2:].decode('hex'))[0])]
- elif test_value_type == 'int':
- body_list = [int32(int(test_value, 16))]
- elif test_value_type == 'long':
- body_list = [long(test_value, 16)]
- elif test_value_type == 'short':
- body_list = [short(int(test_value, 16))]
- elif test_value_type == 'string':
- body_list = [test_value]
- else:
- raise InteropTestError('JmsSenderShim._create_jms_streammessage: Unknown or unsupported subtype "%s"' %
- test_value_type)
- return Message(id=(self.sent+1),
- body=body_list,
- inferred=True,
- annotations=create_annotation('JMS_STREAMMESSAGE_TYPE'))
-
- def _create_jms_textmessage(self, test_value_text):
- """Create a JMS text message"""
- return Message(id=(self.sent+1),
- body=unicode(test_value_text),
- annotations=create_annotation('JMS_TEXTMESSAGE_TYPE'))
-
- def _add_jms_message_headers(self, message):
- """Add JMS headers to the supplied message from self.test_headers_map"""
- for jms_header in self.test_headers_map.iterkeys():
- value_map = self.test_headers_map[jms_header]
- value_type = value_map.keys()[0] # There is only ever one value in map
- value = value_map[value_type]
- if jms_header == 'JMS_TYPE_HEADER':
- if value_type == 'string':
- self._s_set_jms_type_header(message, value)
- else:
- raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' +
- 'JMS_TYPE_HEADER requires value type "string", type "%s" found' %
- value_type)
- elif jms_header == 'JMS_CORRELATIONID_HEADER':
- if value_type == 'string':
- self._s_set_jms_correlation_id(message, value)
- elif value_type == 'bytes':
- self._s_set_jms_correlation_id(message, str(value))
- else:
- raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' +
- 'JMS_CORRELATIONID_HEADER requires value type "string" or "bytes", ' +
- 'type "%s" found' % value_type)
- elif jms_header == 'JMS_REPLYTO_HEADER':
- if value_type == 'queue' or value_type == 'topic':
- self._s_set_jms_reply_to(message, value_type, value)
- elif value_type == 'temp_queue' or value_type == 'temp_topic':
- raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' +
- 'JMS_REPLYTO_HEADER type "temp_queue" or "temp_topic" not handled')
- else:
- raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' +
- 'JMS_REPLYTO_HEADER requires value type "queue" or "topic", ' +
- 'type "%s" found' % value_type)
- else:
- raise InteropTestError('JmsSenderShim._add_jms_message_headers(): Invalid JMS message header "%s"' %
- jms_header)
-
-
- @staticmethod
- def _s_set_jms_type_header(message, message_type):
- """Adds a JMS message type header"""
- message._set_subject(message_type)
-
- @staticmethod
- def _s_set_jms_correlation_id(message, correlation_id):
- """Adds a JMS correlation id header"""
- message._set_correlation_id(correlation_id)
- message.annotations[symbol(u'x-opt-app-correlation-id')] = True
-
- @staticmethod
- def _s_set_jms_reply_to(message, jms_destination_type_str, destination):
- """Adds a JMS reply-to header"""
- if jms_destination_type_str == 'queue':
- message._set_reply_to(destination)
- message.annotations[symbol(u'x-opt-jms-reply-to')] = byte(0)
- elif jms_destination_type_str == 'topic':
- message._set_reply_to(destination)
- message.annotations[symbol(u'x-opt-jms-reply-to')] = byte(1)
- else:
- raise InteropTestError('JmsSenderShim._s_set_jms_reply_to(): ' +
- 'Invalid value for jms_destination_type_str "%s"' % jms_destination_type_str)
-
- def _add_jms_message_properties(self, message):
- """Adds message properties to the supplied message from self.test_properties_map"""
- for property_name in self.test_properties_map.iterkeys():
- value_map = self.test_properties_map[property_name]
- value_type = value_map.keys()[0] # There is only ever one value in map
- value = value_map[value_type]
- if message.properties is None:
- message.properties = {}
- if value_type == 'boolean':
- message.properties[property_name] = value == 'True'
- elif value_type == 'byte':
- message.properties[property_name] = byte(int(value, 16))
- elif value_type == 'double':
- message.properties[property_name] = unpack('!d', value[2:].decode('hex'))[0]
- elif value_type == 'float':
- message.properties[property_name] = float32(unpack('!f', value[2:].decode('hex'))[0])
- elif value_type == 'int':
- message.properties[property_name] = int(value, 16)
- elif value_type == 'long':
- message.properties[property_name] = long(value, 16)
- elif value_type == 'short':
- message.properties[property_name] = short(int(value, 16))
- elif value_type == 'string':
- message.properties[property_name] = value
- else:
- raise InteropTestError('JmsSenderShim._add_jms_message_properties: ' +
- 'Unknown or unhandled message property type ?%s"' % value_type)
-
-
-
-# --- main ---
-# Args: 1: Broker address (ip-addr:port)
-# 2: Queue name
-# 3: JMS message type
-# 4: JSON Test parameters containing 3 maps: [testValueMap, testHeadersMap, testPropertiesMap]
-#print '#### sys.argv=%s' % sys.argv
-#print '>>> test_values=%s' % loads(sys.argv[4])
-try:
- Container(JmsSenderShim(sys.argv[1], sys.argv[2], sys.argv[3], loads(sys.argv[4]))).run()
-except KeyboardInterrupt:
- pass
-except Exception as exc:
- print 'jms-sender-shim EXCEPTION:', exc
- print format_exc()
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/TypesReceiverShim.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/TypesReceiverShim.py b/shims/qpid-proton-python/src/TypesReceiverShim.py
deleted file mode 100755
index 2876f51..0000000
--- a/shims/qpid-proton-python/src/TypesReceiverShim.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#!/usr/bin/env python
-
-"""
-AMQP type test receiver shim for qpid-interop-test
-"""
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Issues:
-# * Capturing errors from client or broker
-
-import sys
-from json import dumps
-from proton.handlers import MessagingHandler
-from proton.reactor import Container
-from traceback import format_exc
-from string import digits, letters, punctuation
-from struct import pack, unpack
-
-class AmqpTypesReceiverShim(MessagingHandler):
- """
- Reciver shim for AMQP types test
- This shim receives the number of messages supplied on the command-line and checks that they contain message
- bodies of the exptected AMQP type. The values are then aggregated and returned.
- """
- def __init__(self, url, amqp_type, num_expected_messages_str):
- super(AmqpTypesReceiverShim, self).__init__()
- self.url = url
- self.received_value_list = []
- self.amqp_type = amqp_type
- self.expected = int(num_expected_messages_str)
- self.received = 0
-
- def get_received_value_list(self):
- """Return the received list of AMQP values"""
- return self.received_value_list
-
- def on_start(self, event):
- """Event callback for when the client starts"""
- event.container.create_receiver(self.url)
-
- def on_message(self, event):
- """Event callback when a message is received by the client"""
- if event.message.id and event.message.id < self.received:
- return # ignore duplicate message
- if self.expected == 0 or self.received < self.expected:
- if self.amqp_type == 'null' or \
- self.amqp_type == 'boolean' or \
- self.amqp_type == 'uuid':
- self.received_value_list.append(str(event.message.body))
- elif self.amqp_type == 'ubyte' or \
- self.amqp_type == 'ushort' or \
- self.amqp_type == 'byte' or \
- self.amqp_type == 'short' or \
- self.amqp_type == 'int':
- self.received_value_list.append(hex(event.message.body))
- elif self.amqp_type == 'uint' or \
- self.amqp_type == 'ulong' or \
- self.amqp_type == 'long' or \
- self.amqp_type == 'timestamp':
- hex_str = hex(int(event.message.body))
- if len(hex_str) == 19 and hex_str[-1] == 'L':
- self.received_value_list.append(hex_str[:-1]) # strip trailing 'L' if present on some ulongs
- else:
- self.received_value_list.append(hex_str)
- elif self.amqp_type == 'float':
- self.received_value_list.append('0x%08x' % unpack('!L', pack('!f', event.message.body))[0])
- elif self.amqp_type == 'double':
- self.received_value_list.append('0x%016x' % unpack('!Q', pack('!d', event.message.body))[0])
- elif self.amqp_type == 'decimal32':
- self.received_value_list.append('0x%08x' % event.message.body)
- elif self.amqp_type == 'decimal64':
- self.received_value_list.append('0x%016x' % event.message.body)
- elif self.amqp_type == 'decimal128':
- self.received_value_list.append('0x' + ''.join(['%02x' % ord(c) for c in event.message.body]).strip())
- elif self.amqp_type == 'char':
- if ord(event.message.body) < 0x80 and event.message.body in digits + letters + punctuation:
- self.received_value_list.append(event.message.body)
- else:
- self.received_value_list.append(hex(ord(event.message.body)))
- elif self.amqp_type == 'binary' or \
- self.amqp_type == 'string' or \
- self.amqp_type == 'symbol':
- self.received_value_list.append(event.message.body)
- elif self.amqp_type == 'list' or \
- self.amqp_type == 'map':
- self.received_value_list.append(event.message.body)
- else:
- print 'receive: Unsupported AMQP type "%s"' % self.amqp_type
- return
- self.received += 1
- if self.received == self.expected:
- event.receiver.close()
- event.connection.close()
-
-# --- main ---
-# Args: 1: Broker address (ip-addr:port)
-# 2: Queue name
-# 3: AMQP type
-# 4: Expected number of test values to receive
-try:
- RECEIVER = AmqpTypesReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], sys.argv[4])
- Container(RECEIVER).run()
- print sys.argv[3]
- print dumps(RECEIVER.get_received_value_list())
-except KeyboardInterrupt:
- pass
-except Exception as exc:
- print 'proton-python-receive EXCEPTION:', exc
- print format_exc()
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/TypesSenderShim.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/TypesSenderShim.py b/shims/qpid-proton-python/src/TypesSenderShim.py
deleted file mode 100755
index 19d183f..0000000
--- a/shims/qpid-proton-python/src/TypesSenderShim.py
+++ /dev/null
@@ -1,155 +0,0 @@
-#!/usr/bin/env python
-
-"""
-AMQP type test sender shim for qpid-interop-test
-"""
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Issues:
-# * Capturing errors from client or broker
-
-import sys
-import os.path
-from json import loads
-from proton import byte, char, decimal32, decimal64, decimal128, float32, int32, Message, short, symbol, timestamp, \
- ubyte, uint, ulong, ushort
-from proton.handlers import MessagingHandler
-from proton.reactor import Container
-from struct import unpack
-from traceback import format_exc
-from uuid import UUID
-
-class AmqpTypesSenderShim(MessagingHandler):
- """
- Sender shim for AMQP types test
- This shim receives the AMQP type and a list of test values. Each value is sent in a message body of the appropriate
- AMQP type. There is no returned value.
- """
- def __init__(self, url, amqp_type, test_value_list):
- super(AmqpTypesSenderShim, self).__init__()
- self.url = url
- self.amqp_type = amqp_type
- self.test_value_list = test_value_list
- self.sent = 0
- self.confirmed = 0
- self.total = len(test_value_list)
-
- def on_start(self, event):
- """Event callback for when the client starts"""
- event.container.create_sender(self.url)
-
- def on_sendable(self, event):
- """Event callback for when send credit is received, allowing the sending of messages"""
- if self.sent == 0:
- for test_value in self.test_value_list:
- if event.sender.credit:
- message = self.create_message(test_value)
- if message is not None:
- event.sender.send(message)
- self.sent += 1
- else:
- event.connection.close()
- return
-
- def create_message(self, test_value):
- """
- Creates a single message with the test value translated from its string representation to the appropriate
- AMQP value (set in self.amqp_type).
- """
- if self.amqp_type == 'null':
- return Message(id=(self.sent+1), body=None)
- elif self.amqp_type == 'boolean':
- return Message(id=(self.sent+1), body=True if test_value == 'True' else False)
- elif self.amqp_type == 'ubyte':
- return Message(id=(self.sent+1), body=ubyte(int(test_value, 16)))
- elif self.amqp_type == 'ushort':
- return Message(id=(self.sent+1), body=ushort(int(test_value, 16)))
- elif self.amqp_type == 'uint':
- return Message(id=(self.sent+1), body=uint(int(test_value, 16)))
- elif self.amqp_type == 'ulong':
- return Message(id=(self.sent+1), body=ulong(int(test_value, 16)))
- elif self.amqp_type == 'byte':
- return Message(id=(self.sent+1), body=byte(int(test_value, 16)))
- elif self.amqp_type == 'short':
- return Message(id=(self.sent+1), body=short(int(test_value, 16)))
- elif self.amqp_type == 'int':
- return Message(id=(self.sent+1), body=int32(int(test_value, 16)))
- elif self.amqp_type == 'long':
- return Message(id=(self.sent+1), body=long(int(test_value, 16)))
- elif self.amqp_type == 'float':
- return Message(id=(self.sent+1), body=float32(unpack('!f', test_value[2:].decode('hex'))[0]))
- elif self.amqp_type == 'double':
- return Message(id=(self.sent+1), body=unpack('!d', test_value[2:].decode('hex'))[0])
- elif self.amqp_type == 'decimal32':
- return Message(id=(self.sent+1), body=decimal32(int(test_value[2:], 16)))
- elif self.amqp_type == 'decimal64':
- l64 = long(test_value[2:], 16)
- return Message(id=(self.sent+1), body=decimal64(l64))
- elif self.amqp_type == 'decimal128':
- return Message(id=(self.sent+1), body=decimal128(test_value[2:].decode('hex')))
- elif self.amqp_type == 'char':
- if len(test_value) == 1: # Format 'a'
- return Message(id=(self.sent+1), body=char(test_value))
- else:
- val = int(test_value, 16)
- return Message(id=(self.sent+1), body=char(unichr(val)))
- elif self.amqp_type == 'timestamp':
- return Message(id=(self.sent+1), body=timestamp(int(test_value, 16)))
- elif self.amqp_type == 'uuid':
- return Message(id=(self.sent+1), body=UUID(test_value))
- elif self.amqp_type == 'binary':
- return Message(id=(self.sent+1), body=bytes(test_value))
- elif self.amqp_type == 'string':
- return Message(id=(self.sent+1), body=unicode(test_value))
- elif self.amqp_type == 'symbol':
- return Message(id=(self.sent+1), body=symbol(test_value))
- elif self.amqp_type == 'list':
- return Message(id=(self.sent+1), body=test_value)
- elif self.amqp_type == 'map':
- return Message(id=(self.sent+1), body=test_value)
- else:
- print 'send: Unsupported AMQP type "%s"' % self.amqp_type
- return None
-
- def on_accepted(self, event):
- """Event callback for when a sent message is accepted by the broker"""
- self.confirmed += 1
- if self.confirmed == self.total:
- event.connection.close()
-
- def on_disconnected(self, event):
- """Event callback for when the broker disconnects with the client"""
- self.sent = self.confirmed
-
-
-# --- main ---
-# Args: 1: Broker address (ip-addr:port)
-# 2: Queue name
-# 3: AMQP type
-# 4...n: Test value(s) as strings
-try:
- Container(AmqpTypesSenderShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4]))).run()
-except KeyboardInterrupt:
- pass
-except Exception as exc:
- print os.path.basename(sys.argv[0]), 'EXCEPTION:', exc
- print format_exc()
-
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/amqp_types_test/Receiver.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/amqp_types_test/Receiver.py b/shims/qpid-proton-python/src/amqp_types_test/Receiver.py
new file mode 100755
index 0000000..2876f51
--- /dev/null
+++ b/shims/qpid-proton-python/src/amqp_types_test/Receiver.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+
+"""
+AMQP type test receiver shim for qpid-interop-test
+"""
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Issues:
+# * Capturing errors from client or broker
+
+import sys
+from json import dumps
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from traceback import format_exc
+from string import digits, letters, punctuation
+from struct import pack, unpack
+
+class AmqpTypesReceiverShim(MessagingHandler):
+ """
+ Reciver shim for AMQP types test
+ This shim receives the number of messages supplied on the command-line and checks that they contain message
+ bodies of the exptected AMQP type. The values are then aggregated and returned.
+ """
+ def __init__(self, url, amqp_type, num_expected_messages_str):
+ super(AmqpTypesReceiverShim, self).__init__()
+ self.url = url
+ self.received_value_list = []
+ self.amqp_type = amqp_type
+ self.expected = int(num_expected_messages_str)
+ self.received = 0
+
+ def get_received_value_list(self):
+ """Return the received list of AMQP values"""
+ return self.received_value_list
+
+ def on_start(self, event):
+ """Event callback for when the client starts"""
+ event.container.create_receiver(self.url)
+
+ def on_message(self, event):
+ """Event callback when a message is received by the client"""
+ if event.message.id and event.message.id < self.received:
+ return # ignore duplicate message
+ if self.expected == 0 or self.received < self.expected:
+ if self.amqp_type == 'null' or \
+ self.amqp_type == 'boolean' or \
+ self.amqp_type == 'uuid':
+ self.received_value_list.append(str(event.message.body))
+ elif self.amqp_type == 'ubyte' or \
+ self.amqp_type == 'ushort' or \
+ self.amqp_type == 'byte' or \
+ self.amqp_type == 'short' or \
+ self.amqp_type == 'int':
+ self.received_value_list.append(hex(event.message.body))
+ elif self.amqp_type == 'uint' or \
+ self.amqp_type == 'ulong' or \
+ self.amqp_type == 'long' or \
+ self.amqp_type == 'timestamp':
+ hex_str = hex(int(event.message.body))
+ if len(hex_str) == 19 and hex_str[-1] == 'L':
+ self.received_value_list.append(hex_str[:-1]) # strip trailing 'L' if present on some ulongs
+ else:
+ self.received_value_list.append(hex_str)
+ elif self.amqp_type == 'float':
+ self.received_value_list.append('0x%08x' % unpack('!L', pack('!f', event.message.body))[0])
+ elif self.amqp_type == 'double':
+ self.received_value_list.append('0x%016x' % unpack('!Q', pack('!d', event.message.body))[0])
+ elif self.amqp_type == 'decimal32':
+ self.received_value_list.append('0x%08x' % event.message.body)
+ elif self.amqp_type == 'decimal64':
+ self.received_value_list.append('0x%016x' % event.message.body)
+ elif self.amqp_type == 'decimal128':
+ self.received_value_list.append('0x' + ''.join(['%02x' % ord(c) for c in event.message.body]).strip())
+ elif self.amqp_type == 'char':
+ if ord(event.message.body) < 0x80 and event.message.body in digits + letters + punctuation:
+ self.received_value_list.append(event.message.body)
+ else:
+ self.received_value_list.append(hex(ord(event.message.body)))
+ elif self.amqp_type == 'binary' or \
+ self.amqp_type == 'string' or \
+ self.amqp_type == 'symbol':
+ self.received_value_list.append(event.message.body)
+ elif self.amqp_type == 'list' or \
+ self.amqp_type == 'map':
+ self.received_value_list.append(event.message.body)
+ else:
+ print 'receive: Unsupported AMQP type "%s"' % self.amqp_type
+ return
+ self.received += 1
+ if self.received == self.expected:
+ event.receiver.close()
+ event.connection.close()
+
+# --- main ---
+# Args: 1: Broker address (ip-addr:port)
+# 2: Queue name
+# 3: AMQP type
+# 4: Expected number of test values to receive
+try:
+ RECEIVER = AmqpTypesReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], sys.argv[4])
+ Container(RECEIVER).run()
+ print sys.argv[3]
+ print dumps(RECEIVER.get_received_value_list())
+except KeyboardInterrupt:
+ pass
+except Exception as exc:
+ print 'proton-python-receive EXCEPTION:', exc
+ print format_exc()
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/amqp_types_test/Sender.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/amqp_types_test/Sender.py b/shims/qpid-proton-python/src/amqp_types_test/Sender.py
new file mode 100755
index 0000000..19d183f
--- /dev/null
+++ b/shims/qpid-proton-python/src/amqp_types_test/Sender.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python
+
+"""
+AMQP type test sender shim for qpid-interop-test
+"""
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Issues:
+# * Capturing errors from client or broker
+
+import sys
+import os.path
+from json import loads
+from proton import byte, char, decimal32, decimal64, decimal128, float32, int32, Message, short, symbol, timestamp, \
+ ubyte, uint, ulong, ushort
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from struct import unpack
+from traceback import format_exc
+from uuid import UUID
+
+class AmqpTypesSenderShim(MessagingHandler):
+ """
+ Sender shim for AMQP types test
+ This shim receives the AMQP type and a list of test values. Each value is sent in a message body of the appropriate
+ AMQP type. There is no returned value.
+ """
+ def __init__(self, url, amqp_type, test_value_list):
+ super(AmqpTypesSenderShim, self).__init__()
+ self.url = url
+ self.amqp_type = amqp_type
+ self.test_value_list = test_value_list
+ self.sent = 0
+ self.confirmed = 0
+ self.total = len(test_value_list)
+
+ def on_start(self, event):
+ """Event callback for when the client starts"""
+ event.container.create_sender(self.url)
+
+ def on_sendable(self, event):
+ """Event callback for when send credit is received, allowing the sending of messages"""
+ if self.sent == 0:
+ for test_value in self.test_value_list:
+ if event.sender.credit:
+ message = self.create_message(test_value)
+ if message is not None:
+ event.sender.send(message)
+ self.sent += 1
+ else:
+ event.connection.close()
+ return
+
+ def create_message(self, test_value):
+ """
+ Creates a single message with the test value translated from its string representation to the appropriate
+ AMQP value (set in self.amqp_type).
+ """
+ if self.amqp_type == 'null':
+ return Message(id=(self.sent+1), body=None)
+ elif self.amqp_type == 'boolean':
+ return Message(id=(self.sent+1), body=True if test_value == 'True' else False)
+ elif self.amqp_type == 'ubyte':
+ return Message(id=(self.sent+1), body=ubyte(int(test_value, 16)))
+ elif self.amqp_type == 'ushort':
+ return Message(id=(self.sent+1), body=ushort(int(test_value, 16)))
+ elif self.amqp_type == 'uint':
+ return Message(id=(self.sent+1), body=uint(int(test_value, 16)))
+ elif self.amqp_type == 'ulong':
+ return Message(id=(self.sent+1), body=ulong(int(test_value, 16)))
+ elif self.amqp_type == 'byte':
+ return Message(id=(self.sent+1), body=byte(int(test_value, 16)))
+ elif self.amqp_type == 'short':
+ return Message(id=(self.sent+1), body=short(int(test_value, 16)))
+ elif self.amqp_type == 'int':
+ return Message(id=(self.sent+1), body=int32(int(test_value, 16)))
+ elif self.amqp_type == 'long':
+ return Message(id=(self.sent+1), body=long(int(test_value, 16)))
+ elif self.amqp_type == 'float':
+ return Message(id=(self.sent+1), body=float32(unpack('!f', test_value[2:].decode('hex'))[0]))
+ elif self.amqp_type == 'double':
+ return Message(id=(self.sent+1), body=unpack('!d', test_value[2:].decode('hex'))[0])
+ elif self.amqp_type == 'decimal32':
+ return Message(id=(self.sent+1), body=decimal32(int(test_value[2:], 16)))
+ elif self.amqp_type == 'decimal64':
+ l64 = long(test_value[2:], 16)
+ return Message(id=(self.sent+1), body=decimal64(l64))
+ elif self.amqp_type == 'decimal128':
+ return Message(id=(self.sent+1), body=decimal128(test_value[2:].decode('hex')))
+ elif self.amqp_type == 'char':
+ if len(test_value) == 1: # Format 'a'
+ return Message(id=(self.sent+1), body=char(test_value))
+ else:
+ val = int(test_value, 16)
+ return Message(id=(self.sent+1), body=char(unichr(val)))
+ elif self.amqp_type == 'timestamp':
+ return Message(id=(self.sent+1), body=timestamp(int(test_value, 16)))
+ elif self.amqp_type == 'uuid':
+ return Message(id=(self.sent+1), body=UUID(test_value))
+ elif self.amqp_type == 'binary':
+ return Message(id=(self.sent+1), body=bytes(test_value))
+ elif self.amqp_type == 'string':
+ return Message(id=(self.sent+1), body=unicode(test_value))
+ elif self.amqp_type == 'symbol':
+ return Message(id=(self.sent+1), body=symbol(test_value))
+ elif self.amqp_type == 'list':
+ return Message(id=(self.sent+1), body=test_value)
+ elif self.amqp_type == 'map':
+ return Message(id=(self.sent+1), body=test_value)
+ else:
+ print 'send: Unsupported AMQP type "%s"' % self.amqp_type
+ return None
+
+ def on_accepted(self, event):
+ """Event callback for when a sent message is accepted by the broker"""
+ self.confirmed += 1
+ if self.confirmed == self.total:
+ event.connection.close()
+
+ def on_disconnected(self, event):
+ """Event callback for when the broker disconnects with the client"""
+ self.sent = self.confirmed
+
+
+# --- main ---
+# Args: 1: Broker address (ip-addr:port)
+# 2: Queue name
+# 3: AMQP type
+# 4...n: Test value(s) as strings
+try:
+ Container(AmqpTypesSenderShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4]))).run()
+except KeyboardInterrupt:
+ pass
+except Exception as exc:
+ print os.path.basename(sys.argv[0]), 'EXCEPTION:', exc
+ print format_exc()
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/jms_messages_test/Receiver.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/jms_messages_test/Receiver.py b/shims/qpid-proton-python/src/jms_messages_test/Receiver.py
new file mode 100755
index 0000000..9140db1
--- /dev/null
+++ b/shims/qpid-proton-python/src/jms_messages_test/Receiver.py
@@ -0,0 +1,358 @@
+#!/usr/bin/env python
+
+"""
+JMS receiver shim for qpid-interop-test
+"""
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from interop_test_errors import InteropTestError
+from json import dumps, loads
+from proton import byte, symbol
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from struct import pack, unpack
+from subprocess import check_output
+from traceback import format_exc
+
+# These values must tie in with the Qpid-JMS client values found in
+# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport
+QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type')
+
+class JmsReceiverShim(MessagingHandler):
+ """
+ Receiver shim: This shim receives JMS messages sent by the Sender shim and prints the contents of the received
+ messages onto the terminal in JSON format for retrieval by the test harness. The JMS messages type and, where
+ applicable, body values, as well as the combinations of JMS headers and properties which may be attached to
+ the message are received on the command-line in JSON format when this program is launched.
+ """
+ def __init__(self, url, jms_msg_type, test_parameters_list):
+ super(JmsReceiverShim, self).__init__()
+ self.url = url
+ self.jms_msg_type = jms_msg_type
+ self.expteced_msg_map = test_parameters_list[0]
+ self.flag_map = test_parameters_list[1]
+ self.subtype_itr = iter(sorted(self.expteced_msg_map.keys()))
+ self.expected = self._get_tot_num_messages()
+ self.received = 0
+ self.received_value_map = {}
+ self.current_subtype = None
+ self.current_subtype_msg_list = None
+ self.jms_header_map = {}
+ self.jms_property_map = {}
+
+ def get_received_value_map(self):
+ """"Return the collected message values received"""
+ return self.received_value_map
+
+ def get_jms_header_map(self):
+ """Return the collected message headers received"""
+ return self.jms_header_map
+
+ def get_jms_property_map(self):
+ """Return the collected message properties received"""
+ return self.jms_property_map
+
+ def on_start(self, event):
+ """Event callback for when the client starts"""
+ event.container.create_receiver(self.url)
+
+ def on_message(self, event):
+ """Event callback when a message is received by the client"""
+ if event.message.id and event.message.id < self.received:
+ return # ignore duplicate message
+ if self.expected == 0 or self.received < self.expected:
+ if self.current_subtype is None:
+ self.current_subtype = self.subtype_itr.next()
+ self.current_subtype_msg_list = []
+ self.current_subtype_msg_list.append(self._handle_message(event.message))
+ self._process_jms_headers(event.message)
+ self._process_jms_properties(event.message)
+ if len(self.current_subtype_msg_list) >= self.expteced_msg_map[self.current_subtype]:
+ self.received_value_map[self.current_subtype] = self.current_subtype_msg_list
+ self.current_subtype = None
+ self.current_subtype_msg_list = []
+ self.received += 1
+ if self.received == self.expected:
+ event.receiver.close()
+ event.connection.close()
+
+ def on_connection_error(self, event):
+ print 'JmsReceiverShim.on_connection_error'
+
+ def on_session_error(self, event):
+ print 'JmsReceiverShim.on_session_error'
+
+ def on_link_error(self, event):
+ print 'JmsReceiverShim.on_link_error'
+
+ def _handle_message(self, message):
+ """Handles the analysis of a received message"""
+ if self.jms_msg_type == 'JMS_MESSAGE_TYPE':
+ return self._receive_jms_message(message)
+ if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE':
+ return self._receive_jms_bytesmessage(message)
+ if self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE':
+ return self._recieve_jms_mapmessage(message)
+ if self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE':
+ return self._recieve_jms_objectmessage(message)
+ if self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE':
+ return self._receive_jms_streammessage(message)
+ if self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE':
+ return self._receive_jms_textmessage(message)
+ print 'jms-receive: Unsupported JMS message type "%s"' % self.jms_msg_type
+ return None
+
+ def _get_tot_num_messages(self):
+ """"Counts up the total number of messages which should be received from the expected message map"""
+ total = 0
+ for key in self.expteced_msg_map:
+ total += int(self.expteced_msg_map[key])
+ return total
+
+ def _receive_jms_message(self, message):
+ """"Receives a JMS message (without a body)"""
+ assert self.jms_msg_type == 'JMS_MESSAGE_TYPE'
+ assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(0)
+ if message.body is not None:
+ raise InteropTestError('_receive_jms_message: Invalid body for type JMS_MESSAGE_TYPE: %s' %
+ str(message.body))
+ return None
+
+ def _receive_jms_bytesmessage(self, message):
+ """"Receives a JMS bytes message"""
+ assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE'
+ assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(3)
+ if self.current_subtype == 'boolean':
+ if message.body == b'\x00':
+ return 'False'
+ if message.body == b'\x01':
+ return 'True'
+ raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' %
+ str(message.body))
+ if self.current_subtype == 'byte':
+ return hex(unpack('b', message.body)[0])
+ if self.current_subtype == 'bytes':
+ return str(message.body)
+ if self.current_subtype == 'char':
+ if len(message.body) == 2: # format 'a' or '\xNN'
+ return str(message.body[1]) # strip leading '\x00' char
+ raise InteropTestError('Unexpected strring length for type char: %d' % len(message.body))
+ if self.current_subtype == 'double':
+ return '0x%016x' % unpack('!Q', message.body)[0]
+ if self.current_subtype == 'float':
+ return '0x%08x' % unpack('!L', message.body)[0]
+ if self.current_subtype == 'int':
+ return hex(unpack('!i', message.body)[0])
+ if self.current_subtype == 'long':
+ return hex(unpack('!q', message.body)[0])
+ if self.current_subtype == 'short':
+ return hex(unpack('!h', message.body)[0])
+ if self.current_subtype == 'string':
+ # NOTE: first 2 bytes are string length, must be present
+ if len(message.body) >= 2:
+ str_len = unpack('!H', message.body[:2])[0]
+ str_body = str(message.body[2:])
+ if len(str_body) != str_len:
+ raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' %
+ (str_len, str_body, len(str_body)))
+ return str_body
+ else:
+ raise InteropTestError('Malformed string binary: len(\'%s\')=%d' %
+ (repr(message.body), len(message.body)))
+ raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
+ (self.jms_msg_type, self.current_subtype))
+
+ def _recieve_jms_mapmessage(self, message):
+ """"Receives a JMS map message"""
+ assert self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE'
+ assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(2)
+ key, value = message.body.items()[0]
+ assert key[:-3] == self.current_subtype
+ if self.current_subtype == 'boolean':
+ return str(value)
+ if self.current_subtype == 'byte':
+ return hex(value)
+ if self.current_subtype == 'bytes':
+ return str(value)
+ if self.current_subtype == 'char':
+ return str(value)
+ if self.current_subtype == 'double':
+ return '0x%016x' % unpack('!Q', pack('!d', value))[0]
+ if self.current_subtype == 'float':
+ return '0x%08x' % unpack('!L', pack('!f', value))[0]
+ if self.current_subtype == 'int':
+ return hex(value)
+ if self.current_subtype == 'long':
+ return hex(int(value))
+ if self.current_subtype == 'short':
+ return hex(value)
+ if self.current_subtype == 'string':
+ return str(value)
+ raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
+ (self.jms_msg_type, self.current_subtype))
+
+ def _recieve_jms_objectmessage(self, message):
+ """"Receives a JMS Object message"""
+ assert self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE'
+ assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(1)
+ return self._get_java_obj(message.body)
+
+ def _get_java_obj(self, java_obj_bytes):
+ """
+ Take bytes from serialized Java object and construct a Java object, then return its toString() value. The
+ work of 'translating' the bytes to a Java object and obtaining its class and value is done in a Java
+ utility org.apache.qpid.interop_test.obj_util.BytesToJavaObj located in jar JavaObjUtils.jar.
+ java_obj_bytes: hex string representation of bytes from Java object (eg 'aced00057372...')
+ returns: string containing Java class value as returned by the toString() method
+ """
+ java_obj_bytes_str = ''.join(["%02x" % ord(x) for x in java_obj_bytes]).strip()
+ out_str = check_output(['java',
+ '-cp',
+ 'target/JavaObjUtils.jar',
+ 'org.apache.qpid.interop_test.obj_util.BytesToJavaObj',
+ java_obj_bytes_str])
+ out_str_list = out_str.split('\n')[:-1] # remove trailing \n
+ if len(out_str_list) > 1:
+ raise InteropTestError('Unexpected return from JavaObjUtils: %s' % out_str)
+ colon_index = out_str_list[0].index(':')
+ if colon_index < 0:
+ raise InteropTestError('Unexpected format from JavaObjUtils: %s' % out_str)
+ java_class_name = out_str_list[0][:colon_index]
+ java_class_value_str = out_str_list[0][colon_index+1:]
+ if java_class_name != self.current_subtype:
+ raise InteropTestError('Unexpected class name from JavaObjUtils: expected %s, recieved %s' %
+ (self.current_subtype, java_class_name))
+ return java_class_value_str
+
+ def _receive_jms_streammessage(self, message):
+ """Receives a JMS stream message"""
+ assert self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE'
+ assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(4)
+ # Every message is a list with one item [value]
+ assert len(message.body) == 1
+ value = message.body[0]
+ if self.current_subtype == 'boolean':
+ return str(value)
+ if self.current_subtype == 'byte':
+ return hex(value)
+ if self.current_subtype == 'bytes':
+ return str(value)
+ if self.current_subtype == 'char':
+ return str(value)
+ if self.current_subtype == 'double':
+ return '0x%016x' % unpack('!Q', pack('!d', value))[0]
+ if self.current_subtype == 'float':
+ return '0x%08x' % unpack('!L', pack('!f', value))[0]
+ if self.current_subtype == 'int':
+ return hex(value)
+ if self.current_subtype == 'long':
+ return hex(int(value))
+ if self.current_subtype == 'short':
+ return hex(value)
+ if self.current_subtype == 'string':
+ return str(value)
+ raise InteropTestError('JmsRecieverShim._receive_jms_streammessage(): ' +
+ 'JMS message type %s: Unknown or unsupported subtype \'%s\'' %
+ (self.jms_msg_type, self.current_subtype))
+
+ def _receive_jms_textmessage(self, message):
+ """"Receives a JMS text message"""
+ assert self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE'
+ assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(5)
+ return message.body
+
+ def _process_jms_headers(self, message):
+ """"Checks the supplied message for three JMS headers: message type, correlation-id and reply-to"""
+ # JMS message type header
+ message_type_header = message._get_subject()
+ if message_type_header is not None:
+ self.jms_header_map['JMS_TYPE_HEADER'] = {'string': message_type_header}
+
+ # JMS correlation ID
+ correlation_id = message._get_correlation_id()
+ if correlation_id is not None:
+ if 'JMS_CORRELATIONID_AS_BYTES' in self.flag_map and self.flag_map['JMS_CORRELATIONID_AS_BYTES']:
+ self.jms_header_map['JMS_CORRELATIONID_HEADER'] = {'bytes': correlation_id}
+ else:
+ self.jms_header_map['JMS_CORRELATIONID_HEADER'] = {'string': correlation_id}
+
+ # JMS reply-to
+ reply_to = message._get_reply_to()
+ if reply_to is not None:
+ if 'JMS_REPLYTO_AS_TOPIC' in self.flag_map and self.flag_map['JMS_REPLYTO_AS_TOPIC']:
+ # Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present
+ if len(reply_to) > 8 and reply_to[0:8] == 'topic://':
+ reply_to = reply_to[8:]
+ self.jms_header_map['JMS_REPLYTO_HEADER'] = {'topic': reply_to}
+ else:
+ if len(reply_to) > 8 and reply_to[0:8] == 'queue://':
+ reply_to = reply_to[8:]
+ self.jms_header_map['JMS_REPLYTO_HEADER'] = {'queue': reply_to}
+
+ def _process_jms_properties(self, message):
+ """"Checks the supplied message for JMS message properties and decodes them"""
+ if message.properties is not None:
+ for jms_property_name in message.properties:
+ underscore_index = jms_property_name.find('_')
+ if underscore_index >= 0: # Ignore any other properties without '_'
+ jms_property_type = jms_property_name[0:underscore_index]
+ value = message.properties[jms_property_name]
+ if jms_property_type == 'boolean':
+ self.jms_property_map[jms_property_name] = {'boolean': str(value)}
+ elif jms_property_type == 'byte':
+ self.jms_property_map[jms_property_name] = {'byte': hex(value)}
+ elif jms_property_type == 'double':
+ self.jms_property_map[jms_property_name] = {'double': '0x%016x' %
+ unpack('!Q', pack('!d', value))[0]}
+ elif jms_property_type == 'float':
+ self.jms_property_map[jms_property_name] = {'float': '0x%08x' %
+ unpack('!L', pack('!f', value))[0]}
+ elif jms_property_type == 'int':
+ self.jms_property_map[jms_property_name] = {'int': hex(value)}
+ elif jms_property_type == 'long':
+ self.jms_property_map[jms_property_name] = {'long': hex(int(value))}
+ elif jms_property_type == 'short':
+ self.jms_property_map[jms_property_name] = {'short': hex(value)}
+ elif jms_property_type == 'string':
+ self.jms_property_map[jms_property_name] = {'string': str(value)}
+ else:
+ pass # Ignore any other properties, brokers can add them and we don't know what they may be
+
+
+# --- main ---
+# Args: 1: Broker address (ip-addr:port)
+# 2: Queue name
+# 3: JMS message type
+# 4: JSON Test parameters containing 2 maps: [testValuesMap, flagMap]
+#print '#### sys.argv=%s' % sys.argv
+try:
+ RECEIVER = JmsReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4]))
+ Container(RECEIVER).run()
+ print sys.argv[3]
+ print dumps(RECEIVER.get_received_value_map())
+ print dumps(RECEIVER.get_jms_header_map())
+ print dumps(RECEIVER.get_jms_property_map())
+except KeyboardInterrupt:
+ pass
+except Exception as exc:
+ print 'jms-receiver-shim EXCEPTION:', exc
+ print format_exc()
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/jms_messages_test/Sender.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/jms_messages_test/Sender.py b/shims/qpid-proton-python/src/jms_messages_test/Sender.py
new file mode 100755
index 0000000..5a1108a
--- /dev/null
+++ b/shims/qpid-proton-python/src/jms_messages_test/Sender.py
@@ -0,0 +1,390 @@
+#!/usr/bin/env python
+
+"""
+JMS sender shim for qpid-interop-test
+"""
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from json import loads
+from proton import byte, char, float32, int32, Message, short, symbol
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from interop_test_errors import InteropTestError
+from subprocess import check_output
+from struct import pack, unpack
+from traceback import format_exc
+
+# These values must tie in with the Qpid-JMS client values found in
+# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport
+QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type')
+QPID_JMS_TYPE_ANNOTATIONS = {
+ 'JMS_MESSAGE_TYPE': byte(0),
+ 'JMS_BYTESMESSAGE_TYPE': byte(3),
+ 'JMS_MAPMESSAGE_TYPE': byte(2),
+ 'JMS_OBJECTMESSAGE_TYPE': byte(1),
+ 'JMS_STREAMMESSAGE_TYPE': byte(4),
+ 'JMS_TEXTMESSAGE_TYPE': byte(5)
+ }
+def create_annotation(jms_msg_type):
+ """Function which creates a message annotation for JMS message type as used by the Qpid JMS client"""
+ return {QPID_JMS_TYPE_ANNOTATION_NAME: QPID_JMS_TYPE_ANNOTATIONS[jms_msg_type]}
+
+class JmsSenderShim(MessagingHandler):
+ """
+ This shim sends JMS messages of a particular JMS message type according to the test parameters list. This list
+ contains three maps:
+ 0: The test value map, which contains test value types as keys, and lists of values of that type;
+ 1. The test headers map, which contains the JMS headers as keys and a submap conatining types and values;
+ 2. The test proprties map, which contains the name of the properties as keys, and a submap containing types
+ and values
+ This shim takes the combinations of the above map and creates test cases, each of which sends a single message
+ with (or without) JMS headers and properties.
+ """
+ def __init__(self, broker_ip_addr, queue_name, jms_msg_type, test_parameters_list):
+ super(JmsSenderShim, self).__init__()
+ self.broker_ip_addr = broker_ip_addr
+ self.queue_name = queue_name
+ self.jms_msg_type = jms_msg_type
+ self.test_value_map = test_parameters_list[0]
+ self.test_headers_map = test_parameters_list[1]
+ self.test_properties_map = test_parameters_list[2]
+ self.sent = 0
+ self.confirmed = 0
+ self.total = self._get_total_num_msgs()
+
+ def on_start(self, event):
+ """Event callback for when the client starts"""
+ event.container.create_sender('%s/%s' % (self.broker_ip_addr, self.queue_name))
+
+ def on_sendable(self, event):
+ """Event callback for when send credit is received, allowing the sending of messages"""
+ if self.sent == 0:
+ # These types expect a test_values Python string representation of a map: '{type:[val, val, val], ...}'
+ for sub_type in sorted(self.test_value_map.keys()):
+ if self._send_test_values(event, sub_type, self.test_value_map[sub_type]):
+ return
+
+ def on_connection_error(self, event):
+ print 'JmsSenderShim.on_connection_error'
+
+ def on_session_error(self, event):
+ print 'JmsSenderShim.on_session_error'
+
+ def on_link_error(self, event):
+ print 'JmsSenderShim.on_link_error'
+
+ def on_accepted(self, event):
+ """Event callback for when a sent message is accepted by the broker"""
+ self.confirmed += 1
+ if self.confirmed == self.total:
+ event.connection.close()
+
+ def on_disconnected(self, event):
+ """Event callback for when the broker disconnects with the client"""
+ self.sent = self.confirmed
+
+ def _get_total_num_msgs(self):
+ """
+ Calculates the total number of messages to be sent based on the message parameters received on the command-line
+ """
+ total = 0
+ for key in self.test_value_map.keys():
+ total += len(self.test_value_map[key])
+ return total
+
+ def _send_test_values(self, event, test_value_type, test_values):
+ """Method which loops through recieved parameters and sends the corresponding messages"""
+ value_num = 0
+ for test_value in test_values:
+ if event.sender.credit:
+ message = self._create_message(test_value_type, test_value, value_num)
+ # TODO: set message to address
+ if message is not None:
+ self._add_jms_message_headers(message)
+ self._add_jms_message_properties(message)
+ event.sender.send(message)
+ self.sent += 1
+ value_num += 1
+ else:
+ event.connection.close()
+ return True
+ return False
+
+ # TODO: Change this to return a list of messages. That way each test can return more than one message
+ def _create_message(self, test_value_type, test_value, value_num):
+ """Create a single message of the appropriate JMS message type"""
+ if self.jms_msg_type == 'JMS_MESSAGE_TYPE':
+ return self._create_jms_message(test_value_type, test_value)
+ elif self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE':
+ return self._create_jms_bytesmessage(test_value_type, test_value)
+ elif self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE':
+ return self._create_jms_mapmessage(test_value_type, test_value, "%s%03d" % (test_value_type, value_num))
+ elif self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE':
+ return self._create_jms_objectmessage('%s:%s' % (test_value_type, test_value))
+ elif self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE':
+ return self._create_jms_streammessage(test_value_type, test_value)
+ elif self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE':
+ return self._create_jms_textmessage(test_value)
+ else:
+ print 'jms-send: Unsupported JMS message type "%s"' % self.jms_msg_type
+ return None
+
+ def _create_jms_message(self, test_value_type, test_value):
+ """Create a JMS message type (without message body)"""
+ if test_value_type != 'none':
+ raise InteropTestError('JmsSenderShim._create_jms_message: Unknown or unsupported subtype "%s"' %
+ test_value_type)
+ if test_value is not None:
+ raise InteropTestError('JmsSenderShim._create_jms_message: Invalid value "%s" for subtype "%s"' %
+ (test_value, test_value_type))
+ return Message(id=(self.sent+1),
+ content_type='application/octet-stream',
+ annotations=create_annotation('JMS_MESSAGE_TYPE'))
+
+ def _create_jms_bytesmessage(self, test_value_type, test_value):
+ """Create a JMS bytes message"""
+ # NOTE: test_value contains all unicode strings u'...' as returned by json
+ body_bytes = None
+ if test_value_type == 'boolean':
+ body_bytes = b'\x01' if test_value == 'True' else b'\x00'
+ elif test_value_type == 'byte':
+ body_bytes = pack('b', int(test_value, 16))
+ elif test_value_type == 'bytes':
+ body_bytes = str(test_value) # remove unicode
+ elif test_value_type == 'char':
+ # JMS expects two-byte chars, ASCII chars can be prefixed with '\x00'
+ body_bytes = '\x00' + str(test_value) # remove unicode
+ elif test_value_type == 'double' or test_value_type == 'float':
+ body_bytes = test_value[2:].decode('hex')
+ elif test_value_type == 'int':
+ body_bytes = pack('!i', int(test_value, 16))
+ elif test_value_type == 'long':
+ body_bytes = pack('!q', long(test_value, 16))
+ elif test_value_type == 'short':
+ body_bytes = pack('!h', short(test_value, 16))
+ elif test_value_type == 'string':
+ # NOTE: First two bytes must be string length
+ test_value_str = str(test_value) # remove unicode
+ body_bytes = pack('!H', len(test_value_str)) + test_value_str
+ else:
+ raise InteropTestError('JmsSenderShim._create_jms_bytesmessage: Unknown or unsupported subtype "%s"' %
+ test_value_type)
+ return Message(id=(self.sent+1),
+ body=body_bytes,
+ inferred=True,
+ content_type='application/octet-stream',
+ annotations=create_annotation('JMS_BYTESMESSAGE_TYPE'))
+
+ def _create_jms_mapmessage(self, test_value_type, test_value, name):
+ """Create a JMS map message"""
+ if test_value_type == 'boolean':
+ value = test_value == 'True'
+ elif test_value_type == 'byte':
+ value = byte(int(test_value, 16))
+ elif test_value_type == 'bytes':
+ value = str(test_value) # remove unicode
+ elif test_value_type == 'char':
+ value = char(test_value)
+ elif test_value_type == 'double':
+ value = unpack('!d', test_value[2:].decode('hex'))[0]
+ elif test_value_type == 'float':
+ value = float32(unpack('!f', test_value[2:].decode('hex'))[0])
+ elif test_value_type == 'int':
+ value = int32(int(test_value, 16))
+ elif test_value_type == 'long':
+ value = long(test_value, 16)
+ elif test_value_type == 'short':
+ value = short(int(test_value, 16))
+ elif test_value_type == 'string':
+ value = test_value
+ else:
+ raise InteropTestError('JmsSenderShim._create_jms_mapmessage: Unknown or unsupported subtype "%s"' %
+ test_value_type)
+ return Message(id=(self.sent+1),
+ body={name: value},
+ inferred=False,
+ annotations=create_annotation('JMS_MAPMESSAGE_TYPE'))
+
+ def _create_jms_objectmessage(self, test_value):
+ """Create a JMS object message"""
+ java_binary = self._s_get_java_obj_binary(test_value)
+ return Message(id=(self.sent+1),
+ body=java_binary,
+ inferred=True,
+ content_type='application/x-java-serialized-object',
+ annotations=create_annotation('JMS_OBJECTMESSAGE_TYPE'))
+
+ @staticmethod
+ def _s_get_java_obj_binary(java_class_str):
+ """Call external utility to create Java object and stringify it, returning the string representation"""
+ out_str = check_output(['java',
+ '-cp',
+ 'target/JavaObjUtils.jar',
+ 'org.apache.qpid.interop_test.obj_util.JavaObjToBytes',
+ java_class_str])
+ out_str_list = out_str.split('\n')[:-1] # remove trailing \n
+ if out_str_list[0] != java_class_str:
+ raise InteropTestError('JmsSenderShim._s_get_java_obj_binary(): Call to JavaObjToBytes failed\n%s' %
+ out_str)
+ return out_str_list[1].decode('hex')
+
+ def _create_jms_streammessage(self, test_value_type, test_value):
+ """Create a JMS stream message"""
+ if test_value_type == 'boolean':
+ body_list = [test_value == 'True']
+ elif test_value_type == 'byte':
+ body_list = [byte(int(test_value, 16))]
+ elif test_value_type == 'bytes':
+ body_list = [str(test_value)]
+ elif test_value_type == 'char':
+ body_list = [char(test_value)]
+ elif test_value_type == 'double':
+ body_list = [unpack('!d', test_value[2:].decode('hex'))[0]]
+ elif test_value_type == 'float':
+ body_list = [float32(unpack('!f', test_value[2:].decode('hex'))[0])]
+ elif test_value_type == 'int':
+ body_list = [int32(int(test_value, 16))]
+ elif test_value_type == 'long':
+ body_list = [long(test_value, 16)]
+ elif test_value_type == 'short':
+ body_list = [short(int(test_value, 16))]
+ elif test_value_type == 'string':
+ body_list = [test_value]
+ else:
+ raise InteropTestError('JmsSenderShim._create_jms_streammessage: Unknown or unsupported subtype "%s"' %
+ test_value_type)
+ return Message(id=(self.sent+1),
+ body=body_list,
+ inferred=True,
+ annotations=create_annotation('JMS_STREAMMESSAGE_TYPE'))
+
+ def _create_jms_textmessage(self, test_value_text):
+ """Create a JMS text message"""
+ return Message(id=(self.sent+1),
+ body=unicode(test_value_text),
+ annotations=create_annotation('JMS_TEXTMESSAGE_TYPE'))
+
+ def _add_jms_message_headers(self, message):
+ """Add JMS headers to the supplied message from self.test_headers_map"""
+ for jms_header in self.test_headers_map.iterkeys():
+ value_map = self.test_headers_map[jms_header]
+ value_type = value_map.keys()[0] # There is only ever one value in map
+ value = value_map[value_type]
+ if jms_header == 'JMS_TYPE_HEADER':
+ if value_type == 'string':
+ self._s_set_jms_type_header(message, value)
+ else:
+ raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' +
+ 'JMS_TYPE_HEADER requires value type "string", type "%s" found' %
+ value_type)
+ elif jms_header == 'JMS_CORRELATIONID_HEADER':
+ if value_type == 'string':
+ self._s_set_jms_correlation_id(message, value)
+ elif value_type == 'bytes':
+ self._s_set_jms_correlation_id(message, str(value))
+ else:
+ raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' +
+ 'JMS_CORRELATIONID_HEADER requires value type "string" or "bytes", ' +
+ 'type "%s" found' % value_type)
+ elif jms_header == 'JMS_REPLYTO_HEADER':
+ if value_type == 'queue' or value_type == 'topic':
+ self._s_set_jms_reply_to(message, value_type, value)
+ elif value_type == 'temp_queue' or value_type == 'temp_topic':
+ raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' +
+ 'JMS_REPLYTO_HEADER type "temp_queue" or "temp_topic" not handled')
+ else:
+ raise InteropTestError('JmsSenderShim._add_jms_message_headers(): ' +
+ 'JMS_REPLYTO_HEADER requires value type "queue" or "topic", ' +
+ 'type "%s" found' % value_type)
+ else:
+ raise InteropTestError('JmsSenderShim._add_jms_message_headers(): Invalid JMS message header "%s"' %
+ jms_header)
+
+
+ @staticmethod
+ def _s_set_jms_type_header(message, message_type):
+ """Adds a JMS message type header"""
+ message._set_subject(message_type)
+
+ @staticmethod
+ def _s_set_jms_correlation_id(message, correlation_id):
+ """Adds a JMS correlation id header"""
+ message._set_correlation_id(correlation_id)
+ message.annotations[symbol(u'x-opt-app-correlation-id')] = True
+
+ @staticmethod
+ def _s_set_jms_reply_to(message, jms_destination_type_str, destination):
+ """Adds a JMS reply-to header"""
+ if jms_destination_type_str == 'queue':
+ message._set_reply_to(destination)
+ message.annotations[symbol(u'x-opt-jms-reply-to')] = byte(0)
+ elif jms_destination_type_str == 'topic':
+ message._set_reply_to(destination)
+ message.annotations[symbol(u'x-opt-jms-reply-to')] = byte(1)
+ else:
+ raise InteropTestError('JmsSenderShim._s_set_jms_reply_to(): ' +
+ 'Invalid value for jms_destination_type_str "%s"' % jms_destination_type_str)
+
+ def _add_jms_message_properties(self, message):
+ """Adds message properties to the supplied message from self.test_properties_map"""
+ for property_name in self.test_properties_map.iterkeys():
+ value_map = self.test_properties_map[property_name]
+ value_type = value_map.keys()[0] # There is only ever one value in map
+ value = value_map[value_type]
+ if message.properties is None:
+ message.properties = {}
+ if value_type == 'boolean':
+ message.properties[property_name] = value == 'True'
+ elif value_type == 'byte':
+ message.properties[property_name] = byte(int(value, 16))
+ elif value_type == 'double':
+ message.properties[property_name] = unpack('!d', value[2:].decode('hex'))[0]
+ elif value_type == 'float':
+ message.properties[property_name] = float32(unpack('!f', value[2:].decode('hex'))[0])
+ elif value_type == 'int':
+ message.properties[property_name] = int(value, 16)
+ elif value_type == 'long':
+ message.properties[property_name] = long(value, 16)
+ elif value_type == 'short':
+ message.properties[property_name] = short(int(value, 16))
+ elif value_type == 'string':
+ message.properties[property_name] = value
+ else:
+ raise InteropTestError('JmsSenderShim._add_jms_message_properties: ' +
+ 'Unknown or unhandled message property type ?%s"' % value_type)
+
+
+
+# --- main ---
+# Args: 1: Broker address (ip-addr:port)
+# 2: Queue name
+# 3: JMS message type
+# 4: JSON Test parameters containing 3 maps: [testValueMap, testHeadersMap, testPropertiesMap]
+#print '#### sys.argv=%s' % sys.argv
+#print '>>> test_values=%s' % loads(sys.argv[4])
+try:
+ Container(JmsSenderShim(sys.argv[1], sys.argv[2], sys.argv[3], loads(sys.argv[4]))).run()
+except KeyboardInterrupt:
+ pass
+except Exception as exc:
+ print 'jms-sender-shim EXCEPTION:', exc
+ print format_exc()
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/src/python/qpid-interop-test/__init__.py
----------------------------------------------------------------------
diff --git a/src/python/qpid-interop-test/__init__.py b/src/python/qpid-interop-test/__init__.py
index 7b8aee3..a94c993 100644
--- a/src/python/qpid-interop-test/__init__.py
+++ b/src/python/qpid-interop-test/__init__.py
@@ -19,6 +19,7 @@
import broker_properties
import interop_test_errors
+import shims
import test_type_map
import types
import jms
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org