You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/19 16:59:26 UTC
svn commit: r1411265 -
/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/
Author: tabish
Date: Mon Nov 19 15:59:25 2012
New Revision: 1411265
URL: http://svn.apache.org/viewvc?rev=1411265&view=rev
Log:
https://issues.apache.org/jira/browse/AMQCPP-405
Add stress test example using the CMSTemplate code
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CMSTestMultipleSendersReceivers.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageHandlerDefinitions.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h (with props)
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CMSTestMultipleSendersReceivers.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CMSTestMultipleSendersReceivers.cpp?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CMSTestMultipleSendersReceivers.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CMSTestMultipleSendersReceivers.cpp Mon Nov 19 15:59:25 2012
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include <string>
+#include <cstdio>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Integer.h>
+#include <activemq/library/ActiveMQCPP.h>
+
+#include "TestSenderAndReceiver.h"
+#include "ConnectionFactoryMgr.h"
+
+using namespace std;
+using namespace cms;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace activemq::library;
+using namespace cmstemplate;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Stress-test driver: starts a number of TestSenderAndReceiver pairs, each on
+ * its own topic ("topic0", "topic1", ...), lets them run for a fixed time and
+ * then tears everything down again.
+ *
+ * @param argc number of command line arguments.
+ * @param argv argv[1] (optional) = number of sender/receiver pairs (default 25),
+ *             argv[2] (optional) = run time in seconds (default 3600).
+ * @return 0 on normal completion, 1 when an argument is negative.
+ */
+int main(int argc, char** argv) {
+
+    printf("Test Started\n");
+    int cnt = 25;
+    int done = 3600;
+
+    if (argc > 1) {
+        cnt = Integer::parseInt(argv[1]);
+    }
+
+    if (argc > 2) {
+        done = Integer::parseInt(argv[2]);
+    }
+
+    // A negative count would make the array allocation below fail and a
+    // negative duration is not a valid sleep time, so reject both up front.
+    if (cnt < 0 || done < 0) {
+        fprintf(stderr, "Usage: %s [pair-count >= 0] [run-seconds >= 0]\n", argv[0]);
+        return 1;
+    }
+
+    string url = "tcp://127.0.0.1:61616?connection.sendTimeout=1000";
+
+    ActiveMQCPP::initializeLibrary();
+    ConnectionFactoryMgr::Initialize();
+    int maxThreads = 30;
+    int reservedThreads = 3;
+    Receiver::Initialize(reservedThreads, maxThreads);
+
+    TestSenderAndReceiver** sar = new TestSenderAndReceiver*[cnt];
+
+    for (int i = 0; i < cnt; i++) {
+        // Integer::toString comes from the already included decaf/lang/Integer.h.
+        string topic = "topic" + Integer::toString(i);
+        sar[i] = new TestSenderAndReceiver(url, topic.c_str(), true, false, 50, 1000);
+        sar[i]->init();
+    }
+
+    // Widen before multiplying so that large durations cannot overflow int.
+    Thread::sleep(static_cast<long long>(done) * 1000);
+
+    for (int i = 0; i < cnt; i++) {
+        sar[i]->close();
+        delete sar[i];
+    }
+    // Allocated with new[], so it must be released with delete[].
+    delete[] sar;
+
+    // Release the shared resources in the reverse order of their creation.
+    Receiver::UnInitialize();
+    ConnectionFactoryMgr::UnInitialize();
+    ActiveMQCPP::shutdownLibrary();
+
+    printf("\nTest Completed!\n");
+    return 0;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CMSTestMultipleSendersReceivers.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.cpp?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.cpp Mon Nov 19 15:59:25 2012
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CmsMessageCreator.h"
+
+using namespace std;
+using namespace cms;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace activemq::core;
+using namespace cmstemplate;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a message creator that keeps its own copy of the given text.
+ *
+ * @param txt text stored for the messages produced by createMessage().
+ */
+CmsMessageCreator::CmsMessageCreator(const std::string& txt) : m_txt(txt) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Nothing to release explicitly: the destructor body is intentionally empty.
+CmsMessageCreator::~CmsMessageCreator() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Produces a text message carrying the stored text.
+ *
+ * @param session session used to create the message; may be NULL.
+ * @return the message created by the session, or NULL when no session was given.
+ */
+cms::Message* CmsMessageCreator::createMessage(cms::Session* session) {
+    if (session == NULL) {
+        return NULL;
+    }
+    return session->createTextMessage(m_txt);
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.h?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.h Mon Nov 19 15:59:25 2012
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMSTEMPLATE_CMSMESSAGECREATOR_H_
+#define _CMSTEMPLATE_CMSMESSAGECREATOR_H_
+
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/library/ActiveMQCPP.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/ExceptionListener.h>
+#include <cms/MessageListener.h>
+
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <activemq/cmsutil/MessageCreator.h>
+
+namespace cmstemplate {
+
+ /**
+ * MessageCreator implementation that holds a text string and, per its
+ * implementation file, turns it into a text message on request.
+ */
+ class CmsMessageCreator : public activemq::cmsutil::MessageCreator {
+ private:
+
+ // Text handed to the constructor; stored by value.
+ std::string m_txt;
+
+ public:
+
+ /**
+ * @param txt text to store for later message creation.
+ */
+ CmsMessageCreator(const std::string& txt);
+
+ virtual ~CmsMessageCreator();
+
+ /**
+ * @param session session used to create the message.
+ * @return the created message (see the implementation file for the
+ * handling of a NULL session).
+ */
+ virtual cms::Message* createMessage(cms::Session* session);
+
+ };
+}
+
+#endif /** _CMSTEMPLATE_CMSMESSAGECREATOR_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageCreator.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageHandlerDefinitions.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageHandlerDefinitions.h?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageHandlerDefinitions.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageHandlerDefinitions.h Mon Nov 19 15:59:25 2012
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMSTEMPLATE_CMSMESSAGEHANDLERDEFINITIONS_H_
+#define _CMSTEMPLATE_CMSMESSAGEHANDLERDEFINITIONS_H_
+
+#include <decaf/util/Config.h>
+
+namespace cmstemplate {
+
+    /**
+     * Result codes used by the CMS template example.
+     *
+     * NOTE: When adding an entry to ErrorCode you must add the corresponding
+     * error string to ErrorDescription, at the same position.
+     */
+    enum ErrorCode {
+        CMS_SUCCESS = 0,
+        CMS_ERROR_UNABLE_TO_PARSE_XML,
+        CMS_ERROR_MESSAGE_HAS_BEEN_DEFINED_ALREADY,
+        CMS_ERROR_HEADER_HAS_BEEN_DEFINED_ALREADY,
+        CMS_ERROR_CLIENT_HAS_BEEN_DEFINED_ALREADY,
+        CMS_ERROR_DESTINATION_HAS_BEEN_DEFINED_ALREADY,
+        CMS_ERROR_INVALID_CLIENT,
+        CMS_ERROR_INVALID_DESTINATION,
+        CMS_ERROR_INVALID_MESSAGE,
+        CMS_ERROR_INVALID_HEADERS,
+        CMS_ERROR_INVALID_MESSAGELISTENER,
+        CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY,
+        CMS_ERROR_RECEIVER_TIMEDOUT,
+        CMS_ERROR_DESTINATION_NOT_CONFIGURED_FOR_SENDING_MESSAGES,
+        CMS_ERROR_DESTINATION_NOT_CONFIGURED_FOR_RECEIVING_MESSAGES,
+        CMS_ERROR_CAUGHT_CMS_EXCEPTION,
+        CMS_ERROR_CAUGHT_TBGENOBJ_ERROR,
+        CMS_ERROR_MESSAGE_BROKER_ERROR,
+        CMS_ERROR_BROKER_MONITOR_NOT_FOUND,
+        CMS_ERROR_BROKER_MONITORING_NOT_TURNED_ON,
+        CMS_ERROR_INVALID_BROKERSTATUSLISTENER,
+        CMS_ERROR_A_BROKERSTATUSLISTENER_HAS_BEEN_REGISTERED_ALREADY,
+        CMS_ERROR_CAUGHT_EXCEPTION_IN_INIT,
+        CMS_ERROR_CAUGHT_EXCEPTION_IN_UNINIT,
+        CMS_LAST // Put all error enums BEFORE this one. This one must be listed last.
+    };
+
+    /**
+     * Human readable text for every ErrorCode, indexed by the enum value.
+     * The final entry belongs to CMS_LAST and doubles as the "out of range" text.
+     */
+    const char ErrorDescription[][100] = {
+        "Success",
+        "Unable to parse xml",
+        "Message has been defined already",
+        "Header has been defined already",
+        "Client has been defined already",
+        "Destination has been defined already",
+        "Invalid client",
+        "Invalid destination",
+        "Invalid message",
+        "Invalid headers",
+        "Invalid messagelistener",
+        "A messagelistener has been registered already with the destination",
+        "Receiver timed out",
+        "Destination not configured for sending messages",
+        "Destination not configured for receiving messages",
+        "Caught CMS exception",
+        "Caught TBGenObj error",
+        "Message broker appears to be offline",
+        "Can not find broker monitor",
+        "The broker monitoring functionality has not been turned on",
+        "Invalid brokerStatuslistener",
+        "This brokerStatuslistener has been registered already with the broker",
+        "Caught an exception when initializing CmsMessageHandler",
+        "Caught an exception when uninitializing CmsMessageHandler",
+        "CMS_LAST - ErrorCodeToString macro index out of range",
+    };
+
+    // Compile-time guard (pre-C++11 idiom): the array type gets a negative size,
+    // and therefore fails to compile, when the description table and the enum
+    // get out of step.
+    typedef char ErrorDescriptionMustMatchErrorCode[
+        (sizeof(ErrorDescription) / sizeof(ErrorDescription[0]) ==
+            static_cast<unsigned long>(CMS_LAST) + 1) ? 1 : -1];
+
+    // Maps an error code to its description; any value outside
+    // [CMS_SUCCESS, CMS_LAST] yields the CMS_LAST text. The argument is
+    // parenthesized so that expressions can be passed safely; note that it is
+    // still evaluated more than once, so avoid arguments with side effects.
+    #define ErrorCodeToString(i) ((((i) >= CMS_SUCCESS) && ((i) <= CMS_LAST)) ? ErrorDescription[(i)] : ErrorDescription[CMS_LAST])
+
+    // True for every code other than CMS_SUCCESS.
+    #define IsError(i) ((i) != CMS_SUCCESS)
+}
+//typedef void (CALLBACK* MessageListener)( IDispatch* message, VARIANT* headerNames, VARIANT* headerValues);
+//
+//typedef void (CALLBACK* BrokerStatusListener)( BSTR brokerIP, BSTR brokerPort, VARIANT_BOOL isBrokerGood, LONGLONG timeStamp);
+
+#endif /** _CMSTEMPLATE_CMSMESSAGEHANDLERDEFINITIONS_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/CmsMessageHandlerDefinitions.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.cpp?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.cpp Mon Nov 19 15:59:25 2012
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionFactoryMgr.h"
+#include <activemq/core/ActiveMQConnectionFactory.h>
+
+using namespace std;
+using namespace cms;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace activemq::core;
+using namespace cmstemplate;
+
+////////////////////////////////////////////////////////////////////////////////
+// Registry of connection factories keyed by broker URL; NULL until Initialize() runs.
+StlMap<std::string, ConnectionFactory*>* ConnectionFactoryMgr::m_connectionFactories = NULL;
+
+////////////////////////////////////////////////////////////////////////////////
+// Empty constructor; the class header declares it private and exposes only
+// static functions.
+ConnectionFactoryMgr::ConnectionFactoryMgr() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Tears down the static registry via UnInitialize(); any exception is
+// swallowed so that nothing escapes the destructor.
+ConnectionFactoryMgr::~ConnectionFactoryMgr() {
+ try {
+ UnInitialize();
+ } catch(...) {
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Allocates the URL -> ConnectionFactory registry.
+ *
+ * Calling it again while a registry exists is a no-op, so a repeated call no
+ * longer leaks the previous map together with the factories stored in it.
+ * No locking is done here; call it before any other thread uses the class.
+ */
+void ConnectionFactoryMgr::Initialize() {
+    if (m_connectionFactories == NULL) {
+        m_connectionFactories = new StlMap<std::string, ConnectionFactory*>();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Deletes every registered connection factory and then the registry itself.
+ *
+ * Safe to call when the registry does not exist (never initialized, or
+ * already torn down). Factories previously returned by GetConnectionFactory()
+ * are destroyed here and must not be used afterwards.
+ */
+void ConnectionFactoryMgr::UnInitialize() {
+    if (m_connectionFactories == NULL) {
+        return;
+    }
+
+    m_connectionFactories->lock();
+    try {
+        // The iterator lives only inside this scope so that it is gone before
+        // the map it refers to is deleted below.
+        Pointer<Iterator<ConnectionFactory*> > iter(m_connectionFactories->values().iterator());
+        while (iter->hasNext()) {
+            delete iter->next(); // deleting a NULL entry is a no-op
+        }
+        m_connectionFactories->clear();
+    } catch (...) {
+        // Never leave the map locked when something goes wrong.
+        m_connectionFactories->unlock();
+        throw;
+    }
+    m_connectionFactories->unlock();
+
+    delete m_connectionFactories;
+    m_connectionFactories = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Returns the connection factory registered for the given broker URL,
+ * creating and registering a new ActiveMQConnectionFactory on first use.
+ *
+ * Initialize() must have been called before; the registry pointer is
+ * dereferenced without a check.
+ *
+ * @param url broker URL used as the registry key.
+ * @return the factory for the URL; it stays owned by the registry.
+ */
+ConnectionFactory* ConnectionFactoryMgr::GetConnectionFactory(const std::string& url) {
+    ConnectionFactory* connectionFactory = NULL;
+
+    m_connectionFactories->lock();
+    try {
+        try {
+            connectionFactory = m_connectionFactories->get(url);
+        } catch (NoSuchElementException&) {
+            // Not registered yet; created below.
+        }
+
+        if (!connectionFactory) {
+            connectionFactory = new ActiveMQConnectionFactory(url);
+            try {
+                m_connectionFactories->put(url, connectionFactory);
+            } catch (...) {
+                // Registration failed, so nobody else owns the new factory.
+                delete connectionFactory;
+                throw;
+            }
+        }
+    } catch (...) {
+        // Release the map lock on every failure path before propagating.
+        m_connectionFactories->unlock();
+        throw;
+    }
+    m_connectionFactories->unlock();
+
+    return connectionFactory;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.h?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.h Mon Nov 19 15:59:25 2012
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMSTEMPLATE_CONNECTIONFACTORYMGR_H_
+#define _CMSTEMPLATE_CONNECTIONFACTORYMGR_H_
+
+#include <decaf/util/StlMap.h>
+#include <cms/ConnectionFactory.h>
+
+namespace cmstemplate {
+
+ /**
+ * Static-only holder of a map from a string key to cms::ConnectionFactory
+ * pointers. The constructor and destructor are private, so the class is
+ * used exclusively through its static functions.
+ */
+ class ConnectionFactoryMgr {
+ private:
+
+ // Shared registry; allocated by Initialize() and released by UnInitialize().
+ static decaf::util::StlMap <std::string, cms::ConnectionFactory *> *m_connectionFactories;
+
+ ConnectionFactoryMgr();
+
+ virtual ~ConnectionFactoryMgr();
+
+ public:
+
+ // Allocates the registry.
+ static void Initialize();
+
+ // Destroys the registered factories and the registry.
+ static void UnInitialize();
+
+ /**
+ * @param url key under which the factory is looked up.
+ * @return the factory registered for the URL (see the implementation file
+ * for how a missing entry is handled).
+ */
+ static cms::ConnectionFactory* GetConnectionFactory(const std::string& url);
+
+ };
+}
+
+#endif /** _CMSTEMPLATE_CONNECTIONFACTORYMGR_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/ConnectionFactoryMgr.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.cpp?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.cpp Mon Nov 19 15:59:25 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessagingTask.h"
+
+using namespace std;
+using namespace cmstemplate;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Binds a received message to the receiver that should process it.
+ *
+ * @param receiver receiver whose ExecuteMessagingTask() is invoked by run(); may be NULL.
+ * @param message  message text; copied into the task.
+ */
+MessagingTask::MessagingTask(Receiver* receiver, const string& message) :
+    m_receiver(receiver), m_message(message) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Empty on purpose: the destructor does not delete the receiver pointer.
+MessagingTask::~MessagingTask() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Hands the stored message to the receiver. Does nothing without a receiver;
+ * every exception raised while processing is swallowed so that none escapes
+ * from run().
+ */
+void MessagingTask::run() {
+    if (m_receiver == NULL) {
+        return;
+    }
+
+    try {
+        m_receiver->ExecuteMessagingTask(m_message);
+    } catch (...) {
+        // Deliberately ignored.
+    }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.h?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.h Mon Nov 19 15:59:25 2012
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMSTEMPLATE_MESSAGINGTASK_H_
+#define _CMSTEMPLATE_MESSAGINGTASK_H_
+
+#include "Receiver.h"
+#include <decaf/lang/Runnable.h>
+
+namespace cmstemplate {
+
+    /**
+     * Runnable that delivers one received message to a Receiver.
+     */
+    class MessagingTask : public decaf::lang::Runnable {
+    private:
+
+        // Receiver that processes the message; not owned by the task.
+        Receiver* m_receiver;
+
+        // Copy of the message text to deliver.
+        std::string m_message;
+
+    public:
+
+        /**
+         * @param receiver receiver that will process the message.
+         * @param message  message text; copied into the task.
+         */
+        MessagingTask(Receiver* receiver, const std::string& message);
+
+        virtual ~MessagingTask();
+
+        /**
+         * Passes the stored message to the receiver.
+         */
+        virtual void run();
+
+    };
+}
+
+#endif /** _CMSTEMPLATE_MESSAGINGTASK_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/MessagingTask.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp Mon Nov 19 15:59:25 2012
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Receiver.h"
+#include "MessagingTask.h"
+#include "decaf/lang/System.h"
+#include "ConnectionFactoryMgr.h"
+
+#include <cms/Message.h>
+#include <decaf/lang/Exception.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+
+using namespace std;
+using namespace cms;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace activemq::cmsutil;
+using namespace cmstemplate;
+
+////////////////////////////////////////////////////////////////////////////////
+// Executor shared by all Receiver instances; created in Initialize() and
+// destroyed in UnInitialize().
+ThreadPoolExecutor* Receiver::m_threadPoolExecutor=NULL;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a receiver for one destination.
+ *
+ * @param url              broker URL; used to obtain the connection factory.
+ * @param queueOrTopicName default destination name set on the CmsTemplate.
+ * @param isTopic          passed to CmsTemplate::setPubSubDomain().
+ * @param receiveTimeout   receive timeout handed to the CmsTemplate and kept for ReceiveMessage().
+ * @param useThreadPool    when true, run() queues received messages on the shared
+ *                         executor instead of delivering them inline.
+ */
+Receiver::Receiver(const string & url, const string & queueOrTopicName,
+                   bool isTopic, long long receiveTimeout, bool useThreadPool) : m_ready(1) {
+
+    // Plain state first.
+    m_url = url;
+    m_receiveTimeout = receiveTimeout;
+    m_bUseThreadPool = useThreadPool;
+    m_messageListener = NULL;
+    m_asyncReceiverThread = NULL;
+    m_numOfMessagingTasks = 0;
+    m_isClosing = false;
+
+    // Then the template, built on the factory registered for this URL.
+    ConnectionFactory* factory = ConnectionFactoryMgr::GetConnectionFactory(m_url);
+    m_cmsTemplateCreateTime = System::currentTimeMillis();
+    m_cmsTemplate = new CmsTemplate(factory);
+    m_cmsTemplate->setDefaultDestinationName(queueOrTopicName);
+    m_cmsTemplate->setPubSubDomain(isTopic);
+    m_cmsTemplate->setReceiveTimeout(receiveTimeout);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Shuts the receiver down:
+ *   1. flags the receiver as closing so that run() leaves its loop,
+ *   2. joins and deletes the asynchronous receiver thread (if one was started),
+ *   3. waits until all queued messaging tasks have finished,
+ *   4. deletes the CmsTemplate.
+ *
+ * The thread is stopped before the template is deleted so that run() can no
+ * longer touch the template or any other member of a destroyed object.
+ * Exceptions are swallowed; none leaves the destructor.
+ */
+Receiver::~Receiver() {
+
+    try {
+        m_isClosing = true;
+
+        // Stop the async receiver first. It exits once its current receive
+        // call returns and it sees m_isClosing.
+        if (m_asyncReceiverThread != NULL) {
+            m_asyncReceiverThread->join();
+            delete m_asyncReceiverThread;
+            m_asyncReceiverThread = NULL;
+        }
+
+        // Wait until all outstanding messaging tasks are done; they call back
+        // into this object.
+        while (GetNumOfMessagingTasks() > 0) {
+            Thread::sleep(1000);
+        }
+    } catch (...) {
+    }
+
+    // Release the template in its own try block so that a failure above
+    // cannot leak it.
+    try {
+        m_mutexForCmsTemplate.lock();
+        if (m_cmsTemplate) {
+            delete m_cmsTemplate;
+            m_cmsTemplate = NULL;
+        }
+        m_mutexForCmsTemplate.unlock();
+    } catch (...) {
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates the executor shared by all receivers and pre-starts one core thread.
+ *
+ * Calling it again while an executor exists is a no-op, so a repeated call no
+ * longer leaks the previous executor and its threads.
+ *
+ * @param reservedThreads core pool size passed to the executor.
+ * @param maxThreads      maximum pool size passed to the executor.
+ */
+void Receiver::Initialize(int reservedThreads, int maxThreads) {
+    if (m_threadPoolExecutor != NULL) {
+        return;
+    }
+
+    m_threadPoolExecutor = new ThreadPoolExecutor(
+        reservedThreads, maxThreads, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+    m_threadPoolExecutor->prestartCoreThread();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Shuts the shared executor down, waits up to ten seconds for it to terminate
+ * and deletes it. Does nothing when no executor exists. Exceptions raised
+ * during shutdown are ignored.
+ */
+void Receiver::UnInitialize() {
+    if (m_threadPoolExecutor == NULL) {
+        return;
+    }
+
+    try {
+        m_threadPoolExecutor->shutdown();
+        m_threadPoolExecutor->awaitTermination(10000, TimeUnit::MILLISECONDS);
+    } catch (Exception&) {
+        // Best effort: the executor is deleted regardless.
+    }
+
+    delete m_threadPoolExecutor;
+    m_threadPoolExecutor = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Receives one text message through the CmsTemplate.
+ *
+ * @param message      receives the text of the message; left untouched when
+ *                     nothing was received or the message is not a text message.
+ * @param errorCode    CMS_SUCCESS, CMS_ERROR_RECEIVER_TIMEDOUT (retry deadline
+ *                     passed), CMS_ERROR_CAUGHT_CMS_EXCEPTION (receive threw) or
+ *                     CMS_ERROR_INVALID_MESSAGE (expired message and no retry left).
+ * @param retryOnError whether to receive again after an expired message. Forced
+ *                     to false for a timeout of 0 (no wait) and to true for a
+ *                     timeout of -1 (indefinite wait).
+ *
+ * A receive that returns no message ends the call with CMS_SUCCESS and an
+ * unchanged message.
+ */
+void Receiver::ReceiveMessage(std::string& message, ErrorCode& errorCode, bool retryOnError) {
+    const bool waitForever = (m_receiveTimeout == -1/*CmsTemplate::RECEIVE_TIMEOUT_INDEFINITE_WAIT*/);
+    long long stopRetryTime = System::currentTimeMillis() + m_receiveTimeout;
+    errorCode = CMS_SUCCESS;
+
+    if (m_receiveTimeout == 0/*CmsTemplate::RECEIVE_TIMEOUT_NO_WAIT*/) {
+        retryOnError = false;
+    } else if (waitForever) {
+        retryOnError = true;
+    }
+
+    do {
+        // Every attempt starts clean; otherwise a success that follows an
+        // expired message would still look like an error, trigger another
+        // receive and overwrite the message just delivered.
+        errorCode = CMS_SUCCESS;
+
+        long long timeoutForThisLoop;
+        if (m_receiveTimeout <= 0) {
+            timeoutForThisLoop = m_receiveTimeout;
+        } else {
+            timeoutForThisLoop = stopRetryTime - System::currentTimeMillis();
+            if (timeoutForThisLoop <= 0) {
+                errorCode = CMS_ERROR_RECEIVER_TIMEDOUT;
+                break;
+            }
+        }
+
+        m_mutexForCmsTemplate.lock();
+        if (m_cmsTemplate == NULL) {
+            // Template already released (receiver is shutting down).
+            m_mutexForCmsTemplate.unlock();
+            break;
+        }
+
+        cms::Message* cmsMessage = NULL;
+        try {
+            m_cmsTemplate->setReceiveTimeout(timeoutForThisLoop);
+            cmsMessage = m_cmsTemplate->receive();
+        } catch (cms::CMSException&) {
+            m_mutexForCmsTemplate.unlock();
+            errorCode = CMS_ERROR_CAUGHT_CMS_EXCEPTION;
+            break;
+        } catch (...) {
+            // Never leave the template mutex locked behind us.
+            m_mutexForCmsTemplate.unlock();
+            throw;
+        }
+        m_mutexForCmsTemplate.unlock();
+
+        if (cmsMessage == NULL) {
+            break;
+        }
+
+        if (IsMessageExpired(cmsMessage)) {
+            errorCode = CMS_ERROR_INVALID_MESSAGE;
+            delete cmsMessage;
+            continue; // evaluates the retry condition below
+        }
+
+        cms::TextMessage* txtMessage = dynamic_cast<cms::TextMessage*>(cmsMessage);
+        if (txtMessage) {
+            message = txtMessage->getText();
+        }
+        delete cmsMessage;
+
+        // With an indefinite wait there is no deadline, so only the error state
+        // and the retry flag decide whether another attempt is made.
+    } while (errorCode != CMS_SUCCESS && retryOnError &&
+             (waitForever || System::currentTimeMillis() < stopRetryTime));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Blocks on the m_ready latch, which run() counts down as its first action.
+void Receiver::WaitUntilReady() {
+ m_ready.await();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Registers the callback for asynchronously received messages and starts the
+ * "AsyncReceiver" thread, then waits until that thread is running.
+ *
+ * @param messageListener callback to invoke per message; must not be NULL.
+ * @param errorCode       CMS_SUCCESS, CMS_ERROR_INVALID_MESSAGELISTENER (NULL
+ *                        callback) or
+ *                        CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY.
+ */
+void Receiver::RegisterMessageListener(const RecvMessageListener messageListener, ErrorCode& errorCode) {
+    errorCode = CMS_SUCCESS;
+    bool threadStarted = false;
+
+    m_mutexGeneral.lock();
+    if (messageListener == NULL) {
+        errorCode = CMS_ERROR_INVALID_MESSAGELISTENER;
+    } else if (m_messageListener != NULL) {
+        errorCode = CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY;
+    } else {
+        m_messageListener = messageListener;
+
+        m_asyncReceiverThread = new Thread(this, "AsyncReceiver");
+        m_asyncReceiverThread->start();
+        threadStarted = true;
+    }
+    m_mutexGeneral.unlock();
+
+    // Only wait when a thread was actually started by this call.
+    if (threadStarted) {
+        this->WaitUntilReady();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Body of the asynchronous receiver thread.
+ *
+ * Signals readiness, then keeps receiving until the receiver is closing. A
+ * non-empty message is either queued on the shared executor or delivered
+ * inline, depending on m_bUseThreadPool. After a CMS or broker error the loop
+ * pauses for the template's receive timeout before trying again.
+ */
+void Receiver::run() {
+    m_ready.countDown();
+    while (!m_isClosing) {
+        string message = "";
+
+        ErrorCode errorCode = CMS_SUCCESS;
+
+        Receiver::ReceiveMessage(message, errorCode, false);
+        if (message != "") {
+            if (m_bUseThreadPool) {
+                QueueMessagingTask(message);
+            } else {
+                try {
+                    ExecuteMessagingTask(message, false);
+                } catch (...) {
+                }
+            }
+        } else if (errorCode == CMS_ERROR_CAUGHT_CMS_EXCEPTION || errorCode == CMS_ERROR_MESSAGE_BROKER_ERROR) {
+            long long sleepTime = 0;
+            m_mutexForCmsTemplate.lock();
+            // The destructor may already have released the template.
+            if (m_cmsTemplate != NULL) {
+                sleepTime = m_cmsTemplate->getReceiveTimeout();
+            }
+            m_mutexForCmsTemplate.unlock();
+
+            // The timeout can be 0 (no wait) or -1 (indefinite wait); neither
+            // is a usable sleep duration.
+            if (sleepTime > 0) {
+                Thread::sleep(sleepTime);
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Queues a message for delivery on the shared executor. Empty messages and
+ * calls made while the receiver is closing are ignored.
+ *
+ * The task counter is raised BEFORE the task is handed over: otherwise the
+ * destructor could observe a count of zero while a task is already queued and
+ * destroy the receiver underneath it.
+ *
+ * @param message message text to deliver.
+ */
+void Receiver::QueueMessagingTask(const string& message) {
+    if (message == "" || m_isClosing) {
+        return;
+    }
+
+    IncreaseNumOfMessagingTasks();
+    try {
+        MessagingTask* task = new MessagingTask(this, message);
+        // NOTE(review): what happens to the task when execute() rejects it is
+        // not visible here, so it is deliberately not deleted on failure - confirm.
+        m_threadPoolExecutor->execute(task);
+    } catch (...) {
+        // The task will never run, so take back the count added above.
+        DecreaseNumOfMessagingTasks();
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Delivers a message to the registered listener, unless the receiver is
+ * closing or no listener is registered.
+ *
+ * @param message                      message text handed to the listener.
+ * @param bDecreaseNumOfMessagingTasks when true the outstanding-task counter is
+ *                                     lowered afterwards - also when the listener
+ *                                     throws, so that the destructor's wait for
+ *                                     outstanding tasks cannot hang on a failed task.
+ */
+void Receiver::ExecuteMessagingTask(const string& message, bool bDecreaseNumOfMessagingTasks/*=true*/) {
+    try {
+        if (!m_isClosing) {
+            // Copy the callback under the lock, invoke it outside of it.
+            m_mutexGeneral.lock();
+            RecvMessageListener listener = m_messageListener;
+            m_mutexGeneral.unlock();
+            if (listener) {
+                (*listener)(message); //listener will release the message and make reference count 0
+            }
+        }
+    } catch (...) {
+        if (bDecreaseNumOfMessagingTasks) {
+            DecreaseNumOfMessagingTasks();
+        }
+        throw;
+    }
+
+    if (bDecreaseNumOfMessagingTasks) {
+        DecreaseNumOfMessagingTasks();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * @param message message to inspect; must not be NULL.
+ * @return true when the message has a positive expiration time that lies
+ *         before the current time.
+ */
+bool Receiver::IsMessageExpired(cms::Message* message) {
+    const long long expiresAt = message->getCMSExpiration();
+    const long long now = System::currentTimeMillis();
+    return expiresAt > 0 && now > expiresAt;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Adds one to the outstanding-task counter while holding m_mutexGeneral.
+void Receiver::IncreaseNumOfMessagingTasks() {
+    m_mutexGeneral.lock();
+    m_numOfMessagingTasks += 1;
+    m_mutexGeneral.unlock();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Takes one off the outstanding-task counter while holding m_mutexGeneral.
+void Receiver::DecreaseNumOfMessagingTasks() {
+    m_mutexGeneral.lock();
+    m_numOfMessagingTasks -= 1;
+    m_mutexGeneral.unlock();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * @return a snapshot of the outstanding-task counter, read while holding
+ *         m_mutexGeneral.
+ */
+long Receiver::GetNumOfMessagingTasks() {
+    m_mutexGeneral.lock();
+    long pending = m_numOfMessagingTasks;
+    m_mutexGeneral.unlock();
+    return pending;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sets the closing flag; run() checks it at the top of each loop iteration
+// and the queue/execute functions skip their work once it is set.
+void Receiver::Close() {
+ m_isClosing = true;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h Mon Nov 19 15:59:25 2012
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMSTEMPLATE_RECEIVER_H_
+#define _CMSTEMPLATE_RECEIVER_H_
+
+#include <decaf/lang/Runnable.h>
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+#include <decaf/lang/exceptions/RuntimeException.h>
+#include "CmsMessageHandlerDefinitions.h"
+
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <string>
+
+namespace cmstemplate {
+
+ /**
+ * Receiving half of the CMSTemplate stress example, declared as a decaf
+ * Runnable that holds an activemq::cmsutil::CmsTemplate pointer.
+ *
+ * NOTE(review): decaf::util::concurrent::Mutex and decaf::lang::Thread are
+ * used below but their headers are not included directly by this file;
+ * presumably they arrive through the other includes - confirm.
+ */
+ class Receiver : public decaf::lang::Runnable {
+ public:
+
+ // Callback type accepted by RegisterMessageListener(): a free or static
+ // function taking the message as a string.
+ typedef void DECAF_STDCALL (*RecvMessageListener)( const std::string& message);
+
+ private:
+ std::string m_url;
+ // NOTE(review): presumably serialises access to m_cmsTemplate - confirm in Receiver.cpp.
+ decaf::util::concurrent::Mutex m_mutexForCmsTemplate;
+ // Held by Increase/Decrease/GetNumOfMessagingTasks() around m_numOfMessagingTasks.
+ decaf::util::concurrent::Mutex m_mutexGeneral;
+ // Set to true by Close().
+ bool m_isClosing;
+
+ decaf::util::concurrent::CountDownLatch m_ready;
+
+ RecvMessageListener m_messageListener;
+
+ activemq::cmsutil::CmsTemplate* m_cmsTemplate;
+
+ decaf::lang::Thread* m_asyncReceiverThread;
+
+ // NOTE(review): presumably milliseconds (constructor default is 2000) - confirm.
+ long long m_receiveTimeout;
+
+ bool m_bUseThreadPool; //determines if we should use the thread pool to process async received messages or not
+
+ long long m_cmsTemplateCreateTime;
+
+ // Static, hence shared by every Receiver instance.
+ static decaf::util::concurrent::ThreadPoolExecutor* m_threadPoolExecutor;
+ long m_numOfMessagingTasks; //number of pending messaging tasks created by this receiver that has been queued in the threadpool
+
+ virtual void WaitUntilReady();
+
+ // Counter helpers; each one locks m_mutexGeneral for the duration of the access.
+ void IncreaseNumOfMessagingTasks();
+ void DecreaseNumOfMessagingTasks();
+ long GetNumOfMessagingTasks();
+
+ public:
+
+ Receiver(const std::string& url, const std::string& queueOrTopicName, bool isTopic, long long receiveTimeout = 2000, bool useThreadPool = true);
+
+ virtual ~Receiver();
+
+ // NOTE(review): presumably create and destroy the shared m_threadPoolExecutor - confirm in Receiver.cpp.
+ static void Initialize(int reservedThreads, int maxThreads);
+ static void UnInitialize();
+
+ // Sets m_isClosing to true; performs no other work itself.
+ void Close();
+
+ virtual void run();
+
+ // NOTE(review): the parameter is named m_messageListener, the same as the
+ // data member above, which it therefore hides inside the function.
+ void RegisterMessageListener(const RecvMessageListener m_messageListener, ErrorCode& errorCode);
+
+ void ReceiveMessage(std::string& message, ErrorCode& errorCode, bool retryOnError = true);
+
+ // True when the message has a positive CMS expiration time that the
+ // current system time has already passed.
+ static bool IsMessageExpired(cms::Message* message);
+
+ void QueueMessagingTask(const std::string& message);
+
+ void ExecuteMessagingTask(const std::string& message, bool bDecreaseNumOfMessagingTasks = true);
+
+ };
+}
+
+#endif /** _CMSTEMPLATE_RECEIVER_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Receiver.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.cpp?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.cpp Mon Nov 19 15:59:25 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Sender.h"
+
+#include "CmsMessageCreator.h"
+#include "ConnectionFactoryMgr.h"
+#include <cms/ConnectionFactory.h>
+
+using namespace std;
+using namespace cms;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace activemq::cmsutil;
+using namespace cmstemplate;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds a sender whose CmsTemplate targets the given destination.
+ *
+ * @param url                  passed to ConnectionFactoryMgr::GetConnectionFactory().
+ * @param queueOrTopicName     default destination name for the template.
+ * @param isTopic              forwarded to setPubSubDomain().
+ * @param isDeliveryPersistent forwarded to setDeliveryPersistent().
+ * @param timeToLive           forwarded to setTimeToLive().
+ */
+Sender::Sender(const string& url, const string& queueOrTopicName, bool isTopic, bool isDeliveryPersistent, int timeToLive) :
+    m_cmsTemplate(NULL) {
+
+    // The template is created on top of the factory handed out for this URL.
+    ConnectionFactory* const factory = ConnectionFactoryMgr::GetConnectionFactory(url);
+    m_cmsTemplate = new CmsTemplate(factory);
+
+    // Explicit QoS is switched on before the delivery mode and time-to-live
+    // values are supplied below.
+    m_cmsTemplate->setExplicitQosEnabled(true);
+    m_cmsTemplate->setDefaultDestinationName(queueOrTopicName);
+    m_cmsTemplate->setPubSubDomain(isTopic);
+    m_cmsTemplate->setDeliveryPersistent(isDeliveryPersistent);
+    m_cmsTemplate->setTimeToLive(timeToLive);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Releases the CmsTemplate while holding the template mutex. Any exception
+ * raised during teardown is swallowed so that none leaves the destructor.
+ */
+Sender::~Sender() {
+    try {
+        m_cmsTemplateMutex.lock();
+
+        // Deleting a null pointer is a no-op, so no explicit check is needed.
+        delete m_cmsTemplate;
+        m_cmsTemplate = NULL;
+
+        m_cmsTemplateMutex.unlock();
+    } catch (...) {
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends one text payload through the CmsTemplate, serialised by
+ * m_cmsTemplateMutex.
+ *
+ * @param message   payload handed to a CmsMessageCreator.
+ * @param errorCode set to CMS_SUCCESS when the send completes, or to
+ *                  CMS_ERROR_CAUGHT_CMS_EXCEPTION when a cms::CMSException
+ *                  is caught. Left untouched if any other exception
+ *                  propagates out of this call.
+ */
+void Sender::SendMessage(string& message, ErrorCode& errorCode) {
+
+    // create a MessageCreator
+    CmsMessageCreator messageCreator(message);
+
+    // Take the lock before entering the try block so that every handler
+    // below runs with the mutex held and can release it unconditionally.
+    m_cmsTemplateMutex.lock();
+
+    // send message through a CmsTemplate
+    try {
+        m_cmsTemplate->send(&messageCreator);
+    } catch (cms::CMSException&) {
+        m_cmsTemplateMutex.unlock();
+        errorCode = CMS_ERROR_CAUGHT_CMS_EXCEPTION;
+        return;
+    } catch (...) {
+        // Any other exception still reaches the caller, but never with the
+        // mutex left locked (which would block every later send).
+        m_cmsTemplateMutex.unlock();
+        throw;
+    }
+
+    m_cmsTemplateMutex.unlock();
+    errorCode = CMS_SUCCESS;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.h?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.h Mon Nov 19 15:59:25 2012
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMSTEMPLATE_SENDER_H_
+#define _CMSTEMPLATE_SENDER_H_
+
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <activemq/cmsutil/MessageCreator.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/Runnable.h>
+
+#include "CmsMessageHandlerDefinitions.h"
+
+namespace cmstemplate {
+
+ /**
+ * Sending half of the CMSTemplate stress example: owns one CmsTemplate and
+ * pushes string payloads through it.
+ */
+ class Sender {
+ private:
+
+ // Locked around the send in SendMessage() and around the delete in the destructor.
+ decaf::util::concurrent::Mutex m_cmsTemplateMutex;
+ // Allocated in the constructor, deleted in the destructor.
+ activemq::cmsutil::CmsTemplate* m_cmsTemplate;
+
+ public:
+
+ // Creates the CmsTemplate from the connection factory that
+ // ConnectionFactoryMgr returns for url, then applies the destination name,
+ // topic/queue choice, delivery persistence and time-to-live settings.
+ Sender(const std::string& url, const std::string& queueOrTopicName,
+ bool isTopic, bool isDeliveryPersistent, int timeToLive);
+
+ virtual ~Sender();
+
+ // Sends msg through the template. errorCode becomes CMS_SUCCESS on success
+ // or CMS_ERROR_CAUGHT_CMS_EXCEPTION when a cms::CMSException is caught.
+ void SendMessage(std::string& msg, ErrorCode& errorCode);
+
+ };
+}
+
+#endif /** _CMSTEMPLATE_SENDER_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/Sender.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp Mon Nov 19 15:59:25 2012
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestSenderAndReceiver.h"
+
+#include <decaf/lang/Thread.h>
+#include <decaf/util/Random.h>
+#include <bitset>
+#include <stdio.h>
+
+using namespace std;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace cmstemplate;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Listener callback: prints a one-character verdict for each received message.
+ *
+ * A message counts as valid when it contains a ';' that is not its first
+ * character (run() builds messages as "<send index>;<payload>").
+ *
+ * @param message text of the received message.
+ */
+void TestSenderAndReceiver::onMessage(const string& message) {
+    // Keep the position as size_type and compare against npos directly;
+    // narrowing it to int relies on an implementation-defined conversion.
+    const string::size_type separator = message.find(';');
+    if (separator == string::npos || separator == 0) {
+        // i for invalid message
+        printf("%c", 105);
+    } else {
+        // Smiley face for good message
+        printf("%c", 2);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates the Sender/Receiver pair for one destination and hooks the static
+ * onMessage() callback up to the receiver. No sender thread is started here;
+ * that is done by init().
+ */
+TestSenderAndReceiver::TestSenderAndReceiver(const string& url, const string& queueOrTopicName, bool isTopic,
+                                             bool isDeliveryPersistent, int timeToLive, int receiveTimeout) :
+    m_sender(NULL),
+    m_receiver(NULL),
+    m_senderThread(NULL),
+    m_isClosing(false),
+    m_sendIndex(0),
+    m_receiveIndex(0) {
+
+    m_sender = new Sender(url, queueOrTopicName, isTopic, isDeliveryPersistent, timeToLive);
+
+    // The last argument is the receiver's useThreadPool switch; pass false
+    // instead if the thread pool should not be used.
+    m_receiver = new Receiver(url, queueOrTopicName, isTopic, receiveTimeout, true);
+
+    // The status written by RegisterMessageListener() is not inspected.
+    ErrorCode registrationStatus = CMS_SUCCESS;
+    m_receiver->RegisterMessageListener(onMessage, registrationStatus);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Shuts the pair down through close(); any exception close() raises is
+// swallowed so that nothing escapes the destructor.
+TestSenderAndReceiver::~TestSenderAndReceiver() {
+ try {
+ close();
+ } catch(...) {}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates the sender thread with this object as its Runnable and starts it,
+// which makes run() execute on that thread.
+// NOTE(review): a second call would overwrite m_senderThread without joining
+// or deleting the previous thread - confirm callers invoke init() only once.
+void TestSenderAndReceiver::init() {
+ m_senderThread = new Thread(this, "TestSenderAndReceiver");
+ m_senderThread->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sender loop: until m_isClosing is set, builds a message of the form
+ * "<send index>;<random payload>", sends it, and pauses for a random
+ * interval before the next one.
+ *
+ * All random values come from the bounded Random::nextInt(n), which yields a
+ * value in [0, n). Scaling rand() or the unbounded nextInt() by a constant
+ * and dividing by RAND_MAX overflows int, and can produce a negative sleep
+ * time or a character outside 'A'..'Z'.
+ *
+ * NOTE(review): m_isClosing is a plain bool written by close() from another
+ * thread - confirm this is acceptable for the example.
+ */
+void TestSenderAndReceiver::run() {
+    Random random;
+
+    while (!m_isClosing) {
+        std::stringstream str;
+        str << m_sendIndex;
+
+        string message;
+        str >> message;
+        message.append(";");
+
+        // Add variable payload: 0..1023 random upper-case letters
+        const int payloadLength = random.nextInt(1024);
+        for (int i = 0; i < payloadLength; i++) {
+            message += (char) ('A' + random.nextInt(26));
+        }
+
+        ErrorCode errorReturn = CMS_SUCCESS;
+        m_sender->SendMessage(message, errorReturn);
+        if (errorReturn == CMS_SUCCESS) {
+            // The index only advances once the send has succeeded.
+            m_sendIndex++;
+        } else {
+            // Exclamation point for error
+            printf("%c", 33);
+        }
+
+        // Sleep for up to 1 second between sends
+        Thread::sleep(random.nextInt(1000));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Stops the sender thread and destroys the sender and the receiver.
+ * Only the first call does any work; once m_isClosing is set, later calls
+ * return immediately.
+ */
+void TestSenderAndReceiver::close() {
+    if (m_isClosing) {
+        return;
+    }
+    m_isClosing = true;
+
+    // Raising the flag ends the loop in run(); wait for that thread to
+    // finish before the sender it uses is destroyed.
+    if (m_senderThread != NULL) {
+        m_senderThread->join();
+        delete m_senderThread;
+        m_senderThread = NULL;
+    }
+
+    delete m_sender;
+    m_sender = NULL;
+
+    // Ask the receiver to close before destroying it.
+    m_receiver->Close();
+    delete m_receiver;
+    m_receiver = NULL;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h?rev=1411265&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h Mon Nov 19 15:59:25 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMSTEMPLATE_TESTSENDERANDRECEIVER_H_
+#define _CMSTEMPLATE_TESTSENDERANDRECEIVER_H_
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/exceptions/RuntimeException.h>
+
+#include "Sender.h"
+#include "Receiver.h"
+#include "CmsMessageHandlerDefinitions.h"
+
+namespace cmstemplate {
+
+ /**
+ * Pairs one Sender with one Receiver on the same destination. As a
+ * Runnable, its run() method is the send loop executed on the thread
+ * created by init().
+ */
+ class TestSenderAndReceiver : public decaf::lang::Runnable {
+
+ private:
+
+ // Both are created in the constructor and deleted in close().
+ Sender* m_sender;
+ Receiver* m_receiver;
+ // NULL until init() creates the thread; joined and deleted in close().
+ decaf::lang::Thread* m_senderThread;
+ // Set by close(); run() keeps looping while it is false.
+ bool m_isClosing;
+ // Incremented by run() after each successful send.
+ int m_sendIndex;
+ // Initialised to 0 in the constructor; not otherwise used in
+ // TestSenderAndReceiver.cpp.
+ int m_receiveIndex;
+
+ // Callback registered with the receiver; prints one character per message.
+ static void DECAF_STDCALL onMessage(const std::string& message);
+
+ public:
+
+ TestSenderAndReceiver(const std::string& url, const std::string& queueOrTopicName,
+ bool isTopic, bool isDeliveryPersistent, int timeToLive, int receiveTimeout);
+
+ // Calls close() and swallows any exception it raises.
+ virtual ~TestSenderAndReceiver();
+
+ // Creates and starts the sender thread.
+ void init();
+
+ virtual void run();
+
+ // Stops the sender thread and destroys the sender and receiver; calls
+ // after the first return without doing anything.
+ void close();
+
+ // NOTE(review): no definition of this method appears in the
+ // TestSenderAndReceiver.cpp added by this commit - confirm it is defined
+ // elsewhere before calling it.
+ void waitUntilReady();
+ };
+}
+
+#endif /** _CMSTEMPLATE_TESTSENDERANDRECEIVER_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/cmstemplate-stress/TestSenderAndReceiver.h
------------------------------------------------------------------------------
svn:eol-style = native