You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/16 00:26:51 UTC

svn commit: r1398544 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/cmsutil/ main/activemq/core/ main/activemq/core/kernels/ main/cms/ test/activemq/cmsutil/

Author: tabish
Date: Mon Oct 15 22:26:50 2012
New Revision: 1398544

URL: http://svn.apache.org/viewvc?rev=1398544&view=rev
Log:
fix: https://issues.apache.org/jira/browse/AMQCPP-436

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Oct 15 22:26:50 2012
@@ -265,6 +265,7 @@ cc_sources = \
     cms/InvalidSelectorException.cpp \
     cms/MapMessage.cpp \
     cms/Message.cpp \
+    cms/MessageAvailableListener.cpp \
     cms/MessageConsumer.cpp \
     cms/MessageEOFException.cpp \
     cms/MessageEnumeration.cpp \
@@ -805,6 +806,7 @@ h_sources = \
     cms/InvalidSelectorException.h \
     cms/MapMessage.h \
     cms/Message.h \
+    cms/MessageAvailableListener.h \
     cms/MessageConsumer.h \
     cms/MessageEOFException.h \
     cms/MessageEnumeration.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/CachedConsumer.h Mon Oct 15 22:26:50 2012
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-#ifndef ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
-#define ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
+#ifndef _ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
+#define _ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
 
 #include <cms/MessageConsumer.h>
 #include <activemq/util/Config.h>
