You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2016/10/04 19:17:30 UTC
[4/8] qpid-interop-test git commit: QPIDIT-41: Re-organization of
project directory structure
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp
deleted file mode 100644
index 8cbe515..0000000
--- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpidit/shim/JmsReceiver.hpp"
-
-#include <iostream>
-#include <json/json.h>
-#include "proton/connection.hpp"
-#include "proton/default_container.hpp"
-#include "proton/delivery.hpp"
-#include "proton/transport.hpp"
-#include "qpidit/QpidItErrors.hpp"
-
-namespace qpidit
-{
- namespace shim
- {
- //static
- proton::symbol JmsReceiver::s_jmsMessageTypeAnnotationKey("x-opt-jms-msg-type");
- std::map<std::string, int8_t>JmsReceiver::s_jmsMessageTypeAnnotationValues = initializeJmsMessageTypeAnnotationMap();
-
-
- JmsReceiver::JmsReceiver(const std::string& brokerUrl,
- const std::string& jmsMessageType,
- const Json::Value& testNumberMap,
- const Json::Value& flagMap):
- _brokerUrl(brokerUrl),
- _jmsMessageType(jmsMessageType),
- _testNumberMap(testNumberMap),
- _flagMap(flagMap),
- _subTypeList(testNumberMap.getMemberNames()),
- _subTypeIndex(0),
- _expected(getTotalNumExpectedMsgs(testNumberMap)),
- _received(0UL),
- _receivedSubTypeList(Json::arrayValue),
- _receivedValueMap(Json::objectValue),
- _receivedHeadersMap(Json::objectValue),
- _receivedPropertiesMap(Json::objectValue)
- {}
-
- JmsReceiver::~JmsReceiver() {}
-
- Json::Value& JmsReceiver::getReceivedValueMap() {
- return _receivedValueMap;
- }
-
- Json::Value& JmsReceiver::getReceivedHeadersMap() {
- return _receivedHeadersMap;
- }
-
- Json::Value& JmsReceiver::getReceivedPropertiesMap() {
- return _receivedPropertiesMap;
- }
-
- void JmsReceiver::on_container_start(proton::container &c) {
- c.open_receiver(_brokerUrl);
- }
-
- void JmsReceiver::on_message(proton::delivery &d, proton::message &m) {
- try {
- if (_received < _expected) {
- int8_t t = JMS_MESSAGE_TYPE;
- try {t = m.message_annotations().get(proton::symbol("x-opt-jms-msg-type")).get<int8_t>();}
- catch (const std::exception& e) {
- std::cout << "JmsReceiver::on_message(): Missing annotation \"x-opt-jms-msg-type\"" << std::endl;
- throw;
- }
- switch (t) {
- case JMS_MESSAGE_TYPE:
- receiveJmsMessage(m);
- break;
- case JMS_OBJECTMESSAGE_TYPE:
- receiveJmsObjectMessage(m);
- break;
- case JMS_MAPMESSAGE_TYPE:
- receiveJmsMapMessage(m);
- break;
- case JMS_BYTESMESSAGE_TYPE:
- receiveJmsBytesMessage(m);
- break;
- case JMS_STREAMMESSAGE_TYPE:
- receiveJmsStreamMessage(m);
- break;
- case JMS_TEXTMESSAGE_TYPE:
- receiveJmsTextMessage(m);
- break;
- default:;
- // TODO: handle error - no known JMS message type
- }
-
- processMessageHeaders(m);
- processMessageProperties(m);
-
- std::string subType(_subTypeList[_subTypeIndex]);
- // Increment the subtype if the required number of messages have been received
- if (_receivedSubTypeList.size() >= _testNumberMap[subType].asInt() &&
- _subTypeIndex < _testNumberMap.size()) {
- _receivedValueMap[subType] = _receivedSubTypeList;
- _receivedSubTypeList.clear();
- ++_subTypeIndex;
- }
- _received++;
- if (_received >= _expected) {
- d.receiver().close();
- d.connection().close();
- }
- }
- } catch (const std::exception&) {
- d.receiver().close();
- d.connection().close();
- throw;
- }
- }
-
- void JmsReceiver::on_connection_error(proton::connection &c) {
- std::cerr << "JmsReceiver::on_connection_error(): " << c.error() << std::endl;
- }
-
- void JmsReceiver::on_receiver_error(proton::receiver& r) {
- std::cerr << "JmsReceiver::on_receiver_error(): " << r.error() << std::endl;
- }
-
- void JmsReceiver::on_session_error(proton::session &s) {
- std::cerr << "JmsReceiver::on_session_error(): " << s.error() << std::endl;
- }
-
- void JmsReceiver::on_transport_error(proton::transport &t) {
- std::cerr << "JmsReceiver::on_transport_error(): " << t.error() << std::endl;
- }
-
- void JmsReceiver::on_error(const proton::error_condition &ec) {
- std::cerr << "JmsReceiver::on_error(): " << ec << std::endl;
- }
-
- //static
- uint32_t JmsReceiver::getTotalNumExpectedMsgs(const Json::Value testNumberMap) {
- uint32_t total(0UL);
- for (Json::Value::const_iterator i=testNumberMap.begin(); i!=testNumberMap.end(); ++i) {
- total += (*i).asUInt();
- }
- return total;
-
- }
-
- // protected
-
- void JmsReceiver::receiveJmsMessage(const proton::message& msg) {
- _receivedSubTypeList.append(Json::Value());
- }
-
- void JmsReceiver::receiveJmsObjectMessage(const proton::message& msg) {
- // TODO
- }
-
- void JmsReceiver::receiveJmsMapMessage(const proton::message& msg) {
- if(_jmsMessageType.compare("JMS_MAPMESSAGE_TYPE") != 0) {
- throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_MAPMESSAGE_TYPE");
- }
- std::string subType(_subTypeList[_subTypeIndex]);
- std::map<std::string, proton::value> m;
- msg.body().get(m);
- for (std::map<std::string, proton::value>::const_iterator i=m.begin(); i!=m.end(); ++i) {
- std::string key = i->first;
- if (subType.compare(key.substr(0, key.size()-3)) != 0) {
- throw qpidit::IncorrectJmsMapKeyPrefixError(subType, key);
- }
- proton::value val = i->second;
- if (subType.compare("boolean") == 0) {
- _receivedSubTypeList.append(val.get<bool>() ? Json::Value("True") : Json::Value("False"));
- } else if (subType.compare("byte") == 0) {
- _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val.get<int8_t>())));
- } else if (subType.compare("bytes") == 0) {
- _receivedSubTypeList.append(Json::Value(std::string(val.get<proton::binary>())));
- } else if (subType.compare("char") == 0) {
- std::ostringstream oss;
- oss << (char)val.get<wchar_t>();
- _receivedSubTypeList.append(Json::Value(oss.str()));
- } else if (subType.compare("double") == 0) {
- double d = val.get<double>();
- _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false)));
- } else if (subType.compare("float") == 0) {
- float f = val.get<float>();
- _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false)));
- } else if (subType.compare("int") == 0) {
- _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val.get<int32_t>())));
- } else if (subType.compare("long") == 0) {
- _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val.get<int64_t>())));
- } else if (subType.compare("short") == 0) {
- _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val.get<int16_t>())));
- } else if (subType.compare("string") == 0) {
- _receivedSubTypeList.append(Json::Value(val.get<std::string>()));
- } else {
- throw qpidit::UnknownJmsMessageSubTypeError(subType);
- }
- }
- }
-
- void JmsReceiver::receiveJmsBytesMessage(const proton::message& msg) {
- if(_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") != 0) {
- throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_BYTESMESSAGE_TYPE");
- }
- std::string subType(_subTypeList[_subTypeIndex]);
- proton::binary body = msg.body().get<proton::binary>();
- if (subType.compare("boolean") == 0) {
- if (body.size() != 1) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=boolean", 1, body.size());
- _receivedSubTypeList.append(body[0] ? Json::Value("True") : Json::Value("False"));
- } else if (subType.compare("byte") == 0) {
- if (body.size() != sizeof(int8_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=byte", sizeof(int8_t), body.size());
- int8_t val = *((int8_t*)body.data());
- _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val)));
- } else if (subType.compare("bytes") == 0) {
- _receivedSubTypeList.append(Json::Value(std::string(body)));
- } else if (subType.compare("char") == 0) {
- if (body.size() != sizeof(uint16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=char", sizeof(uint16_t), body.size());
- // TODO: This is ugly: ignoring first byte - handle UTF-16 correctly
- char c = body[1];
- std::ostringstream oss;
- oss << c;
- _receivedSubTypeList.append(Json::Value(oss.str()));
- } else if (subType.compare("double") == 0) {
- if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=double", sizeof(int64_t), body.size());
- int64_t val = be64toh(*((int64_t*)body.data()));
- _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val, true, false)));
- } else if (subType.compare("float") == 0) {
- if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=float", sizeof(int32_t), body.size());
- int32_t val = be32toh(*((int32_t*)body.data()));
- _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val, true, false)));
- } else if (subType.compare("long") == 0) {
- if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=long", sizeof(int64_t), body.size());
- int64_t val = be64toh(*((int64_t*)body.data()));
- _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val)));
- } else if (subType.compare("int") == 0) {
- if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=int", sizeof(int32_t), body.size());
- int32_t val = be32toh(*((int32_t*)body.data()));
- _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val)));
- } else if (subType.compare("short") == 0) {
- if (body.size() != sizeof(int16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=short", sizeof(int16_t), body.size());
- int16_t val = be16toh(*((int16_t*)body.data()));
- _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val)));
- } else if (subType.compare("string") == 0) {
- // TODO: decode string size in first two bytes and check string size
- _receivedSubTypeList.append(Json::Value(std::string(body).substr(2)));
- } else {
- throw qpidit::UnknownJmsMessageSubTypeError(subType);
- }
- }
-
- void JmsReceiver::receiveJmsStreamMessage(const proton::message& msg) {
- if(_jmsMessageType.compare("JMS_STREAMMESSAGE_TYPE") != 0) {
- throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_STREAMMESSAGE_TYPE");
- }
- std::string subType(_subTypeList[_subTypeIndex]);
- std::vector<proton::value> l;
- msg.body().get(l);
- for (std::vector<proton::value>::const_iterator i=l.begin(); i!=l.end(); ++i) {
- if (subType.compare("boolean") == 0) {
- _receivedSubTypeList.append(i->get<bool>() ? Json::Value("True") : Json::Value("False"));
- } else if (subType.compare("byte") == 0) {
- _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(i->get<int8_t>())));
- } else if (subType.compare("bytes") == 0) {
- _receivedSubTypeList.append(Json::Value(std::string(i->get<proton::binary>())));
- } else if (subType.compare("char") == 0) {
- std::ostringstream oss;
- oss << (char)i->get<wchar_t>();
- _receivedSubTypeList.append(Json::Value(oss.str()));
- } else if (subType.compare("double") == 0) {
- double d = i->get<double>();
- _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false)));
- } else if (subType.compare("float") == 0) {
- float f = i->get<float>();
- _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false)));
- } else if (subType.compare("int") == 0) {
- _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(i->get<int32_t>())));
- } else if (subType.compare("long") == 0) {
- _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(i->get<int64_t>())));
- } else if (subType.compare("short") == 0) {
- _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(i->get<int16_t>())));
- } else if (subType.compare("string") == 0) {
- _receivedSubTypeList.append(Json::Value(i->get<std::string>()));
- } else {
- throw qpidit::UnknownJmsMessageSubTypeError(subType);
- }
- }
-
- }
-
- void JmsReceiver::receiveJmsTextMessage(const proton::message& msg) {
- if(_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") != 0) {
- throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_TEXTMESSAGE_TYPE");
- }
- _receivedSubTypeList.append(Json::Value(msg.body().get<std::string>()));
- }
-
- void JmsReceiver::processMessageHeaders(const proton::message& msg) {
- addMessageHeaderString("JMS_TYPE_HEADER", msg.subject());
- if (_flagMap.isMember("JMS_CORRELATIONID_AS_BYTES") && _flagMap["JMS_CORRELATIONID_AS_BYTES"].asBool()) {
- addMessageHeaderByteArray("JMS_CORRELATIONID_HEADER", proton::get<proton::binary>(msg.correlation_id()));
- } else {
- try {
- addMessageHeaderString("JMS_CORRELATIONID_HEADER", proton::get<std::string>(msg.correlation_id()));
- } catch (const std::exception& e) {} // TODO: UGLY, how do you check if there _is_ a correlation id?
- }
-
- std::string reply_to = msg.reply_to();
- // Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present
- if (_flagMap.isMember("JMS_REPLYTO_AS_TOPIC") && _flagMap["JMS_REPLYTO_AS_TOPIC"].asBool()) {
- if (reply_to.find("topic://") == 0) {
- addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_TOPIC, reply_to.substr(8));
- } else {
- addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_TOPIC, reply_to);
- }
- } else {
- if (reply_to.find("queue://") == 0) {
- addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_QUEUE, reply_to.substr(8));
- } else {
- addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_QUEUE, reply_to);
- }
- }
- }
-
- void JmsReceiver::addMessageHeaderString(const char* headerName, const std::string& value) {
- if (!value.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty strings are allowed in headers
- Json::Value valueMap(Json::objectValue);
- valueMap["string"] = value;
- _receivedHeadersMap[headerName] = valueMap;
- }
- }
-
- void JmsReceiver::addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba) {
- if (!ba.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty binaries are allowed in headers
- Json::Value valueMap(Json::objectValue);
- valueMap["bytes"] = std::string(ba);
- _receivedHeadersMap[headerName] = valueMap;
- }
- }
-
- void JmsReceiver::addMessageHeaderDestination(const std::string& headerName, jmsDestinationType_t dt, const std::string& d) {
- if (!d.empty()) {
- Json::Value valueMap(Json::objectValue);
- switch (dt) {
- case JMS_QUEUE:
- valueMap["queue"] = d;
- break;
- case JMS_TOPIC:
- valueMap["topic"] = d;
- break;
- default:
- ; // TODO: Handle error: remaining JMS destinations not handled.
- }
- _receivedHeadersMap[headerName] = valueMap;
- }
- }
-
- void JmsReceiver::processMessageProperties(const proton::message& msg) {
- // TODO: Add this function when PROTON-1284 is fixed
-// std::map<proton::value, proton::value> props;
-// msg.properties().value() >> props;
- }
-
- //static
- std::map<std::string, int8_t> JmsReceiver::initializeJmsMessageTypeAnnotationMap() {
- std::map<std::string, int8_t> m;
- m["JMS_MESSAGE_TYPE"] = JMS_MESSAGE_TYPE;
- m["JMS_OBJECTMESSAGE_TYPE"] = JMS_OBJECTMESSAGE_TYPE;
- m["JMS_MAPMESSAGE_TYPE"] = JMS_MAPMESSAGE_TYPE;
- m["JMS_BYTESMESSAGE_TYPE"] = JMS_BYTESMESSAGE_TYPE;
- m["JMS_STREAMMESSAGE_TYPE"] = JMS_STREAMMESSAGE_TYPE;
- m["JMS_TEXTMESSAGE_TYPE"] = JMS_TEXTMESSAGE_TYPE;
- return m;
- }
-
-
- } /* namespace shim */
-} /* namespace qpidit */
-
-/* --- main ---
- * Args: 1: Broker address (ip-addr:port)
- * 2: Queue name
- * 3: JMS message type
- * 4: JSON Test parameters containing 2 maps: [testValuesMap, flagMap]
- */
-int main(int argc, char** argv) {
- /*
- for (int i=0; i<argc; ++i) {
- std::cout << "*** argv[" << i << "] : " << argv[i] << std::endl;
- }
- */
- // TODO: improve arg management a little...
- if (argc != 5) {
- throw qpidit::ArgumentError("Incorrect number of arguments");
- }
-
- std::ostringstream oss;
- oss << argv[1] << "/" << argv[2];
-
- try {
- Json::Value testParams;
- Json::Reader jsonReader;
- if (not jsonReader.parse(argv[4], testParams, false)) {
- throw qpidit::JsonParserError(jsonReader);
- }
-
- qpidit::shim::JmsReceiver receiver(oss.str(), argv[3], testParams[0], testParams[1]);
- proton::default_container(receiver).run();
-
- Json::FastWriter fw;
- std::cout << argv[3] << std::endl;
- std::cout << fw.write(receiver.getReceivedValueMap());
- std::cout << fw.write(receiver.getReceivedHeadersMap());
- std::cout << fw.write(receiver.getReceivedPropertiesMap());
- } catch (const std::exception& e) {
- std::cout << "JmsReceiver error: " << e.what() << std::endl;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp
deleted file mode 100644
index 23c60fc..0000000
--- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifndef SRC_QPIDIT_SHIM_JMSRECEIVER_HPP_
-#define SRC_QPIDIT_SHIM_JMSRECEIVER_HPP_
-
-#include <iomanip>
-#include <json/value.h>
-#include "proton/messaging_handler.hpp"
-#include "proton/types.hpp"
-#include "qpidit/shim/JmsDefinitions.hpp"
-#include <sstream>
-
-namespace qpidit
-{
- namespace shim
- {
-
- class JmsReceiver : public proton::messaging_handler
- {
- protected:
- static proton::symbol s_jmsMessageTypeAnnotationKey;
- static std::map<std::string, int8_t>s_jmsMessageTypeAnnotationValues;
-
- const std::string _brokerUrl;
- const std::string _jmsMessageType;
- const Json::Value _testNumberMap;
- const Json::Value _flagMap;
- Json::Value::Members _subTypeList;
- int _subTypeIndex;
- uint32_t _expected;
- uint32_t _received;
- Json::Value _receivedSubTypeList;
- Json::Value _receivedValueMap;
- Json::Value _receivedHeadersMap;
- Json::Value _receivedPropertiesMap;
- public:
- JmsReceiver(const std::string& brokerUrl,
- const std::string& jmsMessageType,
- const Json::Value& testNumberMap,
- const Json::Value& flagMap);
- virtual ~JmsReceiver();
- Json::Value& getReceivedValueMap();
- Json::Value& getReceivedHeadersMap();
- Json::Value& getReceivedPropertiesMap();
- void on_container_start(proton::container &c);
- void on_message(proton::delivery &d, proton::message &m);
-
- void on_connection_error(proton::connection &c);
- void on_receiver_error(proton::receiver& r);
- void on_session_error(proton::session &s);
- void on_transport_error(proton::transport &t);
- void on_error(const proton::error_condition &c);
-
- static uint32_t getTotalNumExpectedMsgs(const Json::Value testNumberMap);
-
- protected:
- void receiveJmsMessage(const proton::message& msg);
- void receiveJmsObjectMessage(const proton::message& msg);
- void receiveJmsMapMessage(const proton::message& msg);
- void receiveJmsBytesMessage(const proton::message& msg);
- void receiveJmsStreamMessage(const proton::message& msg);
- void receiveJmsTextMessage(const proton::message& msg);
-
- void processMessageHeaders(const proton::message& msg);
- void addMessageHeaderString(const char* headerName, const std::string& value);
- void addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba);
- void addMessageHeaderDestination(const std::string& headerName, jmsDestinationType_t dt, const std::string& d);
- void processMessageProperties(const proton::message& msg);
-
- static std::map<std::string, int8_t> initializeJmsMessageTypeAnnotationMap();
-
- // Format signed numbers in negative hex format if signedFlag is true, ie -0xNNNN, positive numbers in 0xNNNN format
- template<typename T> static std::string toHexStr(T val, bool fillFlag = false, bool signedFlag = true) {
- std::ostringstream oss;
- bool neg = false;
- if (signedFlag) {
- neg = val < 0;
- if (neg) val = -val;
- }
- oss << (neg ? "-" : "") << "0x" << std::hex;
- if (fillFlag) {
- oss << std::setw(sizeof(T)*2) << std::setfill('0');
- }
- oss << (sizeof(T) == 1 ? (int)val & 0xff : sizeof(T) == 2 ? val & 0xffff : sizeof(T) == 4 ? val & 0xffffffff : val);
- return oss.str();
- }
- };
-
- } /* namespace shim */
-} /* namespace qpidit */
-
-#endif /* SRC_QPIDIT_SHIM_JMSRECEIVER_HPP_ */
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp
deleted file mode 100644
index 6576732..0000000
--- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp
+++ /dev/null
@@ -1,487 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpidit/shim/JmsSender.hpp"
-
-#include <cerrno>
-#include <iomanip>
-#include <iostream>
-#include <json/json.h>
-#include "proton/connection.hpp"
-#include "proton/default_container.hpp"
-#include "proton/tracker.hpp"
-#include "proton/transport.hpp"
-#include <stdio.h>
-
-namespace qpidit
-{
- namespace shim
- {
- //static
- proton::symbol JmsSender::s_jmsMessageTypeAnnotationKey("x-opt-jms-msg-type");
- std::map<std::string, int8_t>JmsSender::s_jmsMessageTypeAnnotationValues = initializeJmsMessageTypeAnnotationMap();
-
- JmsSender::JmsSender(const std::string& brokerUrl,
- const std::string& jmsMessageType,
- const Json::Value& testParams) :
- _brokerUrl(brokerUrl),
- _jmsMessageType(jmsMessageType),
- _testValueMap(testParams[0]),
- _testHeadersMap(testParams[1]),
- _testPropertiesMap(testParams[2]),
- _msgsSent(0),
- _msgsConfirmed(0),
- _totalMsgs(getTotalNumMessages(_testValueMap))
- {
- if (_testValueMap.type() != Json::objectValue) {
- throw qpidit::InvalidJsonRootNodeError(Json::objectValue, _testValueMap.type());
- }
- }
-
- JmsSender::~JmsSender() {}
-
- void JmsSender::on_container_start(proton::container &c) {
- c.open_sender(_brokerUrl);
- }
-
- void JmsSender::on_sendable(proton::sender &s) {
- if (_totalMsgs == 0) {
- s.connection().close();
- } else if (_msgsSent == 0) {
- Json::Value::Members subTypes = _testValueMap.getMemberNames();
- std::sort(subTypes.begin(), subTypes.end());
- for (std::vector<std::string>::const_iterator i=subTypes.begin(); i!=subTypes.end(); ++i) {
- sendMessages(s, *i, _testValueMap[*i]);
- }
- }
- }
-
- void JmsSender::on_tracker_accept(proton::tracker &t) {
- _msgsConfirmed++;
- if (_msgsConfirmed == _totalMsgs) {
- t.connection().close();
- }
- }
-
- void JmsSender::on_transport_close(proton::transport &t) {
- _msgsSent = _msgsConfirmed;
- }
-
- void JmsSender::on_connection_error(proton::connection &c) {
- std::cerr << "JmsSender::on_connection_error(): " << c.error() << std::endl;
- }
-
- void JmsSender::on_sender_error(proton::sender &s) {
- std::cerr << "JmsSender::on_sender_error(): " << s.error() << std::endl;
- }
-
- void JmsSender::on_session_error(proton::session &s) {
- std::cerr << "JmsSender::on_session_error(): " << s.error() << std::endl;
- }
-
- void JmsSender::on_transport_error(proton::transport &t) {
- std::cerr << "JmsSender::on_transport_error(): " << t.error() << std::endl;
- }
-
- void JmsSender::on_error(const proton::error_condition &ec) {
- std::cerr << "JmsSender::on_error(): " << ec << std::endl;
- }
-
- // protected
-
- void JmsSender::sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValues) {
- uint32_t valueNumber = 0;
- for (Json::Value::const_iterator i=testValues.begin(); i!=testValues.end(); ++i) {
- if (s.credit()) {
- proton::message msg;
- if (_jmsMessageType.compare("JMS_MESSAGE_TYPE") == 0) {
- setMessage(msg, subType, (*i).asString());
- } else if (_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") == 0) {
- setBytesMessage(msg, subType, (*i).asString());
- } else if (_jmsMessageType.compare("JMS_MAPMESSAGE_TYPE") == 0) {
- setMapMessage(msg, subType, (*i).asString(), valueNumber);
- } else if (_jmsMessageType.compare("JMS_OBJECTMESSAGE_TYPE") == 0) {
- setObjectMessage(msg, subType, *i);
- } else if (_jmsMessageType.compare("JMS_STREAMMESSAGE_TYPE") == 0) {
- setStreamMessage(msg, subType, (*i).asString());
- } else if (_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") == 0) {
- setTextMessage(msg, *i);
- } else {
- throw qpidit::UnknownJmsMessageTypeError(_jmsMessageType);
- }
- addMessageHeaders(msg);
- addMessageProperties(msg);
- s.send(msg);
- _msgsSent += 1;
- valueNumber += 1;
- }
- }
-
- }
-
- proton::message& JmsSender::setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) {
- if (subType.compare("none") != 0) {
- throw qpidit::UnknownJmsMessageSubTypeError(subType);
- }
- if (testValueStr.size() != 0) {
- throw InvalidTestValueError(subType, testValueStr);
- }
- msg.content_type(proton::symbol("application/octet-stream"));
- msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MESSAGE_TYPE"]);
- return msg;
- }
-
- proton::message& JmsSender::setBytesMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) {
- proton::binary bin;
- if (subType.compare("boolean") == 0) {
- if (testValueStr.compare("False") == 0) bin.push_back(char(0));
- else if (testValueStr.compare("True") == 0) bin.push_back(char(1));
- else throw InvalidTestValueError(subType, testValueStr);
- } else if (subType.compare("byte") == 0) {
- uint8_t val = getIntegralValue<int8_t>(testValueStr);
- bin.push_back(char(val));
- } else if (subType.compare("bytes") == 0) {
- bin.assign(testValueStr.begin(), testValueStr.end());
- } else if (subType.compare("char") == 0) {
- bin.push_back(char(0));
- if (testValueStr[0] == '\\') { // Format: '\xNN'
- bin.push_back(getIntegralValue<char>(testValueStr.substr(2)));
- } else { // Format: 'c'
- bin.push_back(testValueStr[0]);
- }
- } else if (subType.compare("double") == 0) {
- uint64_t val;
- try {
- val = htobe64(std::strtoul(testValueStr.data(), NULL, 16));
- } catch (const std::exception& e) { throw qpidit::InvalidTestValueError("double", testValueStr); }
- numToBinary(val, bin);
- //for (int i=0; i<sizeof(val); ++i) {
- // bin.push_back(* ((char*)&val + i));
- // }
- } else if (subType.compare("float") == 0) {
- uint32_t val;
- try {
- val = htobe32((uint32_t)std::strtoul(testValueStr.data(), NULL, 16));
- } catch (const std::exception& e) { throw qpidit::InvalidTestValueError("float", testValueStr); }
- numToBinary(val, bin);
- //for (int i=0; i<sizeof(val); ++i) {
- // bin.push_back(* ((char*)&val + i));
- //}
- } else if (subType.compare("long") == 0) {
- uint64_t val = htobe64(getIntegralValue<uint64_t>(testValueStr));
- numToBinary(val, bin);
- //bin.assign(sizeof(val), val);
- } else if (subType.compare("int") == 0) {
- uint32_t val = htobe32(getIntegralValue<uint32_t>(testValueStr));
- numToBinary(val, bin);
- //bin.assign(sizeof(val), val);
- } else if (subType.compare("short") == 0) {
- uint16_t val = htobe16(getIntegralValue<int16_t>(testValueStr));
- numToBinary(val, bin);
- //bin.assign(sizeof(val), val);
- } else if (subType.compare("string") == 0) {
- std::ostringstream oss;
- uint16_t strlen = htobe16((uint16_t)testValueStr.size());
- oss.write((char*)&strlen, sizeof(strlen));
- oss << testValueStr;
- std::string os = oss.str();
- bin.assign(os.begin(), os.end());
- } else {
- throw qpidit::UnknownJmsMessageSubTypeError(subType);
- }
- msg.body(bin);
- msg.inferred(true);
- msg.content_type(proton::symbol("application/octet-stream"));
- msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_BYTESMESSAGE_TYPE"]);
- return msg;
- }
-
- proton::message& JmsSender::setMapMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr, uint32_t valueNumber) {
- std::ostringstream oss;
- oss << subType << std::setw(3) << std::setfill('0') << valueNumber;
- std::string mapKey(oss.str());
- std::map<std::string, proton::value> m;
- if (subType.compare("boolean") == 0) {
- if (testValueStr.compare("False") == 0) m[mapKey] = false;
- else if (testValueStr.compare("True") == 0) m[mapKey] = true;
- else throw InvalidTestValueError(subType, testValueStr);
- } else if (subType.compare("byte") == 0) {
- m[mapKey] = int8_t(getIntegralValue<int8_t>(testValueStr));
- } else if (subType.compare("bytes") == 0) {
- m[mapKey] = proton::binary(testValueStr);
- } else if (subType.compare("char") == 0) {
- wchar_t val;
- if (testValueStr[0] == '\\') { // Format: '\xNN'
- val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2));
- } else { // Format: 'c'
- val = testValueStr[0];
- }
- m[mapKey] = val;
- } else if (subType.compare("double") == 0) {
- m[mapKey] = getFloatValue<double, uint64_t>(testValueStr);
- } else if (subType.compare("float") == 0) {
- m[mapKey] = getFloatValue<float, uint32_t>(testValueStr);
- } else if (subType.compare("int") == 0) {
- m[mapKey] = getIntegralValue<int32_t>(testValueStr);
- } else if (subType.compare("long") == 0) {
- m[mapKey] = getIntegralValue<int64_t>(testValueStr);
- } else if (subType.compare("short") == 0) {
- m[mapKey] = getIntegralValue<int16_t>(testValueStr);
- } else if (subType.compare("string") == 0) {
- m[mapKey] = testValueStr;
- } else {
- throw qpidit::UnknownJmsMessageSubTypeError(subType);
- }
- msg.inferred(false);
- msg.body(m);
- msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MAPMESSAGE_TYPE"]);
- return msg;
- }
-
- proton::message& JmsSender::setObjectMessage(proton::message& msg, const std::string& subType, const Json::Value& testValue) {
- msg.body(getJavaObjectBinary(subType, testValue.asString()));
- msg.inferred(true);
- msg.content_type(proton::symbol("application/x-java-serialized-object"));
- msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_OBJECTMESSAGE_TYPE"]);
- return msg;
- }
-
- proton::message& JmsSender::setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) {
- std::vector<proton::value> l;
- if (subType.compare("boolean") == 0) {
- if (testValueStr.compare("False") == 0) l.push_back(false);
- else if (testValueStr.compare("True") == 0) l.push_back(true);
- else throw InvalidTestValueError(subType, testValueStr);
- } else if (subType.compare("byte") == 0) {
- l.push_back(int8_t(getIntegralValue<int8_t>(testValueStr)));
- } else if (subType.compare("bytes") == 0) {
- l.push_back(proton::binary(testValueStr));
- } else if (subType.compare("char") == 0) {
- wchar_t val;
- if (testValueStr[0] == '\\') { // Format: '\xNN'
- val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2));
- } else { // Format: 'c'
- val = testValueStr[0];
- }
- l.push_back(val);
- } else if (subType.compare("double") == 0) {
- l.push_back(getFloatValue<double, uint64_t>(testValueStr));
- } else if (subType.compare("float") == 0) {
- l.push_back(getFloatValue<float, uint32_t>(testValueStr));
- } else if (subType.compare("int") == 0) {
- l.push_back(getIntegralValue<int32_t>(testValueStr));
- } else if (subType.compare("long") == 0) {
- l.push_back(getIntegralValue<int64_t>(testValueStr));
- } else if (subType.compare("short") == 0) {
- l.push_back(getIntegralValue<int16_t>(testValueStr));
- } else if (subType.compare("string") == 0) {
- l.push_back(testValueStr);
- } else {
- throw qpidit::UnknownJmsMessageSubTypeError(subType);
- }
- msg.body(l);
- msg.inferred(true);
- msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_STREAMMESSAGE_TYPE"]);
- return msg;
- }
-
- proton::message& JmsSender::setTextMessage(proton::message& msg, const Json::Value& testValue) {
- msg.body(testValue.asString());
- msg.inferred(false);
- msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_TEXTMESSAGE_TYPE"]);
- return msg;
- }
-
- proton::message& JmsSender::addMessageHeaders(proton::message& msg) {
- Json::Value::Members headerNames = _testHeadersMap.getMemberNames();
- for (std::vector<std::string>::const_iterator i=headerNames.begin(); i!=headerNames.end(); ++i) {
- const Json::Value _subMap = _testHeadersMap[*i];
- const std::string headerValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map
- std::string val = _subMap[headerValueType].asString();
- if (i->compare("JMS_TYPE_HEADER") == 0) {
- setJmsTypeHeader(msg, val);
- } else if (i->compare("JMS_CORRELATIONID_HEADER") == 0) {
- if (headerValueType.compare("bytes") == 0) {
- setJmsCorrelationId(msg, proton::binary(val));
- } else {
- setJmsCorrelationId(msg, val);
- }
- } else if (i->compare("JMS_REPLYTO_HEADER") == 0) {
- setJmsReplyTo(msg, headerValueType, val);
- } else {
- throw qpidit::UnknownJmsHeaderTypeError(*i);
- }
- }
- return msg;
- }
-
- //static
- proton::message& JmsSender::setJmsTypeHeader(proton::message& msg, const std::string& t) {
- msg.subject(t);
- return msg;
- }
-
- //static
- proton::message& JmsSender::setJmsCorrelationId(proton::message& msg, const std::string& cid) {
- proton::message_id mid(cid);
- msg.correlation_id(mid);
- msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true);
- return msg;
- }
-
- //static
- proton::message& JmsSender::setJmsCorrelationId(proton::message& msg, const proton::binary cid) {
- proton::message_id mid(cid);
- msg.correlation_id(cid);
- msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true);
- return msg;
- }
-
- //static
- proton::message& JmsSender::setJmsReplyTo(proton::message& msg, const std::string& dts, const std::string& d) {
- if (dts.compare("queue") == 0) {
- msg.reply_to(/*std::string("queue://") + */d);
- msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_QUEUE));
- } else if (dts.compare("temp_queue") == 0) {
- msg.reply_to(/*std::string("queue://") + */d);
- msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TMEP_QUEUE));
- } else if (dts.compare("topic") == 0) {
- msg.reply_to(/*std::string("topic://") + */d);
- msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TOPIC));
- } else if (dts.compare("temp_topic") == 0) {
- msg.reply_to(/*std::string("topic://") + */d);
- msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TEMP_TOPIC));
- } else {
- throw qpidit::UnknownJmsDestinationTypeError(dts);
- }
- return msg;
- }
-
- proton::message& JmsSender::addMessageProperties(proton::message& msg) {
- Json::Value::Members propertyNames = _testPropertiesMap.getMemberNames();
- for (std::vector<std::string>::const_iterator i=propertyNames.begin(); i!=propertyNames.end(); ++i) {
- const Json::Value _subMap = _testPropertiesMap[*i];
- const std::string propertyValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map
- std::string val = _subMap[propertyValueType].asString();
- if (propertyValueType.compare("boolean") == 0) {
- if (val.compare("False") == 0) setMessageProperty(msg, *i, false);
- else if (val.compare("True") == 0) setMessageProperty(msg, *i, true);
- else throw InvalidTestValueError(propertyValueType, val);
- } else if (propertyValueType.compare("byte") == 0) {
- setMessageProperty(msg, *i, getIntegralValue<int8_t>(val));
- } else if (propertyValueType.compare("double") == 0) {
- setMessageProperty(msg, *i, getFloatValue<double, uint64_t>(val));
- } else if (propertyValueType.compare("float") == 0) {
- setMessageProperty(msg, *i, getFloatValue<float, uint64_t>(val));
- } else if (propertyValueType.compare("int") == 0) {
- setMessageProperty(msg, *i, getIntegralValue<int32_t>(val));
- } else if (propertyValueType.compare("long") == 0) {
- setMessageProperty(msg, *i, getIntegralValue<int64_t>(val));
- } else if (propertyValueType.compare("short") == 0) {
- setMessageProperty(msg, *i, getIntegralValue<int16_t>(val));
- } else if (propertyValueType.compare("string") == 0) {
- setMessageProperty(msg, *i, val);
- } else {
- throw qpidit::UnknownJmsPropertyTypeError(propertyValueType);
- }
- }
- return msg;
- }
-
- //static
- proton::binary JmsSender::getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString) {
- proton::binary javaObjectBinary;
- char buf[1024];
- int bytesRead;
- FILE* fp = ::popen("java -cp target/JavaObjUtils.jar org.apache.qpid.interop_test.obj_util.JavaObjToBytes javaClassStr", "rb");
- if (fp == NULL) { throw qpidit::PopenError(errno); }
- do {
- bytesRead = ::fread(buf, 1, sizeof(buf), fp);
- javaObjectBinary.insert(javaObjectBinary.end(), &buf[0], &buf[bytesRead-1]);
- } while (bytesRead == sizeof(buf));
- int status = ::pclose(fp);
- if (status == -1) {
- throw qpidit::PcloseError(errno);
- }
- return javaObjectBinary;
- }
-
- // static
- uint32_t JmsSender::getTotalNumMessages(const Json::Value& testValueMap) {
- uint32_t tot = 0;
- for (Json::Value::const_iterator i = testValueMap.begin(); i != testValueMap.end(); ++i) {
- tot += (*i).size();
- }
- return tot;
- }
-
- //static
- std::map<std::string, int8_t> JmsSender::initializeJmsMessageTypeAnnotationMap() {
- std::map<std::string, int8_t> m;
- m["JMS_MESSAGE_TYPE"] = JMS_MESSAGE_TYPE;
- m["JMS_OBJECTMESSAGE_TYPE"] = JMS_OBJECTMESSAGE_TYPE;
- m["JMS_MAPMESSAGE_TYPE"] = JMS_MAPMESSAGE_TYPE;
- m["JMS_BYTESMESSAGE_TYPE"] = JMS_BYTESMESSAGE_TYPE;
- m["JMS_STREAMMESSAGE_TYPE"] = JMS_STREAMMESSAGE_TYPE;
- m["JMS_TEXTMESSAGE_TYPE"] = JMS_TEXTMESSAGE_TYPE;
- return m;
- }
-
- } /* namespace shim */
-} /* namespace qpidit */
-
-
-
-/*
- * --- main ---
- * Args: 1: Broker address (ip-addr:port)
- * 2: Queue name
- * 3: AMQP type
- * 4: JSON Test parameters containing 3 maps: [testValueMap, testHeadersMap, testPropertiesMap]
- */
-
-int main(int argc, char** argv) {
-/*
- for (int i=0; i<argc; ++i) {
- std::cout << "*** argv[" << i << "] : " << argv[i] << std::endl;
- }
-*/
- // TODO: improve arg management a little...
- if (argc != 5) {
- throw qpidit::ArgumentError("Incorrect number of arguments");
- }
-
- std::ostringstream oss;
- oss << argv[1] << "/" << argv[2];
-
- try {
- Json::Value testParams;
- Json::Reader jsonReader;
- if (not jsonReader.parse(argv[4], testParams, false)) {
- throw qpidit::JsonParserError(jsonReader);
- }
-
- qpidit::shim::JmsSender sender(oss.str(), argv[3], testParams);
- proton::default_container(sender).run();
- } catch (const std::exception& e) {
- std::cout << "JmsSender error: " << e.what() << std::endl;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp
deleted file mode 100644
index ed3d57d..0000000
--- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#ifndef SRC_QPIDIT_SHIM_JMSSENDER_HPP_
-#define SRC_QPIDIT_SHIM_JMSSENDER_HPP_
-
-#include "json/value.h"
-#include "proton/message.hpp"
-#include "proton/messaging_handler.hpp"
-#include "qpidit/QpidItErrors.hpp"
-#include "qpidit/shim/JmsDefinitions.hpp"
-#include <typeinfo>
-
-namespace proton {
- class message;
-}
-
-namespace qpidit
-{
- namespace shim
- {
-
- class JmsSender : public proton::messaging_handler
- {
- protected:
- static proton::symbol s_jmsMessageTypeAnnotationKey;
- static std::map<std::string, int8_t>s_jmsMessageTypeAnnotationValues;
-
- const std::string _brokerUrl;
- const std::string _jmsMessageType;
- const Json::Value _testValueMap;
- const Json::Value _testHeadersMap;
- const Json::Value _testPropertiesMap;
- uint32_t _msgsSent;
- uint32_t _msgsConfirmed;
- uint32_t _totalMsgs;
- public:
- JmsSender(const std::string& brokerUrl, const std::string& jmsMessageType, const Json::Value& testParams);
- virtual ~JmsSender();
-
- void on_container_start(proton::container &c);
- void on_sendable(proton::sender &s);
- void on_tracker_accept(proton::tracker &t);
- void on_transport_close(proton::transport &t);
-
- void on_connection_error(proton::connection &c);
- void on_session_error(proton::session &s);
- void on_sender_error(proton::sender& s);
- void on_transport_error(proton::transport &t);
- void on_error(const proton::error_condition &c);
- protected:
- void sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValueMap);
- proton::message& setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr);
- proton::message& setBytesMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr);
- proton::message& setMapMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr, uint32_t valueNumber);
- proton::message& setObjectMessage(proton::message& msg, const std::string& subType, const Json::Value& testValue);
- proton::message& setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValue);
- proton::message& setTextMessage(proton::message& msg, const Json::Value& testValue);
-
- proton::message& addMessageHeaders(proton::message& msg);
- static proton::message& setJmsTypeHeader(proton::message& msg, const std::string& t);
- static proton::message& setJmsCorrelationId(proton::message& msg, const std::string& cid);
- static proton::message& setJmsCorrelationId(proton::message& msg, const proton::binary cid);
- static proton::message& setJmsReplyTo(proton::message& msg, const std::string& dt, const std::string& d);
-
- proton::message& addMessageProperties(proton::message& msg);
- template<typename T> proton::message& setMessageProperty(proton::message& msg, const std::string& propertyName, T val) {
- msg.properties().put(propertyName, val);
- return msg;
- }
-
- static proton::binary getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString);
- static uint32_t getTotalNumMessages(const Json::Value& testValueMap);
-
- static std::map<std::string, int8_t> initializeJmsMessageTypeAnnotationMap();
-
- template<typename T> static T numToBinary(T n, proton::binary& b) {
- for (int i=0; i<sizeof(n); ++i) {
- b.push_back(* ((char*)&n + i));
- }
- }
-
- // Set message body to floating type T through integral type U
- // Used to convert a hex string representation of a float or double to a float or double
- template<typename T, typename U> T getFloatValue(const std::string& testValueStr) {
- try {
- U ival(std::strtoul(testValueStr.data(), NULL, 16));
- return T(*reinterpret_cast<T*>(&ival));
- } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(typeid(T).name(), testValueStr); }
- }
-
- template<typename T> T getIntegralValue(const std::string& testValueStr) {
- try {
- return T(std::strtol(testValueStr.data(), NULL, 16));
- } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(typeid(T).name(), testValueStr); }
- }
- };
-
- } /* namespace shim */
-} /* namespace qpidit */
-
-#endif /* SRC_QPIDIT_SHIM_JMSSENDER_HPP_ */
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/JmsReceiverShim.py
----------------------------------------------------------------------
diff --git a/shims/qpid-proton-python/src/JmsReceiverShim.py b/shims/qpid-proton-python/src/JmsReceiverShim.py
deleted file mode 100755
index 9140db1..0000000
--- a/shims/qpid-proton-python/src/JmsReceiverShim.py
+++ /dev/null
@@ -1,358 +0,0 @@
-#!/usr/bin/env python
-
-"""
-JMS receiver shim for qpid-interop-test
-"""
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import sys
-from interop_test_errors import InteropTestError
-from json import dumps, loads
-from proton import byte, symbol
-from proton.handlers import MessagingHandler
-from proton.reactor import Container
-from struct import pack, unpack
-from subprocess import check_output
-from traceback import format_exc
-
-# These values must tie in with the Qpid-JMS client values found in
-# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport
-QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type')
-
-class JmsReceiverShim(MessagingHandler):
- """
- Receiver shim: This shim receives JMS messages sent by the Sender shim and prints the contents of the received
- messages onto the terminal in JSON format for retrieval by the test harness. The JMS messages type and, where
- applicable, body values, as well as the combinations of JMS headers and properties which may be attached to
- the message are received on the command-line in JSON format when this program is launched.
- """
- def __init__(self, url, jms_msg_type, test_parameters_list):
- super(JmsReceiverShim, self).__init__()
- self.url = url
- self.jms_msg_type = jms_msg_type
- self.expteced_msg_map = test_parameters_list[0]
- self.flag_map = test_parameters_list[1]
- self.subtype_itr = iter(sorted(self.expteced_msg_map.keys()))
- self.expected = self._get_tot_num_messages()
- self.received = 0
- self.received_value_map = {}
- self.current_subtype = None
- self.current_subtype_msg_list = None
- self.jms_header_map = {}
- self.jms_property_map = {}
-
- def get_received_value_map(self):
- """"Return the collected message values received"""
- return self.received_value_map
-
- def get_jms_header_map(self):
- """Return the collected message headers received"""
- return self.jms_header_map
-
- def get_jms_property_map(self):
- """Return the collected message properties received"""
- return self.jms_property_map
-
- def on_start(self, event):
- """Event callback for when the client starts"""
- event.container.create_receiver(self.url)
-
- def on_message(self, event):
- """Event callback when a message is received by the client"""
- if event.message.id and event.message.id < self.received:
- return # ignore duplicate message
- if self.expected == 0 or self.received < self.expected:
- if self.current_subtype is None:
- self.current_subtype = self.subtype_itr.next()
- self.current_subtype_msg_list = []
- self.current_subtype_msg_list.append(self._handle_message(event.message))
- self._process_jms_headers(event.message)
- self._process_jms_properties(event.message)
- if len(self.current_subtype_msg_list) >= self.expteced_msg_map[self.current_subtype]:
- self.received_value_map[self.current_subtype] = self.current_subtype_msg_list
- self.current_subtype = None
- self.current_subtype_msg_list = []
- self.received += 1
- if self.received == self.expected:
- event.receiver.close()
- event.connection.close()
-
- def on_connection_error(self, event):
- print 'JmsReceiverShim.on_connection_error'
-
- def on_session_error(self, event):
- print 'JmsReceiverShim.on_session_error'
-
- def on_link_error(self, event):
- print 'JmsReceiverShim.on_link_error'
-
- def _handle_message(self, message):
- """Handles the analysis of a received message"""
- if self.jms_msg_type == 'JMS_MESSAGE_TYPE':
- return self._receive_jms_message(message)
- if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE':
- return self._receive_jms_bytesmessage(message)
- if self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE':
- return self._recieve_jms_mapmessage(message)
- if self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE':
- return self._recieve_jms_objectmessage(message)
- if self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE':
- return self._receive_jms_streammessage(message)
- if self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE':
- return self._receive_jms_textmessage(message)
- print 'jms-receive: Unsupported JMS message type "%s"' % self.jms_msg_type
- return None
-
- def _get_tot_num_messages(self):
- """"Counts up the total number of messages which should be received from the expected message map"""
- total = 0
- for key in self.expteced_msg_map:
- total += int(self.expteced_msg_map[key])
- return total
-
- def _receive_jms_message(self, message):
- """"Receives a JMS message (without a body)"""
- assert self.jms_msg_type == 'JMS_MESSAGE_TYPE'
- assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(0)
- if message.body is not None:
- raise InteropTestError('_receive_jms_message: Invalid body for type JMS_MESSAGE_TYPE: %s' %
- str(message.body))
- return None
-
- def _receive_jms_bytesmessage(self, message):
- """"Receives a JMS bytes message"""
- assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE'
- assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(3)
- if self.current_subtype == 'boolean':
- if message.body == b'\x00':
- return 'False'
- if message.body == b'\x01':
- return 'True'
- raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' %
- str(message.body))
- if self.current_subtype == 'byte':
- return hex(unpack('b', message.body)[0])
- if self.current_subtype == 'bytes':
- return str(message.body)
- if self.current_subtype == 'char':
- if len(message.body) == 2: # format 'a' or '\xNN'
- return str(message.body[1]) # strip leading '\x00' char
- raise InteropTestError('Unexpected strring length for type char: %d' % len(message.body))
- if self.current_subtype == 'double':
- return '0x%016x' % unpack('!Q', message.body)[0]
- if self.current_subtype == 'float':
- return '0x%08x' % unpack('!L', message.body)[0]
- if self.current_subtype == 'int':
- return hex(unpack('!i', message.body)[0])
- if self.current_subtype == 'long':
- return hex(unpack('!q', message.body)[0])
- if self.current_subtype == 'short':
- return hex(unpack('!h', message.body)[0])
- if self.current_subtype == 'string':
- # NOTE: first 2 bytes are string length, must be present
- if len(message.body) >= 2:
- str_len = unpack('!H', message.body[:2])[0]
- str_body = str(message.body[2:])
- if len(str_body) != str_len:
- raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' %
- (str_len, str_body, len(str_body)))
- return str_body
- else:
- raise InteropTestError('Malformed string binary: len(\'%s\')=%d' %
- (repr(message.body), len(message.body)))
- raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
- (self.jms_msg_type, self.current_subtype))
-
- def _recieve_jms_mapmessage(self, message):
- """"Receives a JMS map message"""
- assert self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE'
- assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(2)
- key, value = message.body.items()[0]
- assert key[:-3] == self.current_subtype
- if self.current_subtype == 'boolean':
- return str(value)
- if self.current_subtype == 'byte':
- return hex(value)
- if self.current_subtype == 'bytes':
- return str(value)
- if self.current_subtype == 'char':
- return str(value)
- if self.current_subtype == 'double':
- return '0x%016x' % unpack('!Q', pack('!d', value))[0]
- if self.current_subtype == 'float':
- return '0x%08x' % unpack('!L', pack('!f', value))[0]
- if self.current_subtype == 'int':
- return hex(value)
- if self.current_subtype == 'long':
- return hex(int(value))
- if self.current_subtype == 'short':
- return hex(value)
- if self.current_subtype == 'string':
- return str(value)
- raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' %
- (self.jms_msg_type, self.current_subtype))
-
- def _recieve_jms_objectmessage(self, message):
- """"Receives a JMS Object message"""
- assert self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE'
- assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(1)
- return self._get_java_obj(message.body)
-
- def _get_java_obj(self, java_obj_bytes):
- """
- Take bytes from serialized Java object and construct a Java object, then return its toString() value. The
- work of 'translating' the bytes to a Java object and obtaining its class and value is done in a Java
- utility org.apache.qpid.interop_test.obj_util.BytesToJavaObj located in jar JavaObjUtils.jar.
- java_obj_bytes: hex string representation of bytes from Java object (eg 'aced00057372...')
- returns: string containing Java class value as returned by the toString() method
- """
- java_obj_bytes_str = ''.join(["%02x" % ord(x) for x in java_obj_bytes]).strip()
- out_str = check_output(['java',
- '-cp',
- 'target/JavaObjUtils.jar',
- 'org.apache.qpid.interop_test.obj_util.BytesToJavaObj',
- java_obj_bytes_str])
- out_str_list = out_str.split('\n')[:-1] # remove trailing \n
- if len(out_str_list) > 1:
- raise InteropTestError('Unexpected return from JavaObjUtils: %s' % out_str)
- colon_index = out_str_list[0].index(':')
- if colon_index < 0:
- raise InteropTestError('Unexpected format from JavaObjUtils: %s' % out_str)
- java_class_name = out_str_list[0][:colon_index]
- java_class_value_str = out_str_list[0][colon_index+1:]
- if java_class_name != self.current_subtype:
- raise InteropTestError('Unexpected class name from JavaObjUtils: expected %s, recieved %s' %
- (self.current_subtype, java_class_name))
- return java_class_value_str
-
- def _receive_jms_streammessage(self, message):
- """Receives a JMS stream message"""
- assert self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE'
- assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(4)
- # Every message is a list with one item [value]
- assert len(message.body) == 1
- value = message.body[0]
- if self.current_subtype == 'boolean':
- return str(value)
- if self.current_subtype == 'byte':
- return hex(value)
- if self.current_subtype == 'bytes':
- return str(value)
- if self.current_subtype == 'char':
- return str(value)
- if self.current_subtype == 'double':
- return '0x%016x' % unpack('!Q', pack('!d', value))[0]
- if self.current_subtype == 'float':
- return '0x%08x' % unpack('!L', pack('!f', value))[0]
- if self.current_subtype == 'int':
- return hex(value)
- if self.current_subtype == 'long':
- return hex(int(value))
- if self.current_subtype == 'short':
- return hex(value)
- if self.current_subtype == 'string':
- return str(value)
- raise InteropTestError('JmsRecieverShim._receive_jms_streammessage(): ' +
- 'JMS message type %s: Unknown or unsupported subtype \'%s\'' %
- (self.jms_msg_type, self.current_subtype))
-
- def _receive_jms_textmessage(self, message):
- """"Receives a JMS text message"""
- assert self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE'
- assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(5)
- return message.body
-
- def _process_jms_headers(self, message):
- """"Checks the supplied message for three JMS headers: message type, correlation-id and reply-to"""
- # JMS message type header
- message_type_header = message._get_subject()
- if message_type_header is not None:
- self.jms_header_map['JMS_TYPE_HEADER'] = {'string': message_type_header}
-
- # JMS correlation ID
- correlation_id = message._get_correlation_id()
- if correlation_id is not None:
- if 'JMS_CORRELATIONID_AS_BYTES' in self.flag_map and self.flag_map['JMS_CORRELATIONID_AS_BYTES']:
- self.jms_header_map['JMS_CORRELATIONID_HEADER'] = {'bytes': correlation_id}
- else:
- self.jms_header_map['JMS_CORRELATIONID_HEADER'] = {'string': correlation_id}
-
- # JMS reply-to
- reply_to = message._get_reply_to()
- if reply_to is not None:
- if 'JMS_REPLYTO_AS_TOPIC' in self.flag_map and self.flag_map['JMS_REPLYTO_AS_TOPIC']:
- # Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present
- if len(reply_to) > 8 and reply_to[0:8] == 'topic://':
- reply_to = reply_to[8:]
- self.jms_header_map['JMS_REPLYTO_HEADER'] = {'topic': reply_to}
- else:
- if len(reply_to) > 8 and reply_to[0:8] == 'queue://':
- reply_to = reply_to[8:]
- self.jms_header_map['JMS_REPLYTO_HEADER'] = {'queue': reply_to}
-
- def _process_jms_properties(self, message):
- """"Checks the supplied message for JMS message properties and decodes them"""
- if message.properties is not None:
- for jms_property_name in message.properties:
- underscore_index = jms_property_name.find('_')
- if underscore_index >= 0: # Ignore any other properties without '_'
- jms_property_type = jms_property_name[0:underscore_index]
- value = message.properties[jms_property_name]
- if jms_property_type == 'boolean':
- self.jms_property_map[jms_property_name] = {'boolean': str(value)}
- elif jms_property_type == 'byte':
- self.jms_property_map[jms_property_name] = {'byte': hex(value)}
- elif jms_property_type == 'double':
- self.jms_property_map[jms_property_name] = {'double': '0x%016x' %
- unpack('!Q', pack('!d', value))[0]}
- elif jms_property_type == 'float':
- self.jms_property_map[jms_property_name] = {'float': '0x%08x' %
- unpack('!L', pack('!f', value))[0]}
- elif jms_property_type == 'int':
- self.jms_property_map[jms_property_name] = {'int': hex(value)}
- elif jms_property_type == 'long':
- self.jms_property_map[jms_property_name] = {'long': hex(int(value))}
- elif jms_property_type == 'short':
- self.jms_property_map[jms_property_name] = {'short': hex(value)}
- elif jms_property_type == 'string':
- self.jms_property_map[jms_property_name] = {'string': str(value)}
- else:
- pass # Ignore any other properties, brokers can add them and we don't know what they may be
-
-
-# --- main ---
-# Args: 1: Broker address (ip-addr:port)
-# 2: Queue name
-# 3: JMS message type
-# 4: JSON Test parameters containing 2 maps: [testValuesMap, flagMap]
-#print '#### sys.argv=%s' % sys.argv
-try:
- RECEIVER = JmsReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4]))
- Container(RECEIVER).run()
- print sys.argv[3]
- print dumps(RECEIVER.get_received_value_map())
- print dumps(RECEIVER.get_jms_header_map())
- print dumps(RECEIVER.get_jms_property_map())
-except KeyboardInterrupt:
- pass
-except Exception as exc:
- print 'jms-receiver-shim EXCEPTION:', exc
- print format_exc()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org