You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/16 00:26:51 UTC
svn commit: r1398544 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/ main/activemq/cmsutil/ main/activemq/core/
main/activemq/core/kernels/ main/cms/ test/activemq/cmsutil/
Author: tabish
Date: Mon Oct 15 22:26:50 2012
New Revision: 1398544
URL: http://svn.apache.org/viewvc?rev=1398544&view=rev
Log:
fix: https://issues.apache.org/jira/browse/AMQCPP-436
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Oct 15 22:26:50 2012
@@ -265,6 +265,7 @@ cc_sources = \
cms/InvalidSelectorException.cpp \
cms/MapMessage.cpp \
cms/Message.cpp \
+ cms/MessageAvailableListener.cpp \
cms/MessageConsumer.cpp \
cms/MessageEOFException.cpp \
cms/MessageEnumeration.cpp \
@@ -805,6 +806,7 @@ h_sources = \
cms/InvalidSelectorException.h \
cms/MapMessage.h \
cms/Message.h \
+ cms/MessageAvailableListener.h \
cms/MessageConsumer.h \
cms/MessageEOFException.h \
cms/MessageEnumeration.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h Mon Oct 15 22:26:50 2012
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-#ifndef ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
-#define ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
+#ifndef _ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
+#define _ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
#include <cms/MessageConsumer.h>
#include <activemq/util/Config.h>
@@ -27,20 +27,19 @@ namespace cmsutil {
/**
* A cached message consumer contained within a pooled session.
*/
- class AMQCPP_API CachedConsumer : public cms::MessageConsumer {
+ class AMQCPP_API CachedConsumer: public cms::MessageConsumer {
private:
cms::MessageConsumer* consumer;
private:
- CachedConsumer( const CachedConsumer& );
- CachedConsumer& operator= ( const CachedConsumer& );
+ CachedConsumer(const CachedConsumer&);
+ CachedConsumer& operator=(const CachedConsumer&);
public:
- CachedConsumer( cms::MessageConsumer* consumer ) : consumer( consumer ) {
- }
+ CachedConsumer(cms::MessageConsumer* consumer) : consumer(consumer) {}
virtual ~CachedConsumer() {}
@@ -64,7 +63,7 @@ namespace cmsutil {
return consumer->receive();
}
- virtual cms::Message* receive( int millisecs ) {
+ virtual cms::Message* receive(int millisecs) {
return consumer->receive(millisecs);
}
@@ -72,8 +71,8 @@ namespace cmsutil {
return consumer->receiveNoWait();
}
- virtual void setMessageListener( cms::MessageListener* listener ) {
- consumer->setMessageListener( listener );
+ virtual void setMessageListener(cms::MessageListener* listener) {
+ consumer->setMessageListener(listener);
}
virtual cms::MessageListener* getMessageListener() const {
@@ -84,6 +83,14 @@ namespace cmsutil {
return consumer->getMessageSelector();
}
+ virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener) {
+ consumer->setMessageAvailableListener(listener);
+ }
+
+ virtual cms::MessageAvailableListener* getMessageAvailableListener() const {
+ return consumer->getMessageAvailableListener();
+ }
+
virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
consumer->setMessageTransformer(transformer);
}
@@ -96,4 +103,4 @@ namespace cmsutil {
}}
-#endif /*ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_*/
+#endif /*_ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Oct 15 22:26:50 2012
@@ -199,3 +199,13 @@ void ActiveMQConsumer::setMessageTransfo
cms::MessageTransformer* ActiveMQConsumer::getMessageTransformer() const {
return this->config->kernel->getMessageTransformer();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setMessageAvailableListener(cms::MessageAvailableListener* listener) {
+ this->config->kernel->setMessageAvailableListener(listener);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageAvailableListener* ActiveMQConsumer::getMessageAvailableListener() const {
+ return this->config->kernel->getMessageAvailableListener();
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Oct 15 22:26:50 2012
@@ -79,6 +79,10 @@ namespace core{
virtual cms::MessageListener* getMessageListener() const;
+ virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener);
+
+ virtual cms::MessageAvailableListener* getMessageAvailableListener() const;
+
virtual std::string getMessageSelector() const;
virtual void setMessageTransformer(cms::MessageTransformer* transformer);
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Mon Oct 15 22:26:50 2012
@@ -76,6 +76,7 @@ namespace kernels {
public:
cms::MessageListener* listener;
+ cms::MessageAvailableListener* messageAvailableListener;
cms::MessageTransformer* transformer;
decaf::util::concurrent::Mutex listenerMutex;
AtomicBoolean deliveringAcks;
@@ -95,6 +96,7 @@ namespace kernels {
Pointer<Scheduler> scheduler;
ActiveMQConsumerKernelConfig() : listener(NULL),
+ messageAvailableListener(NULL),
transformer(NULL),
listenerMutex(),
deliveringAcks(),
@@ -1111,6 +1113,9 @@ void ActiveMQConsumerKernel::dispatch(co
// No listener, add it to the unconsumed messages list it will get pushed on the
// next receive call or when a new listener is added.
this->internal->unconsumedMessages->enqueue(dispatch);
+ if (this->internal->messageAvailableListener != NULL) {
+ this->internal->messageAvailableListener->onMessageAvailable(this);
+ }
}
}
}
@@ -1400,3 +1405,13 @@ void ActiveMQConsumerKernel::setPrefetch
bool ActiveMQConsumerKernel::isInUse(Pointer<ActiveMQDestination> destination) const {
return this->consumerInfo->getDestination()->equals(destination.get());
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setMessageAvailableListener(cms::MessageAvailableListener* listener) {
+ this->internal->messageAvailableListener = listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageAvailableListener* ActiveMQConsumerKernel::getMessageAvailableListener() const {
+ return this->internal->messageAvailableListener;
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Mon Oct 15 22:26:50 2012
@@ -19,6 +19,7 @@
#include <cms/MessageConsumer.h>
#include <cms/MessageListener.h>
+#include <cms/MessageAvailableListener.h>
#include <cms/Message.h>
#include <cms/CMSException.h>
@@ -102,6 +103,10 @@ namespace kernels {
virtual cms::MessageListener* getMessageListener() const;
+ virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener);
+
+ virtual cms::MessageAvailableListener* getMessageAvailableListener() const;
+
virtual std::string getMessageSelector() const;
virtual void acknowledge(const Pointer<commands::MessageDispatch>& dispatch);
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp?rev=1398544&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp Mon Oct 15 22:26:50 2012
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessageAvailableListener.h"
+
+using namespace cms;
+
+////////////////////////////////////////////////////////////////////////////////
+MessageAvailableListener::~MessageAvailableListener() {
+}
+
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h?rev=1398544&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h Mon Oct 15 22:26:50 2012
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_MESSAGEAVAILABLELISTENER_H_
+#define _CMS_MESSAGEAVAILABLELISTENER_H_
+
+#include <cms/Config.h>
+
+namespace cms {
+
+ class MessageConsumer;
+
+ /**
+ * A listener interface similar to the MessageListener interface. This listener
+ * is notified by its associated MessageConsumer that a new Message is available
+ * for processing when the consumer is in sync consumption mode. A consumer with
+ * a registered MessageListener will not use this listener.
+ */
+ class CMS_API MessageAvailableListener {
+ public:
+
+ virtual ~MessageAvailableListener();
+
+ /**
+ * Indicates that a new Message has become available for consumption
+ * from the provided consumer, a call to receiveNoWait should return
+ * the message immediately.
+ *
+ * @param consumer
+ * The consumer that now has a message queued for consumption.
+ */
+ virtual void onMessageAvailable(cms::MessageConsumer* consumer) = 0;
+
+ };
+
+}
+#endif /* _CMS_MESSAGEAVAILABLELISTENER_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h Mon Oct 15 22:26:50 2012
@@ -20,6 +20,7 @@
#include <cms/Config.h>
#include <cms/MessageListener.h>
+#include <cms/MessageAvailableListener.h>
#include <cms/Message.h>
#include <cms/Closeable.h>
#include <cms/Startable.h>
@@ -142,6 +143,27 @@ namespace cms{
*/
virtual cms::MessageTransformer* getMessageTransformer() const = 0;
+ /**
+ * Sets the MessageAvailableListener that this class will send events to if the consumer
+ * is in synchronous consumption mode and a new Message has arrived.
+ *
+ * @param listener
+ * The listener of new message events fired by this consumer.
+ *
+ * @throws CMSException - If an internal error occurs.
+ */
+ virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener) = 0;
+
+ /**
+ * Gets the MessageAvailableListener that this class will send mew Message
+ * notification events to.
+ *
+ * @return The listener of message events received by this consumer.
+ *
+ * @throws CMSException - If an internal error occurs.
+ */
+ virtual cms::MessageAvailableListener* getMessageAvailableListener() const = 0;
+
};
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h Mon Oct 15 22:26:50 2012
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-#ifndef ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_
-#define ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_
+#ifndef _ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_
+#define _ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_
#include <cms/MessageConsumer.h>
#include <activemq/cmsutil/MessageContext.h>
@@ -24,11 +24,12 @@
namespace activemq {
namespace cmsutil {
- class DummyConsumer : public cms::MessageConsumer {
+ class DummyConsumer: public cms::MessageConsumer {
private:
std::string selector;
cms::MessageListener* listener;
+ cms::MessageAvailableListener* messageAvailableListener;
cms::MessageTransformer* transformer;
MessageContext* messageContext;
const cms::Destination* dest;
@@ -44,36 +45,48 @@ namespace cmsutil {
this->listener = NULL;
this->transformer = NULL;
}
- virtual ~DummyConsumer() {}
- virtual void close() {}
+ virtual ~DummyConsumer() {
+ }
- virtual void start() {}
+ virtual void close() {
+ }
- virtual void stop() {}
+ virtual void start() {
+ }
+
+ virtual void stop() {
+ }
- virtual cms::Message* receive() throw ( cms::CMSException ) {
+ virtual cms::Message* receive() {
return messageContext->receive(dest, selector, noLocal, 0);
}
- virtual cms::Message* receive( int millisecs ) throw ( cms::CMSException ) {
+ virtual cms::Message* receive(int millisecs) {
return messageContext->receive(dest, selector, noLocal, millisecs);
}
- virtual cms::Message* receiveNoWait() throw ( cms::CMSException ) {
+ virtual cms::Message* receiveNoWait() {
return messageContext->receive(dest, selector, noLocal, -1);
}
- virtual void setMessageListener( cms::MessageListener* listener ) throw ( cms::CMSException ) {
+ virtual void setMessageListener(cms::MessageListener* listener) {
this->listener = listener;
}
- virtual cms::MessageListener* getMessageListener() const throw ( cms::CMSException ) {
+ virtual cms::MessageListener* getMessageListener() const {
return listener;
}
- virtual std::string getMessageSelector() const
- throw ( cms::CMSException ) {
+ virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener) {
+ messageAvailableListener = listener;
+ }
+
+ virtual cms::MessageAvailableListener* getMessageAvailableListener() const {
+ return messageAvailableListener;
+ }
+
+ virtual std::string getMessageSelector() const {
return selector;
}
@@ -89,4 +102,4 @@ namespace cmsutil {
}}
-#endif /*ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_*/
+#endif /*_ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_*/