@@ -27,20 +27,19 @@ namespace cmsutil {
     /**
      * A cached message consumer contained within a pooled session.
      */
-    class AMQCPP_API CachedConsumer : public cms::MessageConsumer {
+    class AMQCPP_API CachedConsumer: public cms::MessageConsumer {
     private:
 
         cms::MessageConsumer* consumer;
 
     private:
 
-        CachedConsumer( const CachedConsumer& );
-        CachedConsumer& operator= ( const CachedConsumer& );
+        CachedConsumer(const CachedConsumer&);
+        CachedConsumer& operator=(const CachedConsumer&);
 
     public:
 
-        CachedConsumer( cms::MessageConsumer* consumer ) : consumer( consumer ) {
-        }
+        CachedConsumer(cms::MessageConsumer* consumer) : consumer(consumer) {}
 
         virtual ~CachedConsumer() {}
 
@@ -64,7 +63,7 @@ namespace cmsutil {
             return consumer->receive();
         }
 
-        virtual cms::Message* receive( int millisecs ) {
+        virtual cms::Message* receive(int millisecs) {
             return consumer->receive(millisecs);
         }
 
@@ -72,8 +71,8 @@ namespace cmsutil {
             return consumer->receiveNoWait();
         }
 
-        virtual void setMessageListener( cms::MessageListener* listener ) {
-            consumer->setMessageListener( listener );
+        virtual void setMessageListener(cms::MessageListener* listener) {
+            consumer->setMessageListener(listener);
         }
 
         virtual cms::MessageListener* getMessageListener() const {
@@ -84,6 +83,14 @@ namespace cmsutil {
             return consumer->getMessageSelector();
         }
 
+        virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener) {
+            consumer->setMessageAvailableListener(listener);
+        }
+
+        virtual cms::MessageAvailableListener* getMessageAvailableListener() const {
+            return consumer->getMessageAvailableListener();
+        }
+
         virtual void setMessageTransformer(cms::MessageTransformer* transformer) {
             consumer->setMessageTransformer(transformer);
         }
@@ -96,4 +103,4 @@ namespace cmsutil {
 
 }}
 
-#endif /*ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_*/
+#endif /*_ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_*/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Oct 15 22:26:50 2012
@@ -199,3 +199,13 @@ void ActiveMQConsumer::setMessageTransfo
 cms::MessageTransformer* ActiveMQConsumer::getMessageTransformer() const {
     return this->config->kernel->getMessageTransformer();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setMessageAvailableListener(cms::MessageAvailableListener* listener) {
+    this->config->kernel->setMessageAvailableListener(listener);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageAvailableListener* ActiveMQConsumer::getMessageAvailableListener() const {
+    return this->config->kernel->getMessageAvailableListener();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Oct 15 22:26:50 2012
@@ -79,6 +79,10 @@ namespace core{
 
         virtual cms::MessageListener* getMessageListener() const;
 
+        virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener);
+
+        virtual cms::MessageAvailableListener* getMessageAvailableListener() const;
+
         virtual std::string getMessageSelector() const;
 
         virtual void setMessageTransformer(cms::MessageTransformer* transformer);

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Mon Oct 15 22:26:50 2012
@@ -76,6 +76,7 @@ namespace kernels {
     public:
 
         cms::MessageListener* listener;
+        cms::MessageAvailableListener* messageAvailableListener;
         cms::MessageTransformer* transformer;
         decaf::util::concurrent::Mutex listenerMutex;
         AtomicBoolean deliveringAcks;
@@ -95,6 +96,7 @@ namespace kernels {
         Pointer<Scheduler> scheduler;
 
         ActiveMQConsumerKernelConfig() : listener(NULL),
+                                         messageAvailableListener(NULL),
                                          transformer(NULL),
                                          listenerMutex(),
                                          deliveringAcks(),
@@ -1111,6 +1113,9 @@ void ActiveMQConsumerKernel::dispatch(co
                         // No listener, add it to the unconsumed messages list it will get pushed on the
                         // next receive call or when a new listener is added.
                         this->internal->unconsumedMessages->enqueue(dispatch);
+                        if (this->internal->messageAvailableListener != NULL) {
+                            this->internal->messageAvailableListener->onMessageAvailable(this);
+                        }
                     }
                 }
             }
@@ -1400,3 +1405,13 @@ void ActiveMQConsumerKernel::setPrefetch
 bool ActiveMQConsumerKernel::isInUse(Pointer<ActiveMQDestination> destination) const {
     return this->consumerInfo->getDestination()->equals(destination.get());
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setMessageAvailableListener(cms::MessageAvailableListener* listener) {
+    this->internal->messageAvailableListener = listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageAvailableListener* ActiveMQConsumerKernel::getMessageAvailableListener() const {
+    return this->internal->messageAvailableListener;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Mon Oct 15 22:26:50 2012
@@ -19,6 +19,7 @@
 
 #include <cms/MessageConsumer.h>
 #include <cms/MessageListener.h>
+#include <cms/MessageAvailableListener.h>
 #include <cms/Message.h>
 #include <cms/CMSException.h>
 
@@ -102,6 +103,10 @@ namespace kernels {
 
         virtual cms::MessageListener* getMessageListener() const;
 
+        virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener);
+
+        virtual cms::MessageAvailableListener* getMessageAvailableListener() const;
+
         virtual std::string getMessageSelector() const;
 
         virtual void acknowledge(const Pointer<commands::MessageDispatch>& dispatch);

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp?rev=1398544&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp Mon Oct 15 22:26:50 2012
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessageAvailableListener.h"
+
+using namespace cms;
+
+////////////////////////////////////////////////////////////////////////////////
+MessageAvailableListener::~MessageAvailableListener() {
+}
+

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h?rev=1398544&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h Mon Oct 15 22:26:50 2012
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_MESSAGEAVAILABLELISTENER_H_
+#define _CMS_MESSAGEAVAILABLELISTENER_H_
+
+#include <cms/Config.h>
+
+namespace cms {
+
+    class MessageConsumer;
+
+    /**
+     * A listener interface similar to the MessageListener interface.  This listener
+     * is notified by its associated MessageConsumer that a new Message is available
+     * for processing when the consumer is in sync consumption mode.  A consumer with
+     * a registered MessageListener will not use this listener.
+     */
+    class CMS_API MessageAvailableListener {
+    public:
+
+        virtual ~MessageAvailableListener();
+
+        /**
+         * Indicates that a new Message has become available for consumption
+         * from the provided consumer; a call to receiveNoWait should return
+         * the message immediately.
+         *
+         * @param consumer
+         *      The consumer that now has a message queued for consumption.
+         */
+        virtual void onMessageAvailable(cms::MessageConsumer* consumer) = 0;
+
+    };
+
+}
+#endif /* _CMS_MESSAGEAVAILABLELISTENER_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageAvailableListener.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageConsumer.h Mon Oct 15 22:26:50 2012
@@ -20,6 +20,7 @@
 
 #include <cms/Config.h>
 #include <cms/MessageListener.h>
+#include <cms/MessageAvailableListener.h>
 #include <cms/Message.h>
 #include <cms/Closeable.h>
 #include <cms/Startable.h>
@@ -142,6 +143,27 @@ namespace cms{
          */
         virtual cms::MessageTransformer* getMessageTransformer() const = 0;
 
+        /**
+         * Sets the MessageAvailableListener that this class will send events to if the consumer
+         * is in synchronous consumption mode and a new Message has arrived.
+         *
+         * @param listener
+         *      The listener of new message events fired by this consumer.
+         *
+         * @throws CMSException - If an internal error occurs.
+         */
+        virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener) = 0;
+
+        /**
+         * Gets the MessageAvailableListener that this class will send new Message
+         * notification events to.
+         *
+         * @return The listener of message events received by this consumer.
+         *
+         * @throws CMSException - If an internal error occurs.
+         */
+        virtual cms::MessageAvailableListener* getMessageAvailableListener() const = 0;
+
     };
 
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h?rev=1398544&r1=1398543&r2=1398544&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/cmsutil/DummyConsumer.h Mon Oct 15 22:26:50 2012
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-#ifndef ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_
-#define ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_
+#ifndef _ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_
+#define _ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_
 
 #include <cms/MessageConsumer.h>
 #include <activemq/cmsutil/MessageContext.h>
@@ -24,11 +24,12 @@
 namespace activemq {
 namespace cmsutil {
 
-    class DummyConsumer : public cms::MessageConsumer {
+    class DummyConsumer: public cms::MessageConsumer {
     private:
 
         std::string selector;
         cms::MessageListener* listener;
+        cms::MessageAvailableListener* messageAvailableListener;
         cms::MessageTransformer* transformer;
         MessageContext* messageContext;
         const cms::Destination* dest;
@@ -44,36 +45,48 @@ namespace cmsutil {
             this->listener = NULL;
             this->transformer = NULL;
         }
-        virtual ~DummyConsumer() {}
 
-        virtual void close() {}
+        virtual ~DummyConsumer() {
+        }
 
-        virtual void start() {}
+        virtual void close() {
+        }
 
-        virtual void stop() {}
+        virtual void start() {
+        }
+
+        virtual void stop() {
+        }
 
-        virtual cms::Message* receive() throw ( cms::CMSException ) {
+        virtual cms::Message* receive() {
             return messageContext->receive(dest, selector, noLocal, 0);
         }
 
-        virtual cms::Message* receive( int millisecs ) throw ( cms::CMSException ) {
+        virtual cms::Message* receive(int millisecs) {
             return messageContext->receive(dest, selector, noLocal, millisecs);
         }
 
-        virtual cms::Message* receiveNoWait() throw ( cms::CMSException ) {
+        virtual cms::Message* receiveNoWait() {
             return messageContext->receive(dest, selector, noLocal, -1);
         }
 
-        virtual void setMessageListener( cms::MessageListener* listener ) throw ( cms::CMSException ) {
+        virtual void setMessageListener(cms::MessageListener* listener) {
             this->listener = listener;
         }
 
-        virtual cms::MessageListener* getMessageListener() const throw ( cms::CMSException ) {
+        virtual cms::MessageListener* getMessageListener() const {
             return listener;
         }
 
-        virtual std::string getMessageSelector() const
-            throw ( cms::CMSException ) {
+        virtual void setMessageAvailableListener(cms::MessageAvailableListener* listener) {
+            messageAvailableListener = listener;
+        }
+
+        virtual cms::MessageAvailableListener* getMessageAvailableListener() const {
+            return messageAvailableListener;
+        }
+
+        virtual std::string getMessageSelector() const {
             return selector;
         }
 
@@ -89,4 +102,4 @@ namespace cmsutil {
 
 }}
 
-#endif /*ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_*/
+#endif /*_ACTIVEMQ_CMSUTIL_DUMMYCONSUMER_H_*/