You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/19 21:12:30 UTC
svn commit: r1447894 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/examples: ./ stress-test/
Author: tabish
Date: Tue Feb 19 20:12:30 2013
New Revision: 1447894
URL: http://svn.apache.org/r1447894
Log:
Adds another example app.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am?rev=1447894&r1=1447893&r2=1447894&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/Makefile.am Tue Feb 19 20:12:30 2013
@@ -105,3 +105,17 @@ noinst_PROGRAMS += cmstemplate_stress
cmstemplate_stress_SOURCES = $(cmstemplate_stress_sources)
cmstemplate_stress_LDADD= $(AMQ_TEST_LIBS)
cmstemplate_stress_CXXFLAGS = $(AMQ_TEST_CXXFLAGS) -I$(srcdir)/../main
+
+## CMS Template Stress Test Example
+stress_stress_sources = stress-test/TestSenderAndReceiver.cpp \
+ stress-test/Sender.cpp \
+ stress-test/Receiver.cpp \
+ stress-test/MessagingTask.cpp \
+ stress-test/ConnectionFactoryMgr.cpp \
+ stress-test/CmsStress.cpp \
+ stress-test/BrokerMonitor.cpp \
+ stress-test/CmsMessageCreator.cpp
+noinst_PROGRAMS += stress_test
+stress_test_SOURCES = $(stress_stress_sources)
+stress_test_LDADD= $(AMQ_TEST_LIBS)
+stress_test_CXXFLAGS = $(AMQ_TEST_CXXFLAGS) -I$(srcdir)/../main
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BrokerMonitor.h"
+#include "ConnectionFactoryMgr.h"
+#include "TestSenderAndReceiver.h"
+
+#include <cms/Session.h>
+#include <cms/Message.h>
+#include <cms/ConnectionFactory.h>
+
+#include <activemq/cmsutil/MessageCreator.h>
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <decaf/util/Iterator.h>
+
+#include <stdio.h>
+
+extern bool VERBOSE;
+
+using namespace cms;
+using namespace cms::stress;
+using namespace activemq::cmsutil;
+using namespace decaf::lang::exceptions;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+BrokerMonitor::BrokerMonitor(const std::string& url, int interval, CountDownLatch* quit) :
+ closing(false), brokerOk(false), url(url), interval(interval), brokerMonitorThread(), quit(quit) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BrokerMonitor::~BrokerMonitor() {
+ try {
+ close();
+ } catch (...) {
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BrokerMonitor::close() {
+ closing = true;
+ if (brokerMonitorThread) {
+ brokerMonitorThread->join();
+ delete brokerMonitorThread;
+ brokerMonitorThread = NULL;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BrokerMonitor::start() {
+ if (!brokerMonitorThread) {
+ brokerMonitorThread = new Thread(this, "Message Broker Monitor Thread");
+ brokerMonitorThread->start();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BrokerMonitor::run() {
+ ConnectionFactory* connectionFactory = ConnectionFactoryMgr::getConnectionFactory(url);
+ CmsTemplate* cmsTemplate = createCmsTemplate(connectionFactory);
+
+ while (!closing) {
+ try {
+ cmsTemplate->send(this);
+ Message* message = cmsTemplate->receive();
+
+ if (message) {
+ delete message;
+ if (VERBOSE) {
+ printf("%c", SYM_MON_GOOD);
+ }
+ brokerOk = true;
+ } else {
+ if (VERBOSE) {
+ printf("%c", SYM_MON_BAD);
+ }
+ brokerOk = false;
+ }
+ } catch (cms::CMSException& ex) {
+ if (VERBOSE) {
+ printf("%c", SYM_MON_CMS);
+ }
+ brokerOk = false;
+ } catch (...) {
+ if (VERBOSE) {
+ printf("%c", SYM_MON_EXC);
+ }
+ brokerOk = false;
+ }
+
+ if (quit->await(interval)) {
+ closing = true;
+ }
+ }
+
+ delete cmsTemplate;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Message* BrokerMonitor::createMessage(cms::Session* session) {
+ Message* message = NULL;
+ if (session) {
+ message = session->createTextMessage("Heart Beat");
+ }
+ return message;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CmsTemplate* BrokerMonitor::createCmsTemplate(ConnectionFactory* connectionFactory) {
+
+ CmsTemplate* cmsTemplate = new CmsTemplate(connectionFactory);
+ cmsTemplate->setDefaultDestinationName("cpp.CmsMessageHandler.BrokerMonitor.HeartBeatingChannel");
+ cmsTemplate->setTimeToLive(1000);
+ cmsTemplate->setReceiveTimeout(1000);
+
+ return cmsTemplate;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool BrokerMonitor::isBrokerOk() {
+ return brokerOk;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_BROKERMONITOR_H_
+#define _CMS_STRESS_BROKERMONITOR_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <activemq/cmsutil/MessageCreator.h>
+
+namespace cms {
+ class Session;
+namespace stress {
+
+ class Client;
+
+ class BrokerMonitor: public decaf::lang::Runnable, activemq::cmsutil::MessageCreator {
+ private:
+
+ bool closing;
+ bool brokerOk;
+ std::string url;
+ int interval;
+ decaf::lang::Thread* brokerMonitorThread;
+ decaf::util::concurrent::CountDownLatch* quit;
+
+ private:
+
+ activemq::cmsutil::CmsTemplate* createCmsTemplate(cms::ConnectionFactory* connectionFactory);
+
+ public:
+
+ BrokerMonitor(const std::string& url, int interval,
+ decaf::util::concurrent::CountDownLatch* quit);
+
+ virtual ~BrokerMonitor();
+
+ virtual cms::Message* createMessage(cms::Session* session);
+
+ virtual void run();
+
+ void start();
+
+ void close();
+
+ bool isBrokerOk();
+
+ };
+
+}}
+
+#endif /** _CMS_STRESS_BROKERMONITOR_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/BrokerMonitor.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CmsMessageCreator.h"
+
+using namespace decaf::lang;
+using namespace cms;
+using namespace cms::stress;
+
+////////////////////////////////////////////////////////////////////////////////
+CmsMessageCreator::CmsMessageCreator(const std::string& text, const std::string& name, const std::string& value) :
+ text(text), headerName(name), headerValue(value) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CmsMessageCreator::~CmsMessageCreator() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* CmsMessageCreator::createMessage(cms::Session* session) {
+
+ cms::Message* message = NULL;
+ if (session) {
+ message = session->createTextMessage(text);
+ if (headerName != "") {
+ message->setStringProperty(headerName, headerValue);
+ }
+ }
+ return message;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_CMSMESSAGECREATOR_H_
+#define _CMS_STRESS_CMSMESSAGECREATOR_H_
+
+#include <decaf/util/Config.h>
+
+#include <cms/Session.h>
+
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <activemq/cmsutil/MessageCreator.h>
+
+namespace cms {
+namespace stress {
+
+ class CmsMessageCreator: public activemq::cmsutil::MessageCreator {
+ private:
+
+ std::string text;
+ std::string headerName;
+ std::string headerValue;
+
+ public:
+
+ CmsMessageCreator(const std::string& txt,
+ const std::string& name = "",
+ const std::string& value = "");
+
+ virtual ~CmsMessageCreator();
+
+ virtual cms::Message* createMessage(cms::Session* session);
+ };
+
+}}
+
+#endif /** _CMS_STRESS_CMSMESSAGECREATOR_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageCreator.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_CMSMESSAGEHANDLERDEFINITIONS_H_
+#define _CMS_STRESS_CMSMESSAGEHANDLERDEFINITIONS_H_
+
+#include <decaf/util/Config.h>
+
+namespace cms {
+namespace stress {
+
+ #define SYM_GOOD_SEND 32
+ #define SYM_BAD_SEND 33
+ #define SYM_BIG_DIFF 63
+ #define SYM_GOOD_SEQ 2
+ #define SYM_BAD_MSG 105
+ #define SYM_MON_GOOD 71
+ #define SYM_MON_BAD 66
+ #define SYM_MON_CMS 67
+ #define SYM_MON_EXC 69
+
+ enum ErrorCode { // NOTE: When added an entry to ErrorCode you must add the corresponding error string to ErrorDescription
+ CMS_SUCCESS = 0,
+ CMS_ERROR_UNABLE_TO_PARSE_XML,
+ CMS_ERROR_MESSAGE_HAS_BEEN_DEFINED_ALREADY,
+ CMS_ERROR_HEADER_HAS_BEEN_DEFINED_ALREADY,
+ CMS_ERROR_CLIENT_HAS_BEEN_DEFINED_ALREADY,
+ CMS_ERROR_DESTINATION_HAS_BEEN_DEFINED_ALREADY,
+ CMS_ERROR_INVALID_CLIENT,
+ CMS_ERROR_INVALID_DESTINATION,
+ CMS_ERROR_INVALID_MESSAGE,
+ CMS_ERROR_INVALID_HEADERS,
+ CMS_ERROR_INVALID_MESSAGELISTENER,
+ CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY,
+ CMS_ERROR_RECEIVER_TIMEDOUT,
+ CMS_ERROR_DESTINATION_NOT_CONFIGURED_FOR_SENDING_MESSAGES,
+ CMS_ERROR_DESTINATION_NOT_CONFIGURED_FOR_RECEIVING_MESSAGES,
+ CMS_ERROR_CAUGHT_CMS_EXCEPTION,
+ CMS_ERROR_CAUGHT_TBGENOBJ_ERROR,
+ CMS_ERROR_MESSAGE_BROKER_ERROR,
+ CMS_ERROR_BROKER_MONITOR_NOT_FOUND,
+ CMS_ERROR_BROKER_MONITORING_NOT_TURNED_ON,
+ CMS_ERROR_INVALID_BROKERSTATUSLISTENER,
+ CMS_ERROR_A_BROKERSTATUSLISTENER_HAS_BEEN_REGISTERED_ALREADY,
+ CMS_ERROR_CAUGHT_EXCEPTION_IN_INIT,
+ CMS_ERROR_CAUGHT_EXCEPTION_IN_UNINIT,
+ CMS_LAST // Put all error enums BEFORE this one. This one must be listed last.
+ };
+
+ const char ErrorDescription[][100] = {
+ "Success",
+ "Unable to parse xml",
+ "Message has been defined already",
+ "Header has been defined already",
+ "Client has been defined already",
+ "Destination has been defined already",
+ "Invalid client",
+ "Invalid destination",
+ "Invalid message",
+ "Invalid headers",
+ "Invalid messagelistener",
+ "A messagelistener has been registered already with the destination",
+ "Receiver timed out",
+ "Destination not configured for sending messages",
+ "Destination not configured for receiving messages",
+ "Caught CMS exception",
+ "Caught TBGenObj error",
+ "Message broker appears to be offline",
+ "Can not find broker monitor",
+ "The broker monitoring functinaliy has not been turned on",
+ "Invalid brokerStatuslistener",
+ "This brokerStatuslistener has been registered already with the broker",
+ "Caught an exception when initializing CmsMessageHandler",
+ "Caught an exception when uninitializing CmsMessageHandler",
+ "CMS_LAST - ErrorCodeToString macro index out of range",
+ };
+
+ #define ErrorCodeToString(i) (((i >= CMS_SUCCESS) && (i <= CMS_LAST)) ? ErrorDescription[i] : ErrorDescription[CMS_LAST])
+ #define IsError(i) (i != CMS_SUCCESS)
+
+}}
+
+#endif /** _CMS_STRESS_CMSMESSAGEHANDLERDEFINITIONS_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsMessageHandlerDefinitions.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/library/ActiveMQCPP.h>
+
+#include "TestSenderAndReceiver.h"
+#include "ConnectionFactoryMgr.h"
+#include "BrokerMonitor.h"
+#include "MessagingTask.h"
+
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+
+#include <iostream>
+#include <stdlib.h>
+#include <stdio.h>
+
+using namespace cms;
+using namespace cms::stress;
+using namespace activemq::library;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+bool VERBOSE = false;
+static bool bPause = false;
+
+decaf::util::concurrent::CountDownLatch* quit;
+
+TESTINFO TestResults;
+int InitialPrivateMemorySize;
+
+////////////////////////////////////////////////////////////////////////////////
+void DisplayResults() {
+ long long mills = TestResults.endTime - TestResults.startTime;
+ int secs = (int) (mills / 1000);
+ int mins = secs / 60;
+ int hrs = mins / 60;
+ int days = hrs / 24;
+
+ printf("\nT E S T S U M M A R Y\n");
+
+ if (days > 0) {
+ printf("Elapsed time = %d:%02.2d:%02.2d:%02.2d.%03.3d\n", days, hrs % 24, mins % 60, secs % 60, mills % 1000);
+ } else {
+ printf("Elapsed time = %02.2d:%02.2d:%02.2d.%03.3d\n", hrs % 24, mins % 60, secs % 60, mills % 1000);
+ }
+
+ printf("Threads used = %d\n", TestResults.threadCount);
+
+ printf("Messages sent = %d\n", TestResults.sent.get());
+ printf("Messages received = %d\n", TestResults.received.get());
+ if (TestResults.invalidMessages.get()) {
+ printf("Invalid Messages = %d\n", TestResults.invalidMessages.get());
+ }
+ if (TestResults.badSequenceMessages.get()) {
+ printf("Sequence Errors = %d\n", TestResults.badSequenceMessages.get());
+ }
+ if (TestResults.sendErrors.get()) {
+ printf("Send Errors = %d\n", TestResults.sendErrors.get());
+ }
+
+ if (TestResults.sent.get() > 0) {
+ printf("Reliability = %0.3f%%\n", (double) (((double) (TestResults.received.get()) * 100) / (double) TestResults.sent.get()));
+ printf("Sequenced = %0.3f%%\n", (double) (((double) (TestResults.received.get() - TestResults.badSequenceMessages.get()) * 100) / (double) TestResults.sent.get()));
+ printf("Output performance = %0.3f ms/msg\n", (double) ((double) mills / (double) TestResults.sent.get()));
+ if ((TestResults.received.get() + TestResults.invalidMessages.get()) > 0) {
+ printf("Input performance = %0.3f ms/msg\n", (double) ((double) mills / (double) (TestResults.received.get() + TestResults.invalidMessages.get())));
+ }
+ }
+
+ if (bPause) {
+ printf("Press any key to continue...\n");
+ std::cin.get();
+ printf("\n");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int main(int argc, char** argv) {
+
+ BrokerMonitor* monitor = NULL;
+ std::string url = "tcp://";
+ int cnt = 25;
+ int done = 3600;
+ int monint = 5;
+ int inter = -1;
+ int seed = 0;
+ int thrdmax = 100;
+ int thrdmin = 3;
+ char *ptr;
+ bool bOK = true;
+ bool bReport = false;
+ bool bPool = true;
+ const char* pName = NULL;
+ const char* pHeader = NULL;
+
+ printf("\n");
+
+ for (int i = 1; bOK && (i < argc); i++) {
+ if (argv[i][0] != '-') {
+ bOK = false;
+ } else {
+ switch (argv[i][1]) {
+ case 'a':
+ bPool = false;
+ break;
+ case 'b':
+ monint = Integer::parseInt(argv[i + 1]);
+ i++;
+ break;
+ case 'f':
+ thrdmin = Integer::parseInt(argv[i + 1]);
+ i++;
+ break;
+ case 'g':
+ thrdmax = Integer::parseInt(argv[i + 1]);
+ i++;
+ break;
+ case 'i':
+ inter = Integer::parseInt(argv[i + 1]);
+ i++;
+ break;
+ case 'n':
+ pName = argv[i + 1];
+ i++;
+ break;
+ case 'p':
+ bPause = true;
+ break;
+ case 'r':
+ bReport = true;
+ break;
+ case 's':
+ done = Integer::parseInt(argv[i + 1]);
+ i++;
+ break;
+ case 't':
+ cnt = Integer::parseInt(argv[i + 1]);
+ i++;
+ break;
+ case 'u':
+ pHeader = argv[i + 1];
+ i++;
+ break;
+ case 'v':
+ VERBOSE = true;
+ break;
+ case 'x':
+ seed = Integer::parseInt(argv[i + 1]);
+ i++;
+ break;
+ default:
+ bOK = false;
+ break;
+ }
+ }
+ }
+
+ if (!bOK) {
+ printf("Usage: %s [-b #] [-i #] [-m] [-n name] [-p] [-r] [-s #] [-t #] [-u header] [-v] [-x # ]\n", argv[0]);
+ printf(" -a : Do not use thread pools when dispatching incoming messages (default is to use pools)\n");
+ printf(" -b # : Number of seconds between heartbeat messages (0 disables) (default is 5)\n");
+ printf(" -f # : Minimum number of threads to have available when use thread pools (default is 3)\n");
+ printf(" -g # : Maximum number of threads that can be created in each thread pool (default is 100)\n");
+ printf(" -i # : Number of milliseconds to wait between message sends (-1 is random delay between 0 and 1000ms) (default is -1)\n");
+ printf(" -n s : Specify the name of the topic to use (default is 'topic')\n");
+ printf(" -p : Pause for keyboard input (default is false)\n");
+ printf(" -r : Display test summary (default is false)\n");
+ printf(" -s # : Number of seconds to run test for (0 is until CTRL+C is entered) (default is 3600 - 1hr)\n");
+ printf(" -t # : Number of threads to use (default is 25)\n");
+ printf(" -u s : Use the specified header name for ID and utilize selectors\n");
+ printf(" -v : Display verbose progress where the following symbols have the specified meaning:\n");
+ printf(" (%c) = Good send of a message\n", SYM_GOOD_SEND);
+ printf(" (%c) = Error encountered sending a message\n", SYM_BAD_SEND);
+ printf(" (%c) = Good sequenced message received\n", SYM_GOOD_SEQ);
+ printf(" (%c) = Invalid message received\n", SYM_BAD_MSG);
+ printf(" (%c) = Received message sequence is off by 10 or more\n", SYM_BIG_DIFF);
+ printf(" (#) = Number indicating how far off received sequence number is\n");
+ printf(" (%c) = Good heartbeat sent and received\n", SYM_MON_GOOD);
+ printf(" (%c) = Failed to send and receive heartbeat\n", SYM_MON_BAD);
+ printf(" (%c) = CMS exception received while send/receiving heartbeat\n", SYM_MON_CMS);
+ printf(" (%c) = Exception received while send/receiving heartbeat\n", SYM_MON_EXC);
+ printf(" -x # : Seed the random number generator with the specified seed (0 uses time) (default is 0)\n");
+ printf("\n");
+ return -1;
+ }
+
+ ptr = getenv("MessageBrokerIP");
+ if (ptr != NULL) {
+ url += ptr;
+ } else {
+ url += "127.0.0.1";
+ }
+ url += ":";
+ ptr = getenv("MessageBrokerPort");
+ if (ptr != NULL) {
+ url += ptr;
+ } else {
+ url += "61616";
+ }
+ url += "?connection.sendTimeout=1000";
+
+ ActiveMQCPP::initializeLibrary();
+ ConnectionFactoryMgr::initialize();
+ MessagingTask::initializeThreads(thrdmin, thrdmax);
+ quit = new CountDownLatch(1);
+
+ TestSenderAndReceiver** sar = new TestSenderAndReceiver*[cnt];
+ TestResults.threadCount = cnt;
+ TestResults.lastSequence = new AtomicInteger[cnt];
+
+ if (pName == NULL) {
+ pName = "topic";
+ }
+ if (pHeader == NULL) {
+ pHeader = "";
+ }
+
+#ifdef _UNICODE
+ wstring wstr(pName);
+ string nstr(wstr.begin(), wstr.end());
+ wstring whdr(pHeader);
+ string nhdr(whdr.begin(), whdr.end());
+#else
+ string nstr(pName);
+ string nhdr(pHeader);
+#endif
+
+ TestResults.startTime = System::currentTimeMillis();
+
+ if (monint > 0) {
+ monitor = new BrokerMonitor(url, monint * 1000, quit);
+ monitor->start();
+ }
+
+ for (int i = 0; i < cnt; i++) {
+ string topic(nstr);
+
+ // If not using selectors send to different topic names
+ if (nhdr == "") {
+ stringstream str;
+ str << i;
+ topic += str.str();
+ }
+ sar[i] = new TestSenderAndReceiver(url, topic.c_str(), nhdr.c_str(), true, false, monitor, quit, 50, 1000, i, bPool, inter, seed);
+ sar[i]->init();
+ }
+
+ if (done != 0) {
+ quit->await(done * 1000);
+ } else {
+ quit->await();
+ }
+
+ quit->countDown();
+
+ for (int i = 0; i < cnt; i++) {
+ sar[i]->close();
+ delete sar[i];
+ }
+ delete sar;
+
+ if (monitor != NULL) {
+ monitor->close();
+ delete monitor;
+ }
+
+ TestResults.endTime = System::currentTimeMillis();
+ printf("\n");
+
+ delete[] TestResults.lastSequence;
+ MessagingTask::terminateThreads();
+ ConnectionFactoryMgr::unInitialize();
+ delete quit;
+ ActiveMQCPP::shutdownLibrary();
+
+ if (bReport) {
+ DisplayResults();
+ }
+
+ printf("\nTest Completed!\n");
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/CmsStress.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConnectionFactoryMgr.h"
+#include <activemq/core/ActiveMQConnectionFactory.h>
+
+using namespace decaf::lang::exceptions;
+using namespace activemq::core;
+using namespace decaf::lang;
+using namespace cms;
+using namespace cms::stress;
+
+StlMap<std::string, ConnectionFactory*> * ConnectionFactoryMgr::connectionFactories;
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionFactoryMgr::ConnectionFactoryMgr() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionFactoryMgr::~ConnectionFactoryMgr() {
+ unInitialize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionFactoryMgr::initialize() {
+ connectionFactories = new StlMap<std::string, ConnectionFactory*>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionFactoryMgr::unInitialize() {
+ connectionFactories->lock();
+
+ Pointer<Iterator<ConnectionFactory*> > iter(connectionFactories->values().iterator());
+ while (iter->hasNext()) {
+ ConnectionFactory* connectionFactory = iter->next();
+ if (connectionFactory != NULL) {
+ delete connectionFactory;
+ connectionFactory = NULL;
+ }
+ }
+
+ connectionFactories->clear();
+ connectionFactories->unlock();
+
+ delete connectionFactories;
+ connectionFactories = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectionFactory* ConnectionFactoryMgr::getConnectionFactory(const std::string& url) {
+
+ ConnectionFactory* connectionFactory = NULL;
+
+ connectionFactories->lock();
+ try {
+ if (connectionFactories->containsKey(url)) {
+ connectionFactory = connectionFactories->get(url);
+ }
+ } catch (NoSuchElementException& ex) {
+ }
+
+ if (!connectionFactory) {
+ connectionFactory = new ActiveMQConnectionFactory(url);
+ connectionFactories->put(url, connectionFactory);
+ }
+ connectionFactories->unlock();
+
+ return connectionFactory;
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_CONNECTIONFACTORYMGR_H_
+#define _CMS_STRESS_CONNECTIONFACTORYMGR_H_
+
+#include <decaf/util/Config.h>
+#include <cms/ConnectionFactory.h>
+
+#include <decaf/util/StlMap.h>
+
+using namespace cms;
+using namespace decaf::util;
+using namespace std;
+
+namespace cms {
+namespace stress {
+
+ class ConnectionFactoryMgr {
+ private:
+
+ static decaf::util::StlMap<std::string, cms::ConnectionFactory*>* connectionFactories;
+
+ ConnectionFactoryMgr();
+ virtual ~ConnectionFactoryMgr();
+
+ public:
+
+ static void initialize();
+ static void unInitialize();
+
+ static cms::ConnectionFactory* getConnectionFactory(const std::string& url);
+
+ };
+
+}}
+
+#endif /** _CMS_STRESS_CONNECTIONFACTORYMGR_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/ConnectionFactoryMgr.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessagingTask.h"
+
+using namespace decaf::lang;
+using namespace decaf::util::concurrent;
+using namespace cms;
+using namespace cms::stress;
+
+////////////////////////////////////////////////////////////////////////////////
+ThreadPoolExecutor* MessagingTask::threadPoolExecutor = NULL;
+
+////////////////////////////////////////////////////////////////////////////////
+MessagingTask::MessagingTask(Receiver* receiver, const std::string& message) :
+ receiver(receiver), message(message) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+MessagingTask::~MessagingTask() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessagingTask::queue() {
+ if (threadPoolExecutor != NULL) {
+ threadPoolExecutor->execute(this);
+ } else {
+ run();
+ delete this;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessagingTask::run() {
+ try {
+ if (receiver != NULL) {
+ receiver->executeMessagingTask(message);
+ }
+ } catch (...) {
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessagingTask::initializeThreads(int min, int max) {
+ if (min > 0) {
+ threadPoolExecutor = new ThreadPoolExecutor(min, max, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
+ } else {
+ threadPoolExecutor = NULL;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessagingTask::terminateThreads() {
+ if (threadPoolExecutor != NULL) {
+ threadPoolExecutor->shutdown();
+ //threadPoolExecutor->awaitTermination(10000, TimeUnit::MILLISECONDS);
+ threadPoolExecutor->awaitTermination(-1, TimeUnit::SECONDS);
+ delete threadPoolExecutor;
+ threadPoolExecutor = NULL;
+ }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_MESSAGINGTASK_H_
+#define _CMS_STRESS_MESSAGINGTASK_H_
+
+#include <decaf/util/Config.h>
+
+#include "Receiver.h"
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+
+namespace cms {
+namespace stress {
+
+ class MessagingTask:public decaf::lang::Runnable {
+ private:
+
+ Receiver* receiver;
+ std::string message;
+
+ static decaf::util::concurrent::ThreadPoolExecutor* threadPoolExecutor;
+
+ public:
+
+ MessagingTask(Receiver* receiver, const std::string& message);
+
+ virtual ~MessagingTask();
+
+ virtual void run();
+
+ static void initializeThreads(int min, int max);
+ static void terminateThreads();
+
+ virtual void queue();
+
+ };
+
+}}
+
+#endif /** _CMS_STRESS_MESSAGINGTASK_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/MessagingTask.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Receiver.h"
+#include "MessagingTask.h"
+#include "ConnectionFactoryMgr.h"
+#include "BrokerMonitor.h"
+#include "CmsMessageCreator.h"
+
+#include <cms/Message.h>
+#include <activemq/cmsutil/MessageCreator.h>
+#include <decaf/lang/System.h>
+#include <decaf/lang/Exception.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <stdio.h>
+
+using namespace decaf::lang;
+using namespace decaf::util::concurrent;
+using namespace activemq::cmsutil;
+using namespace cms;
+using namespace cms::stress;
+
+////////////////////////////////////////////////////////////////////////////////
+Receiver::Receiver(const std::string& url, const std::string& queueOrTopicName,
+ bool isTopic, BrokerMonitor* monitor, CountDownLatch* quit,
+ long long receiveTimeout, bool useThreadPool) :
+ url(url),
+ mutexForCmsTemplate(),
+ mutexGeneral(),
+ closing(false),
+ brokerOnline(true),
+ ready(1),
+ quit(quit),
+ messageListener(NULL),
+ cmsTemplate(NULL),
+ asyncReceiverThread(NULL),
+ receiveTimeout(receiveTimeout),
+ cmsTemplateCreateTime(System::currentTimeMillis()),
+ useThreadPool(useThreadPool),
+ numOfMessagingTasks(0),
+ monitor(monitor),
+ selector() {
+
+ ConnectionFactory* connectionFactory = ConnectionFactoryMgr::getConnectionFactory(url);
+ cmsTemplateCreateTime = System::currentTimeMillis();
+ cmsTemplate = new CmsTemplate(connectionFactory);
+ cmsTemplate->setDefaultDestinationName(queueOrTopicName);
+ cmsTemplate->setPubSubDomain(isTopic);
+ cmsTemplate->setReceiveTimeout(receiveTimeout);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Receiver::~Receiver() {
+
+ closing = true;
+
+ //delete cmsTemplate
+ mutexForCmsTemplate.lock();
+ if (cmsTemplate) {
+ delete cmsTemplate;
+ cmsTemplate = NULL;
+ }
+ mutexForCmsTemplate.unlock();
+
+ //wait until all outstanding messaging tasks are done
+ while (getNumOfMessagingTasks() > 0) {
+ Thread::sleep(100);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::receiveMessage(std::string& message, ErrorCode& errorCode,
+ const std::string &selector, bool retryOnError) {
+
+ long long stopRetryTime = System::currentTimeMillis() + receiveTimeout;
+ errorCode = CMS_SUCCESS;
+
+ if (receiveTimeout == 0 /*CmsTemplate::RECEIVE_TIMEOUT_NO_WAIT*/) {
+ retryOnError = false;
+ } else if (receiveTimeout == -1 /*CmsTemplate::RECEIVE_TIMEOUT_INDEFINITE_WAIT*/) {
+ retryOnError = true;
+ }
+
+ if (monitor != NULL) {
+ if (monitor->isBrokerOk()) {
+ if (!brokerOnline) {
+ mutexForCmsTemplate.lock();
+ if (cmsTemplate) {
+ cmsTemplateCreateTime = System::currentTimeMillis();
+ CmsTemplate* cmsTemplate = new CmsTemplate(cmsTemplate->getConnectionFactory());
+ cmsTemplate->setDefaultDestinationName(cmsTemplate->getDefaultDestinationName());
+ cmsTemplate->setPubSubDomain(cmsTemplate->isPubSubDomain());
+ cmsTemplate->setReceiveTimeout(cmsTemplate->getReceiveTimeout());
+ delete cmsTemplate;
+ }
+ mutexForCmsTemplate.unlock();
+
+ brokerOnline = true;
+ }
+ } else {
+ brokerOnline = false;
+ errorCode = CMS_ERROR_MESSAGE_BROKER_ERROR;
+ return;
+ }
+ }
+
+ do {
+ long long timeoutForThisLoop;
+ if (receiveTimeout <= 0) {
+ timeoutForThisLoop = receiveTimeout;
+ } else {
+ timeoutForThisLoop = stopRetryTime - System::currentTimeMillis();
+ if (timeoutForThisLoop <= 0) {
+ errorCode = CMS_ERROR_RECEIVER_TIMEDOUT;
+ break;
+ }
+ }
+
+ mutexForCmsTemplate.lock();
+ if (cmsTemplate) {
+ cmsTemplate->setReceiveTimeout(timeoutForThisLoop);
+
+ cms::Message* cmsMessage = NULL;
+ try {
+ if (selector != "") {
+ cmsMessage = cmsTemplate->receiveSelected(selector);
+ } else {
+ cmsMessage = cmsTemplate->receive();
+ }
+ } catch (cms::CMSException& ex) {
+ mutexForCmsTemplate.unlock();
+ errorCode = CMS_ERROR_CAUGHT_CMS_EXCEPTION;
+ break;
+ }
+
+ mutexForCmsTemplate.unlock();
+ if (cmsMessage == NULL) {
+ break;
+ }
+
+ if (isMessageExpired(cmsMessage)) {
+ errorCode = CMS_ERROR_INVALID_MESSAGE;
+ delete cmsMessage;
+ continue;
+ }
+
+ wstring text;
+ cms::TextMessage* txtMessage = dynamic_cast<cms::TextMessage*>(cmsMessage);
+ if (txtMessage) {
+ message = txtMessage->getText();
+ }
+ delete cmsMessage;
+ } else {
+ mutexForCmsTemplate.unlock();
+ }
+ } while (errorCode != CMS_SUCCESS && retryOnError && System::currentTimeMillis() < stopRetryTime);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::waitUntilReady() {
+ ready.await();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::registerMessageListener(ReceiverListener* messageListener, ErrorCode& errorCode,
+ const std::string& selector, int id) {
+ errorCode = CMS_SUCCESS;
+ char buffer[512];
+
+ if (id != 0) {
+ sprintf(buffer, "TestListener-%d", id);
+ } else {
+ sprintf(buffer, "TestAsyncListener");
+ }
+
+ mutexGeneral.lock();
+ if (messageListener == NULL) {
+ errorCode = CMS_ERROR_INVALID_MESSAGELISTENER;
+ mutexGeneral.unlock();
+ return;
+ }
+
+ if (messageListener != NULL) {
+ errorCode = CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY;
+ mutexGeneral.unlock();
+ return;
+ }
+
+ this->messageListener = messageListener;
+ this->selector = selector;
+
+ asyncReceiverThread = new Thread(this, buffer);
+ asyncReceiverThread->start();
+ mutexGeneral.unlock();
+
+ this->waitUntilReady();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::run() {
+ ready.countDown();
+ while (!closing) {
+ std::string message = "";
+
+ ErrorCode errorCode = CMS_SUCCESS;
+
+ Receiver::receiveMessage(message, errorCode, selector, false);
+ if (quit->getCount() == 0) {
+ closing = true;
+ }
+
+ if ((message != "") && (!closing)) {
+ if (useThreadPool) {
+ MessagingTask* task = new MessagingTask(this, message);
+
+ increaseNumOfMessagingTasks();
+ task->queue();
+ } else {
+ try {
+ executeMessagingTask(message, false);
+ } catch (...) {
+ }
+ }
+ } else if (!closing) {
+ if (errorCode == CMS_ERROR_CAUGHT_CMS_EXCEPTION || errorCode == CMS_ERROR_MESSAGE_BROKER_ERROR) {
+ long long sleepTime = 0;
+ mutexForCmsTemplate.lock();
+ sleepTime = cmsTemplate->getReceiveTimeout();
+ mutexForCmsTemplate.unlock();
+
+ if (quit->await(sleepTime)) {
+ closing = true;
+ }
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::executeMessagingTask(const std::string& message, bool isDecreaseNumOfMessagingTasks) {
+ if (!closing) {
+ mutexGeneral.lock();
+ ReceiverListener* copy = messageListener;
+ mutexGeneral.unlock();
+ if (copy) {
+ copy->onMessage(message);
+ }
+ }
+
+ if (isDecreaseNumOfMessagingTasks) {
+ decreaseNumOfMessagingTasks();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool Receiver::isMessageExpired(cms::Message* message) {
+ long long expireTime = message->getCMSExpiration();
+ long long currentTime = System::currentTimeMillis();
+ if (expireTime > 0 && currentTime > expireTime) {
+ return true;
+ }
+ return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::increaseNumOfMessagingTasks() {
+ mutexGeneral.lock();
+ numOfMessagingTasks++;
+ mutexGeneral.unlock();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::decreaseNumOfMessagingTasks() {
+ mutexGeneral.lock();
+ numOfMessagingTasks--;
+ mutexGeneral.unlock();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long Receiver::getNumOfMessagingTasks() {
+ long result = 0;
+
+ mutexGeneral.lock();
+ result = numOfMessagingTasks;
+ mutexGeneral.unlock();
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Receiver::close() {
+ closing = true;
+ if (asyncReceiverThread) {
+ asyncReceiverThread->join();
+ delete asyncReceiverThread;
+ asyncReceiverThread = NULL;
+ }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_RECEIVER_H_
+#define _CMS_STRESS_RECEIVER_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Runnable.h>
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/exceptions/RuntimeException.h>
+
+#include <string>
+
+#include "CmsMessageHandlerDefinitions.h"
+
+namespace cms {
+namespace stress {
+
+ class BrokerMonitor;
+
+ class ReceiverListener {
+ public:
+
+ virtual ~ReceiverListener() {}
+
+ virtual void onMessage(const std::string& message) = 0;
+
+ };
+
+ class Receiver: public decaf::lang::Runnable {
+ private:
+
+ std::string url;
+ decaf::util::concurrent::Mutex mutexForCmsTemplate;
+ decaf::util::concurrent::Mutex mutexGeneral;
+ bool closing;
+ bool brokerOnline;
+ decaf::util::concurrent::CountDownLatch ready;
+ decaf::util::concurrent::CountDownLatch* quit;
+ ReceiverListener* messageListener;
+ activemq::cmsutil::CmsTemplate* cmsTemplate;
+ decaf::lang::Thread* asyncReceiverThread;
+ long long receiveTimeout;
+ long long cmsTemplateCreateTime;
+ bool useThreadPool;
+ long numOfMessagingTasks;
+ BrokerMonitor* monitor;
+ std::string selector;
+
+ private:
+
+ virtual void waitUntilReady();
+ void increaseNumOfMessagingTasks();
+ void decreaseNumOfMessagingTasks();
+ long getNumOfMessagingTasks();
+
+ public:
+
+ Receiver(const std::string& url,
+ const std::string& queueOrTopicName,
+ bool isTopic,
+ BrokerMonitor* monitor,
+ decaf::util::concurrent::CountDownLatch* quit,
+ long long receiveTimeout = 2000,
+ bool useThreadPool = true);
+
+ virtual ~Receiver();
+
+ void close();
+
+ virtual void run();
+
+ void registerMessageListener(ReceiverListener* messageListener,
+ ErrorCode& errorCode, const std::string& selector, int id = 0);
+
+ void receiveMessage(std::string& message, ErrorCode& errorCode,
+ const std::string& selector, bool retryOnError = true);
+
+ static bool isMessageExpired(cms::Message* message);
+
+ void executeMessagingTask(const std::string& message,
+ bool bDecreaseNumOfMessagingTasks = true);
+
+ };
+
+}}
+
+#endif /** _CMS_STRESS_RECEIVER_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Receiver.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Sender.h"
+
+#include "CmsMessageCreator.h"
+#include "ConnectionFactoryMgr.h"
+
+#include <cms/ConnectionFactory.h>
+
+using namespace decaf::lang;
+using namespace cms;
+using namespace cms::stress;
+using namespace activemq::cmsutil;
+
+////////////////////////////////////////////////////////////////////////////////
+Sender::Sender(const std::string& url, const std::string& queueOrTopicName,
+ bool isTopic, bool isDeliveryPersistent, int timeToLive) : cmsTemplateMutex(), cmsTemplate() {
+
+ ConnectionFactory* connectionFactory = ConnectionFactoryMgr::getConnectionFactory(url);
+
+ cmsTemplate = new CmsTemplate(connectionFactory);
+
+ cmsTemplate->setExplicitQosEnabled(true);
+ cmsTemplate->setDefaultDestinationName(queueOrTopicName);
+ cmsTemplate->setPubSubDomain(isTopic);
+ cmsTemplate->setDeliveryPersistent(isDeliveryPersistent);
+ cmsTemplate->setTimeToLive(timeToLive);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Sender::~Sender() {
+ cmsTemplateMutex.lock();
+ if (cmsTemplate) {
+ delete cmsTemplate;
+ cmsTemplate = NULL;
+ }
+ cmsTemplateMutex.unlock();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void Sender::SendMessage(const std::string& message, ErrorCode& errorCode,
+ const std::string& header, const std::string& value) {
+
+ CmsMessageCreator messageCreator(message, header, value);
+
+ try {
+ cmsTemplateMutex.lock();
+ cmsTemplate->send(&messageCreator);
+ cmsTemplateMutex.unlock();
+ errorCode = CMS_SUCCESS;
+ } catch (cms::CMSException& ex) {
+ cmsTemplateMutex.unlock();
+ errorCode = CMS_ERROR_CAUGHT_CMS_EXCEPTION;
+ }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_SENDER_H_
+#define _CMS_STRESS_SENDER_H_
+
+#include <decaf/util/Config.h>
+
+#include <activemq/cmsutil/CmsTemplate.h>
+#include <activemq/cmsutil/MessageCreator.h>
+
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/Runnable.h>
+
+#include "CmsMessageHandlerDefinitions.h"
+
+namespace cms {
+namespace stress {
+
+ class Sender {
+ private:
+
+ decaf::util::concurrent::Mutex cmsTemplateMutex;
+ activemq::cmsutil::CmsTemplate* cmsTemplate;
+
+ public:
+
+ Sender(const std::string& url,
+ const std::string& queueOrTopicName,
+ bool isTopic, bool isDeliveryPersistent,
+ int timeToLive);
+
+ virtual ~Sender();
+
+ void SendMessage(const std::string& msg,
+ ErrorCode& errorCode,
+ const std::string& header,
+ const std::string& value);
+
+ };
+
+}}
+
+#endif /** _CMS_STRESS_SENDER_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/Sender.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp Tue Feb 19 20:12:30 2013
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestSenderAndReceiver.h"
+
+#include <stdio.h>
+
+using namespace decaf::lang;
+using namespace cms;
+using namespace cms::stress;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+extern bool VERBOSE;
+extern TESTINFO TestResults;
+
+////////////////////////////////////////////////////////////////////////////////
+TestSenderAndReceiver::TestSenderAndReceiver(const std::string& url, const std::string& queueOrTopicName,
+ const std::string& headerName, bool isTopic,
+ bool isDeliveryPersistent, BrokerMonitor* monitor,
+ CountDownLatch* quit, int timeToLive, int receiveTimeout,
+ int identifier, bool useThreadPool, int sleep, int seed) :
+ sender(NULL),
+ receiver(NULL),
+ senderThread(NULL),
+ monitor(monitor),
+ header(headerName),
+ closing(false),
+ sendIndex(0),
+ id(identifier),
+ sleep(sleep),
+ seed(seed),
+ quit(quit),
+ random(seed) {
+
+ sender = new Sender(url, queueOrTopicName, isTopic, isDeliveryPersistent, timeToLive);
+ receiver = new Receiver(url, queueOrTopicName, isTopic, monitor, quit, receiveTimeout, useThreadPool);
+ ErrorCode errorCode = CMS_SUCCESS;
+
+ std::string selector("");
+ if (headerName != "") {
+ std::stringstream sID;
+ selector = headerName;
+ selector.append("='");
+ sID << identifier;
+ selector.append(sID.str());
+ selector.append("'");
+ }
+
+ receiver->registerMessageListener(this, errorCode, selector, identifier);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TestSenderAndReceiver::~TestSenderAndReceiver() {
+ close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TestSenderAndReceiver::init() {
+ char buffer[512];
+ sprintf(buffer, "TestSender-%d", id);
+ senderThread = new Thread(this, buffer);
+ senderThread->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TestSenderAndReceiver::onMessage(const std::string& message) {
+
+ int index = (int) message.find(";");
+ std::string msg;
+ int thrdidx;
+ int thrdseq, curseq, diffseq;
+
+ if (index <= 0) {
+ // i for invalid message
+ if (VERBOSE) {
+ printf("%c", SYM_BAD_MSG);
+ }
+ TestResults.invalidMessages.incrementAndGet();
+ } else {
+ thrdidx = atoi(message.substr(0, index).c_str());
+ msg = message.substr(index + 1);
+
+ if (thrdidx > (int) TestResults.threadCount) {
+ if (VERBOSE) {
+ printf("%c", SYM_BAD_MSG);
+ }
+ TestResults.invalidMessages.incrementAndGet();
+ } else {
+ index = (int) msg.find(";");
+ if (index <= 0) {
+ if (VERBOSE) {
+ printf("%c", SYM_BAD_MSG);
+ }
+ TestResults.invalidMessages.incrementAndGet();
+ } else {
+ TestResults.received.incrementAndGet();
+ thrdseq = Integer::parseInt(msg.substr(0, index));
+ msg = msg.substr(index + 1);
+ curseq = TestResults.lastSequence[thrdidx].incrementAndGet();
+ if (thrdseq == curseq) {
+ if (VERBOSE) {
+ // Smiley face for good message
+ printf("%c", SYM_GOOD_SEQ);
+ }
+ } else {
+ TestResults.lastSequence[thrdidx].set(thrdseq);
+ TestResults.badSequenceMessages.incrementAndGet();
+ if (thrdseq > curseq) {
+ diffseq = thrdseq - curseq;
+ } else {
+ diffseq = curseq - thrdseq;
+ }
+ TestResults.sequenceDifferences.addAndGet(diffseq);
+ if (VERBOSE) {
+ if ((diffseq > 0) && (diffseq < 10)) {
+ printf("%d", diffseq);
+ } else {
+ printf("%c", SYM_BIG_DIFF);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TestSenderAndReceiver::run() {
+ ErrorCode errorReturn;
+ int i, j;
+ bool result;
+
+ // Seed the random numbers - time if zero
+ if (seed == 0) {
+ random.setSeed(System::currentTimeMillis());
+ } else {
+ random.setSeed(seed);
+ }
+
+ // If randomizing sleeps - stagger start by up to 1 second
+ if (sleep == -1) {
+ Thread::sleep(random.nextInt(1000));
+ }
+
+ while (!closing) {
+ std::stringstream sID;
+ std::stringstream sSeq;
+ std::stringstream sHdr;
+ std::string message;
+
+ // Add id to messages
+ sID << id;
+ sID >> message;
+ sHdr << id;
+ message.append(";");
+
+ // Add sequence to messages
+ sSeq << sendIndex;
+ message.append(sSeq.str());
+ message.append(";");
+
+ // Add variable payload
+ j = random.nextInt(1024);
+ for (i = 0; i < j; i++) {
+ message += std::string(1, (char) (65 + (random.nextInt(24))));
+ }
+
+ errorReturn = CMS_SUCCESS;
+ sender->SendMessage(message, errorReturn, header, sHdr.str());
+ if (errorReturn == CMS_SUCCESS) {
+ sendIndex++;
+ if (VERBOSE) {
+ printf("%c", SYM_GOOD_SEND);
+ }
+ TestResults.sent.incrementAndGet();
+ } else {
+ if (VERBOSE) {
+ // Exclamation point for error
+ printf("%c", SYM_BAD_SEND);
+ }
+ TestResults.sendErrors.incrementAndGet();
+ }
+
+ if (sleep) {
+ if (sleep == -1) {
+ result = quit->await(random.nextInt(1000));
+ } else {
+ result = quit->await(random.nextInt(sleep));
+ }
+ } else {
+ result = quit->getCount() == 0;
+ }
+
+ if (result) {
+ closing = true;
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TestSenderAndReceiver::close() {
+ closing = true;
+
+ if (senderThread) {
+ senderThread->join();
+ delete senderThread;
+ senderThread = NULL;
+ }
+
+ if (sender) {
+ delete sender;
+ sender = NULL;
+ }
+
+ if (receiver) {
+ receiver->close();
+ delete receiver;
+ receiver = NULL;
+ }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h?rev=1447894&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h Tue Feb 19 20:12:30 2013
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_STRESS_TESTSENDERANDRECEIVER_H_
+#define _CMS_STRESS_TESTSENDERANDRECEIVER_H_
+
+#include <decaf/util/Config.h>
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/lang/exceptions/RuntimeException.h>
+#include <decaf/util/Random.h>
+
+#include "Sender.h"
+#include "Receiver.h"
+#include "CmsMessageHandlerDefinitions.h"
+
+namespace cms {
+namespace stress {
+
+ class TestSenderAndReceiver: public decaf::lang::Runnable,
+ public ReceiverListener {
+ private:
+
+ Sender* sender;
+ Receiver* receiver;
+ decaf::lang::Thread* senderThread;
+ BrokerMonitor* monitor;
+ std::string header;
+ bool closing;
+ int sendIndex;
+ int id;
+ int sleep;
+ unsigned int seed;
+ decaf::util::concurrent::CountDownLatch* quit;
+ decaf::util::Random random;
+
+ public:
+
+ TestSenderAndReceiver(const std::string& url, const std::string& queueOrTopicName,
+ const std::string& headerName, bool isTopic, bool isDeliveryPersistent,
+ BrokerMonitor *monitor, decaf::util::concurrent::CountDownLatch* quit,
+ int timeToLive, int receiveTimeout, int identifier,
+ bool useThreadPool = true, int sleep = -1, int seed = 0);
+
+ virtual ~TestSenderAndReceiver();
+
+ void init();
+
+ virtual void run();
+
+ void close();
+
+ void waitUntilReady();
+
+ public:
+
+ virtual void onMessage(const std::string& message);
+
+ };
+
+ typedef struct {
+ int threadCount;
+ long long startTime;
+ long long endTime;
+ decaf::util::concurrent::atomic::AtomicInteger sent;
+ decaf::util::concurrent::atomic::AtomicInteger received;
+ decaf::util::concurrent::atomic::AtomicInteger sendErrors;
+ decaf::util::concurrent::atomic::AtomicInteger receiveErrors;
+ decaf::util::concurrent::atomic::AtomicInteger invalidMessages;
+ decaf::util::concurrent::atomic::AtomicInteger badSequenceMessages;
+ decaf::util::concurrent::atomic::AtomicInteger sequenceDifferences;
+ decaf::util::concurrent::atomic::AtomicInteger* lastSequence;
+ } TESTINFO;
+
+}}
+
+#endif /** _CMS_STRESS_TESTSENDERANDRECEIVER_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/examples/stress-test/TestSenderAndReceiver.h
------------------------------------------------------------------------------
svn:eol-style = native