You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC

svn commit: r1371676 [2/8] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/ cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/ cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...

Copied: qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.h (from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp&r1=1371647&r2=1371676&rev=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LossyQueue.h Fri Aug 10 12:04:27 2012
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_LOSSYQUEUE_H
+#define QPID_BROKER_LOSSYQUEUE_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -18,21 +21,21 @@
  * under the License.
  *
  */
-#include "qpid/broker/ExpiryPolicy.h"
-#include "qpid/broker/Message.h"
-#include "qpid/sys/Time.h"
+#include "qpid/broker/Queue.h"
 
 namespace qpid {
 namespace broker {
 
-ExpiryPolicy::~ExpiryPolicy() {}
-
-bool ExpiryPolicy::hasExpired(Message& m) {
-    return m.getExpiration() < sys::AbsTime::now();
-}
-
-sys::AbsTime ExpiryPolicy::getCurrentTime() {
-    return sys::AbsTime::now();
-}
-
+/**
+ * Drops messages to prevent a breach of any configured maximum depth.
+ */
+class LossyQueue : public Queue
+{
+  public:
+    LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); // NOTE(review): parameter list mirrors the Queue(...) base call made by Lvq's constructor; definition is not part of this diff section
+    bool checkDepth(const QueueDepth& increment, const Message&); // NOTE(review): presumably where messages are dropped to stay within the configured maximum depth -- body not shown here, confirm
+  private: // no private members are declared
+};
 }} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_LOSSYQUEUE_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Lvq.h"
+#include "MessageMap.h"
+#include "qpid/sys/ClusterSafe.h"
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace broker {
+Lvq::Lvq(const std::string& n, std::auto_ptr<MessageMap> m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b)
+    : Queue(n, s, ms, p, b), messageMap(*m) // bind the reference while m still holds the map
+{
+    messages = m; // hands the auto_ptr over afterwards; NOTE(review): presumably `messages` (declared outside this file) now owns the map that messageMap refers to
+}
+
+void Lvq::push(Message& message, bool isRecovery) // adds message via messageMap.update(); a message reported as displaced is dequeued
+{
+    qpid::sys::assertClusterSafe();
+    QueueListeners::NotificationSet copy; // populated under the lock, notified after the locked scope
+    Message old; // out-parameter of update(): the displaced message, if any
+    bool removed; // always assigned in the locked scope below before it is read
+    {
+        qpid::sys::Mutex::ScopedLock locker(messageLock);
+        message.setSequence(++sequence);
+        removed = messageMap.update(message, old);
+        listeners.populate(copy);
+        observeEnqueue(message, locker);
+        if (removed) {
+            if (mgmtObject) { // the displaced message is counted as an acquire and as an LVQ discard
+                mgmtObject->inc_acquires();
+                mgmtObject->inc_discardsLvq();
+                if (brokerMgmtObject) { // broker-level counters are only touched when mgmtObject is set
+                    brokerMgmtObject->inc_acquires();
+                    brokerMgmtObject->inc_discardsLvq();
+                }
+            }
+            observeDequeue(old, locker);
+        }
+    } // end of the scope holding `locker`
+    copy.notify();
+    if (removed) {
+        if (isRecovery) pendingDequeues.push_back(old); // on recovery the displaced message is recorded in pendingDequeues instead of calling dequeueFromStore here
+        else dequeueFromStore(old.getPersistentContext());//do outside of lock
+    }
+}
+}} // namespace qpid::broker

Copied: qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.h (from r1371647, qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.h?p2=qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.h&p1=qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h&r1=1371647&r2=1371676&rev=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExpiryPolicy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Lvq.h Fri Aug 10 12:04:27 2012
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_EXPIRYPOLICY_H
-#define QPID_BROKER_EXPIRYPOLICY_H
+#ifndef QPID_BROKER_LVQ_H
+#define QPID_BROKER_LVQ_H
 
 /*
  *
@@ -21,30 +21,25 @@
  * under the License.
  *
  */
-
-#include "qpid/RefCounted.h"
-#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/Queue.h"
 
 namespace qpid {
-
-namespace sys {
-class AbsTime;
-}
-
 namespace broker {
-
-class Message;
+class MessageMap;
 
 /**
- * Default expiry policy.
+ * Subclass of queue that handles last-value-queue semantics in
+ * conjunction with the MessageMap class. This requires an existing
+ * message to be 'replaced' by a newer message with the same key.
  */
-class QPID_BROKER_CLASS_EXTERN ExpiryPolicy : public RefCounted
+class Lvq : public Queue
 {
   public:
-    QPID_BROKER_EXTERN virtual ~ExpiryPolicy();
-    QPID_BROKER_EXTERN virtual bool hasExpired(Message&);
-    QPID_BROKER_EXTERN virtual qpid::sys::AbsTime getCurrentTime();
+    Lvq(const std::string&, std::auto_ptr<MessageMap>, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*);
+    void push(Message& msg, bool isRecovery=false);
+  private:
+    MessageMap& messageMap;
 };
 }} // namespace qpid::broker
 
-#endif  /*!QPID_BROKER_EXPIRYPOLICY_H*/
+#endif  /*!QPID_BROKER_LVQ_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/broker/MapHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MapHandler.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MapHandler.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MapHandler.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_MAPHANDLER_H
+#define QPID_BROKER_MAPHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/IntegerTypes.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Interface for processing entries in some map-like object
+ */
+class MapHandler
+{
+  public:
+    typedef struct { // pointer-plus-length description of a run of characters
+        const char* data; // first character; NOTE(review): presumably not NUL-terminated since a size is carried -- confirm with implementers
+        size_t size; // number of characters starting at data
+    } CharSequence;
+
+    virtual ~MapHandler() {}
+    virtual void handleVoid(const CharSequence& key) = 0; // entry that has a key but no value argument
+    virtual void handleUint8(const CharSequence& key, uint8_t value) = 0; // one callback per value type follows
+    virtual void handleUint16(const CharSequence& key, uint16_t value) = 0;
+    virtual void handleUint32(const CharSequence& key, uint32_t value) = 0;
+    virtual void handleUint64(const CharSequence& key, uint64_t value) = 0;
+    virtual void handleInt8(const CharSequence& key, int8_t value) = 0;
+    virtual void handleInt16(const CharSequence& key, int16_t value) = 0;
+    virtual void handleInt32(const CharSequence& key, int32_t value) = 0;
+    virtual void handleInt64(const CharSequence& key, int64_t value) = 0;
+    virtual void handleFloat(const CharSequence& key, float value) = 0;
+    virtual void handleDouble(const CharSequence& key, double value) = 0;
+    virtual void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& encoding) = 0; // NOTE(review): the meaning of `encoding` is not defined in this header
+  private: // no private members are declared
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_MAPHANDLER_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Aug 10 12:04:27 2012
@@ -20,19 +20,12 @@
  */
 
 #include "qpid/broker/Message.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/MapHandler.h"
 #include "qpid/StringUtils.h"
-#include "qpid/framing/frame_functors.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/framing/MessageTransferBody.h"
-#include "qpid/framing/SendContent.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/TypeFilter.h"
-#include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
 
+#include <algorithm>
+#include <string.h>
 #include <time.h>
 
 using boost::intrusive_ptr;
@@ -41,492 +34,261 @@ using qpid::sys::Duration;
 using qpid::sys::TIME_MSEC;
 using qpid::sys::FAR_FUTURE;
 using std::string;
-using namespace qpid::framing;
 
 namespace qpid {
 namespace broker {
 
-TransferAdapter Message::TRANSFER;
-
-Message::Message(const framing::SequenceNumber& id) :
-    frames(id), persistenceId(0), redelivered(false), loaded(false),
-    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
-    expiration(FAR_FUTURE), dequeueCallback(0),
-    inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false)
-{}
-
-Message::~Message() {}
-
-void Message::forcePersistent()
+Message::Message() : deliveryCount(0), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false) {}
+Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p)
+    : encoding(e), persistentContext(p), deliveryCount(0), publisher(0), expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false)
 {
-    sys::Mutex::ScopedLock l(lock);
-    // only set forced bit if we actually need to force.
-    if (! getAdapter().isPersistent(frames) ){
-        forcePersistentPolicy = true;
-    }
+    if (persistentContext) persistentContext->setIngressCompletion(e);
 }
+Message::~Message() {}
 
-bool Message::isForcedPersistent()
-{
-    return forcePersistentPolicy;
-}
 
 std::string Message::getRoutingKey() const
 {
-    return getAdapter().getRoutingKey(frames);
-}
-
-std::string Message::getExchangeName() const
-{
-    return getAdapter().getExchange(frames);
-}
-
-const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const
-{
-    if (!exchange) {
-        exchange = registry.get(getExchangeName());
-    }
-    return exchange;
-}
-
-bool Message::isImmediate() const
-{
-    return getAdapter().isImmediate(frames);
-}
-
-const FieldTable* Message::getApplicationHeaders() const
-{
-    sys::Mutex::ScopedLock l(lock);
-    return getAdapter().getApplicationHeaders(frames);
-}
-
-std::string Message::getAppId() const
-{
-    sys::Mutex::ScopedLock l(lock);
-    return getAdapter().getAppId(frames);
+    return getEncoding().getRoutingKey();
 }
 
 bool Message::isPersistent() const
 {
-    sys::Mutex::ScopedLock l(lock);
-    return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
+    return getEncoding().isPersistent();
 }
 
-bool Message::requiresAccept()
+uint64_t Message::getContentSize() const
 {
-    return getAdapter().requiresAccept(frames);
+    return getEncoding().getContentSize();
 }
 
-uint32_t Message::getRequiredCredit()
+boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
 {
-    sys::Mutex::ScopedLock l(lock);
-    if (!requiredCredit) {
-        //add up payload for all header and content frames in the frameset
-        SumBodySize sum;
-        frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
-        requiredCredit = sum.getSize();
-    }
-    return requiredCredit;
+    return encoding;
 }
 
-void Message::encode(framing::Buffer& buffer) const
+namespace
 {
-    sys::Mutex::ScopedLock l(lock);
-    //encode method and header frames
-    EncodeFrame f1(buffer);
-    frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
-
-    //then encode the payload of each content frame
-    framing::EncodeBody f2(buffer);
-    frames.map_if(f2, TypeFilter<CONTENT_BODY>());
+const std::string X_QPID_TRACE("x-qpid.trace");
 }
 
-void Message::encodeContent(framing::Buffer& buffer) const
+bool Message::isExcluded(const std::vector<std::string>& excludes) const
 {
-    sys::Mutex::ScopedLock l(lock);
-    //encode the payload of each content frame
-    EncodeBody f2(buffer);
-    frames.map_if(f2, TypeFilter<CONTENT_BODY>());
+    std::string traceStr = getEncoding().getAnnotationAsString(X_QPID_TRACE);
+    if (traceStr.size()) {
+        std::vector<std::string> trace = split(traceStr, ", ");
+        for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
+            for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
+                if (*i == *j) {
+                    return true;
+                }
+            }
+        }
+    }
+    return false;
 }
 
-uint32_t Message::encodedSize() const
+void Message::addTraceId(const std::string& id)
 {
-    return encodedHeaderSize() + encodedContentSize();
+    std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE);
+    if (trace.empty()) {
+        annotations[X_QPID_TRACE] = id;
+    } else if (trace.find(id) == std::string::npos) {
+        trace += ",";
+        trace += id;
+        annotations[X_QPID_TRACE] = trace;
+    }
+    annotationsChanged();
 }
 
-uint32_t Message::encodedContentSize() const
+void Message::clearTrace()
 {
-    sys::Mutex::ScopedLock l(lock);
-    return  frames.getContentSize();
+    annotations[X_QPID_TRACE] = std::string();
+    annotationsChanged();
 }
 
-uint32_t Message::encodedHeaderSize() const
+void Message::setTimestamp()
 {
-    sys::Mutex::ScopedLock l(lock);   // prevent modifications while computing size
-    //add up the size for all method and header frames in the frameset
-    SumFrameSize sum;
-    frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>());
-    return sum.getSize();
+    timestamp = ::time(0);   // AMQP-0.10: posix time_t - secs since Epoch
 }
 
-void Message::decodeHeader(framing::Buffer& buffer)
+uint64_t Message::getTimestamp() const
 {
-    AMQFrame method;
-    method.decode(buffer);
-    frames.append(method);
-
-    AMQFrame header;
-    header.decode(buffer);
-    frames.append(header);
+    return timestamp;
 }
 
-void Message::decodeContent(framing::Buffer& buffer)
+uint64_t Message::getTtl() const
 {
-    if (buffer.available()) {
-        //get the data as a string and set that as the content
-        //body on a frame then add that frame to the frameset
-        AMQFrame frame((AMQContentBody()));
-        frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
-        frame.setFirstSegment(false);
-        frames.append(frame);
+    uint64_t ttl;
+    if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
+        sys::AbsTime current(
+            expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+        sys::Duration ttl(current, getExpiration());
+        // convert from ns to ms; set to 1 if expired
+        return (int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
     } else {
-        //adjust header flags
-        MarkLastSegment f;
-        frames.map_if(f, TypeFilter<HEADER_BODY>());
-    }
-    //mark content loaded
-    loaded = true;
-}
-
-// Used for testing only
-void Message::tryReleaseContent()
-{
-    if (checkContentReleasable()) {
-        releaseContent();
-    }
-}
-
-void Message::releaseContent(MessageStore* s)
-{
-    //deprecated, use setStore(store); releaseContent(); instead
-    if (!store) setStore(s);
-    releaseContent();
-}
-
-void Message::releaseContent()
-{
-    sys::Mutex::ScopedLock l(lock);
-    if (store) {
-        if (!getPersistenceId()) {
-            intrusive_ptr<PersistableMessage> pmsg(this);
-            store->stage(pmsg);
-            staged = true;
-        }
-        //ensure required credit and size is cached before content frames are released
-        getRequiredCredit();
-        contentSize();
-        //remove any content frames from the frameset
-        frames.remove(TypeFilter<CONTENT_BODY>());
-        setContentReleased();
+        return 0;
     }
 }
 
-void Message::destroy()
+void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
 {
-    if (staged) {
-        if (store) {
-            store->destroy(*this);
-        } else {
-            QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
+    //TODO: this is still quite 0-10 specific...
+    uint64_t ttl;
+    if (getEncoding().getTtl(ttl)) {
+        if (e) {
+            // Use higher resolution time for the internal expiry calculation.
+            // Prevent overflow as a signed int64_t
+            Duration duration(std::min(ttl * TIME_MSEC,
+                                       (uint64_t) std::numeric_limits<int64_t>::max()));
+            expiration = AbsTime(e->getCurrentTime(), duration);
+            setExpiryPolicy(e);
         }
     }
 }
 
-bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
+void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value)
 {
-    intrusive_ptr<const PersistableMessage> pmsg(this);
-
-    bool done = false;
-    string& data = frame.castBody<AMQContentBody>()->getData();
-    store->loadContent(queue, pmsg, data, offset, maxContentSize);
-    done = data.size() < maxContentSize;
-    frame.setBof(false);
-    frame.setEof(true);
-    QPID_LOG(debug, "loaded frame" << frame);
-    if (offset > 0) {
-        frame.setBos(false);
-    }
-    if (!done) {
-        frame.setEos(false);
-    } else return false;
-    return true;
-}
-
-void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
-{
-    sys::Mutex::ScopedLock l(lock);
-    if (isContentReleased() && !frames.isComplete()) {
-        sys::Mutex::ScopedUnlock u(lock);
-        uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
-        bool morecontent = true;
-        for (uint64_t offset = 0; morecontent; offset += maxContentSize)
-        {
-            AMQFrame frame((AMQContentBody()));
-            morecontent = getContentFrame(queue, frame, maxContentSize, offset);
-            out.handle(frame);
-        }
-        queue.countLoadedFromDisk(contentSize());
-    } else {
-        Count c;
-        frames.map_if(c, TypeFilter<CONTENT_BODY>());
-
-        SendContent f(out, maxFrameSize, c.getCount());
-        frames.map_if(f, TypeFilter<CONTENT_BODY>());
-    }
+    annotations[key] = value;
+    annotationsChanged();
 }
 
-void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const
+void Message::annotationsChanged()
 {
-    sys::Mutex::ScopedLock l(lock);
-    Relay f(out);
-    frames.map_if(f, TypeFilter<HEADER_BODY>());
-    //as frame (and pointer to body) has now been passed to handler,
-    //subsequent modifications should use a copy
-    copyHeaderOnWrite = true;
-}
-
-// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
-// 0-8/0-9 message differences.
-MessageAdapter& Message::getAdapter() const
-{
-    if (!adapter) {
-        if(frames.isA<MessageTransferBody>()) {
-            adapter = &TRANSFER;
-        } else {
-            const AMQMethodBody* method = frames.getMethod();
-            if (!method) throw Exception("Can't adapt message with no method");
-            else throw Exception(QPID_MSG("Can't adapt message based on " << *method));
-        }
+    if (persistentContext) {
+        persistentContext = persistentContext->merge(annotations);
+        persistentContext->setIngressCompletion(encoding);
     }
-    return *adapter;
 }
 
-uint64_t Message::contentSize() const
-{
-    return frames.getContentSize();
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+    expiryPolicy = e;
 }
 
-bool Message::isContentLoaded() const
+bool Message::hasExpired() const
 {
-    return loaded;
+    return expiryPolicy && expiryPolicy->hasExpired(*this);
 }
 
-
-namespace
+uint8_t Message::getPriority() const
 {
-const std::string X_QPID_TRACE("x-qpid.trace");
+    return getEncoding().getPriority();
 }
 
-bool Message::isExcluded(const std::vector<std::string>& excludes) const
+bool Message::getIsManagementMessage() const { return isManagementMessage; }
+void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
+qpid::framing::SequenceNumber Message::getSequence() const
 {
-    sys::Mutex::ScopedLock l(lock);
-    const FieldTable* headers = getApplicationHeaders();
-    if (headers) {
-        std::string traceStr = headers->getAsString(X_QPID_TRACE);
-        if (traceStr.size()) {
-            std::vector<std::string> trace = split(traceStr, ", ");
-
-            for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
-                for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
-                    if (*i == *j) {
-                        return true;
-                    }
-                }
-            }
-        }
-    }
-    return false;
+    return sequence;
 }
-
-class CloneHeaderBody
+void Message::setSequence(const qpid::framing::SequenceNumber& s)
 {
-public:
-    void operator()(AMQFrame& f)
-    {
-        f.cloneBody();
-    }
-};
-
-AMQHeaderBody* Message::getHeaderBody()
-{
-    // expects lock to be held
-    if (copyHeaderOnWrite) {
-        CloneHeaderBody f;
-        frames.map_if(f, TypeFilter<HEADER_BODY>());
-        copyHeaderOnWrite = false;
-    }
-    return frames.getHeaders();
+    sequence = s;
 }
 
-void Message::addTraceId(const std::string& id)
+MessageState Message::getState() const
 {
-    sys::Mutex::ScopedLock l(lock);
-    if (isA<MessageTransferBody>()) {
-        FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
-        std::string trace = headers.getAsString(X_QPID_TRACE);
-        if (trace.empty()) {
-            headers.setString(X_QPID_TRACE, id);
-        } else if (trace.find(id) == std::string::npos) {
-            trace += ",";
-            trace += id;
-            headers.setString(X_QPID_TRACE, trace);
-        }
-    }
+    return state;
 }
-
-void Message::clearTrace()
+void Message::setState(MessageState s)
 {
-    sys::Mutex::ScopedLock l(lock);
-    if (isA<MessageTransferBody>()) {
-        FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
-        std::string trace = headers.getAsString(X_QPID_TRACE);
-        if (!trace.empty()) {
-            headers.setString(X_QPID_TRACE, "");
-        }
-    }
+    state = s;
 }
 
-void Message::setTimestamp()
+const qpid::types::Variant::Map& Message::getAnnotations() const
 {
-    sys::Mutex::ScopedLock l(lock);
-    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
-    time_t now = ::time(0);
-    props->setTimestamp(now);   // AMQP-0.10: posix time_t - secs since Epoch
+    return annotations;
 }
 
-void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+qpid::types::Variant Message::getAnnotation(const std::string& key) const
 {
-    sys::Mutex::ScopedLock l(lock);
-    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
-    if (props->getTtl()) {
-        // AMQP requires setting the expiration property to be posix
-        // time_t in seconds. TTL is in milliseconds
-        if (!props->getExpiration()) {
-            //only set expiration in delivery properties if not already set
-            time_t now = ::time(0);
-            props->setExpiration(now + (props->getTtl()/1000));
-        }
-        if (e) {
-            // Use higher resolution time for the internal expiry calculation.
-            // Prevent overflow as a signed int64_t
-            Duration ttl(std::min(props->getTtl() * TIME_MSEC,
-                                  (uint64_t) std::numeric_limits<int64_t>::max()));
-            expiration = AbsTime(e->getCurrentTime(), ttl);
-            setExpiryPolicy(e);
-        }
-    }
+    qpid::types::Variant::Map::const_iterator i = annotations.find(key);
+    if (i != annotations.end()) return i->second;
+    //FIXME: modify Encoding interface to allow retrieval of
+    //annotations of different types from the message data as received
+    //off the wire
+    return qpid::types::Variant(getEncoding().getAnnotationAsString(key));
 }
 
-void Message::adjustTtl()
+std::string Message::getUserId() const
 {
-    sys::Mutex::ScopedLock l(lock);
-    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
-    if (props->getTtl()) {
-        if (expiration < FAR_FUTURE) {
-            sys::AbsTime current(
-                expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
-            sys::Duration ttl(current, getExpiration());
-            // convert from ns to ms; set to 1 if expired
-            props->setTtl(int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
-        }
-    }
+    return encoding->getUserId();
 }
 
-void Message::setRedelivered()
+Message::Encoding& Message::getEncoding()
 {
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
+    return *encoding;
 }
-
-void Message::insertCustomProperty(const std::string& key, int64_t value)
+const Message::Encoding& Message::getEncoding() const
 {
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
+    return *encoding;
 }
-
-void Message::insertCustomProperty(const std::string& key, const std::string& value)
+Message::operator bool() const
 {
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
+    return encoding;
 }
 
-void Message::removeCustomProperty(const std::string& key)
+std::string Message::getContent() const
 {
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
+    return encoding->getContent();
 }
 
-void Message::setExchange(const std::string& exchange)
+std::string Message::getPropertyAsString(const std::string& key) const
 {
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
+    return encoding->getPropertyAsString(key);
 }
-
-void Message::clearApplicationHeadersFlag()
+namespace {
+class PropertyRetriever : public MapHandler
 {
-    sys::Mutex::ScopedLock l(lock);
-    getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
-}
-
-void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
-    expiryPolicy = e;
-}
+  public:
+    PropertyRetriever(const std::string& key) : name(key) {}
+    void handleVoid(const CharSequence&) {}
+    void handleUint8(const CharSequence& key, uint8_t value) { handle(key, value); }
+    void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); }
+    void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); }
+    void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); }
+    void handleInt8(const CharSequence& key, int8_t value) { handle(key, value); }
+    void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); }
+    void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); }
+    void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); }
+    void handleFloat(const CharSequence& key, float value) { handle(key, value); }
+    void handleDouble(const CharSequence& key, double value) { handle(key, value); }
+    void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/)
+    {
+        if (matches(key)) result = std::string(value.data, value.size);
+    }
+    qpid::types::Variant getResult() { return result; }
 
-bool Message::hasExpired()
-{
-    return expiryPolicy && expiryPolicy->hasExpired(*this);
-}
+  private:
+    std::string name;
+    qpid::types::Variant result;
 
-namespace {
-struct ScopedSet {
-    sys::Monitor& lock;
-    bool& flag;
-    ScopedSet(sys::Monitor& l, bool& f) : lock(l), flag(f) {
-        sys::Monitor::ScopedLock sl(lock);
-        flag = true;
+    bool matches(const CharSequence& key) // true only when key is exactly the property name given to the constructor
+    {   // Lengths must agree first: comparing just the shorter length would accept any key sharing a prefix with name (and every empty key).
+        return key.size == name.size() && ::strncmp(key.data, name.data(), key.size) == 0;
     }
-    ~ScopedSet(){
-        sys::Monitor::ScopedLock sl(lock);
-        flag = false;
-        lock.notifyAll();
+
+    template <typename T> void handle(const CharSequence& key, T value)
+    {
+        if (matches(key)) result = value;
     }
 };
 }
-
-void Message::allDequeuesComplete() {
-    ScopedSet ss(callbackLock, inCallback);
-    MessageCallback* cb = dequeueCallback;
-    if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
-}
-
-void Message::setDequeueCompleteCallback(MessageCallback& cb) {
-    sys::Mutex::ScopedLock l(callbackLock);
-    while (inCallback) callbackLock.wait();
-    dequeueCallback = &cb;
+qpid::types::Variant Message::getProperty(const std::string& key) const // looks up a single property by name through the encoding's property callbacks
+{
+    PropertyRetriever r(key);
+    encoding->processProperties(r); // r stores the value of any entry its matches() accepts; a later accepted entry overwrites an earlier one
+    return r.getResult(); // a default-constructed Variant when no entry was accepted
 }
 
-void Message::resetDequeueCompleteCallback() {
-    sys::Mutex::ScopedLock l(callbackLock);
-    while (inCallback) callbackLock.wait();
-    dequeueCallback = 0;
+boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
+{
+    return persistentContext;
 }
 
-uint8_t Message::getPriority() const {
-    sys::Mutex::ScopedLock l(lock);
-    return getAdapter().getPriority(frames);
+void Message::processProperties(MapHandler& handler) const
+{
+    encoding->processProperties(handler);
 }
 
-bool Message::getIsManagementMessage() const { return isManagementMessage; }
-void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
-
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Aug 10 12:04:27 2012
@@ -23,194 +23,131 @@
  */
 
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/PersistableMessage.h"
-#include "qpid/broker/MessageAdapter.h"
-#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
-#include <boost/function.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <memory>
+#include "qpid/types/Variant.h"
+//TODO: move the following out of framing or replace it
+#include "qpid/framing/SequenceNumber.h"
 #include <string>
 #include <vector>
 
-namespace qpid {
-
-namespace framing {
-class AMQBody;
-class AMQHeaderBody;
-class FieldTable;
-class SequenceNumber;
-}
+#include "qpid/RefCounted.h"
+#include <boost/intrusive_ptr.hpp>
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/PersistableMessage.h"
 
+namespace qpid {
 namespace broker {
 class ConnectionToken;
-class Exchange;
-class ExchangeRegistry;
-class MessageStore;
-class Queue;
-class ExpiryPolicy;
+class MapHandler;
+
+enum MessageState // State tag carried by each Message (read and written via Message::getState / setState).
+{ // Values are distinct single bits - presumably so states can be combined into a mask; TODO confirm.
+    AVAILABLE=1,
+    ACQUIRED=2,
+    DELETED=4, // also the state given to the padding messages built in MessageDeque.cpp
+    UNAVAILABLE=8
+};
 
-class Message : public PersistableMessage {
+class Message {
 public:
-    typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
+    class Encoding : public AsyncCompletion // Pure-virtual interface through which Message reads routing, content and property data; Message holds one via intrusive_ptr.
+    {
+      public:
+        virtual ~Encoding() {} // virtual so implementations can be destroyed through an Encoding pointer
+        virtual std::string getRoutingKey() const = 0;
+        virtual bool isPersistent() const = 0;
+        virtual uint8_t getPriority() const = 0;
+        virtual uint64_t getContentSize() const = 0;
+        virtual std::string getPropertyAsString(const std::string& key) const = 0;
+        virtual std::string getAnnotationAsString(const std::string& key) const = 0;
+        virtual bool getTtl(uint64_t&) const = 0; // TTL comes back through the reference; the bool presumably reports whether one is set - TODO confirm
+        virtual bool hasExpiration() const = 0;
+        virtual std::string getContent() const = 0;
+        virtual void processProperties(MapHandler&) const = 0; // hands the properties to the supplied MapHandler
+        virtual std::string getUserId() const = 0;
+    };
 
-    QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
+    QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>);
+    QPID_BROKER_EXTERN Message();
     QPID_BROKER_EXTERN ~Message();
 
-    uint64_t getPersistenceId() const { return persistenceId; }
-    void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; }
-
-    bool getRedelivered() const { return redelivered; }
-    void redeliver() { redelivered = true; }
+    bool isRedelivered() const { return deliveryCount > 1; } // true once more than one delivery has been counted
+    void deliver() { ++deliveryCount; } // count one delivery
+    void undeliver() { --deliveryCount; } // take back one counted delivery; there is no floor at zero
+    int getDeliveryCount() const { return deliveryCount; } // raw counter value
+    void resetDeliveryCount() { deliveryCount = 0; } // forget all counted deliveries
 
     const ConnectionToken* getPublisher() const {  return publisher; }
     void setPublisher(ConnectionToken* p) {  publisher = p; }
 
-    const framing::SequenceNumber& getCommandId() { return frames.getId(); }
-
-    QPID_BROKER_EXTERN uint64_t contentSize() const;
 
     QPID_BROKER_EXTERN std::string getRoutingKey() const;
-    const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
-    QPID_BROKER_EXTERN std::string getExchangeName() const;
-    bool isImmediate() const;
-    QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
-    QPID_BROKER_EXTERN std::string getAppId() const;
     QPID_BROKER_EXTERN bool isPersistent() const;
-    bool requiresAccept();
 
     /** determine msg expiration time using the TTL value if present */
     QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
     void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
-    bool hasExpired();
+
+    bool hasExpired() const;
     sys::AbsTime getExpiration() const { return expiration; }
     void setExpiration(sys::AbsTime exp) { expiration = exp; }
-    void adjustTtl();
-    void setRedelivered();
-    QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, int64_t value);
-    QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, const std::string& value);
-    QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
-    void setExchange(const std::string&);
-    void clearApplicationHeadersFlag();
+    uint64_t getTtl() const;
+
     /** set the timestamp delivery property to the current time-of-day */
     QPID_BROKER_EXTERN void setTimestamp();
+    QPID_BROKER_EXTERN uint64_t getTimestamp() const;
 
-    framing::FrameSet& getFrames() { return frames; }
-    const framing::FrameSet& getFrames() const { return frames; }
-
-    template <class T> const T* getProperties() const {
-        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        return p->get<T>();
-    }
-
-    template <class T> const T* hasProperties() const {
-        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        return p->get<T>();
-    }
-
-    template <class T> void eraseProperties() {
-        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        p->erase<T>();
-    }
-
-    template <class T> const T* getMethod() const {
-        return frames.as<T>();
-    }
-
-    template <class T> T* getMethod() {
-        return frames.as<T>();
-    }
-
-    template <class T> bool isA() const {
-        return frames.isA<T>();
-    }
-
-    uint32_t getRequiredCredit();
-
-    void encode(framing::Buffer& buffer) const;
-    void encodeContent(framing::Buffer& buffer) const;
-
-    /**
-     * @returns the size of the buffer needed to encode this
-     * message in its entirety
-     */
-    uint32_t encodedSize() const;
-    /**
-     * @returns the size of the buffer needed to encode the
-     * 'header' of this message (not just the header frame,
-     * but other meta data e.g.routing key and exchange)
-     */
-    uint32_t encodedHeaderSize() const;
-    uint32_t encodedContentSize() const;
-
-    QPID_BROKER_EXTERN void decodeHeader(framing::Buffer& buffer);
-    QPID_BROKER_EXTERN void decodeContent(framing::Buffer& buffer);
-
-    void QPID_BROKER_EXTERN tryReleaseContent();
-    void releaseContent();
-    void releaseContent(MessageStore* s);//deprecated, use 'setStore(store); releaseContent();' instead
-    void destroy();
-
-    bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const;
-    QPID_BROKER_EXTERN void sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
-    void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
-
-    QPID_BROKER_EXTERN bool isContentLoaded() const;
-
-    bool isExcluded(const std::vector<std::string>& excludes) const;
-    void addTraceId(const std::string& id);
-    void clearTrace();
-
-    void forcePersistent();
-    bool isForcedPersistent();
-
-    /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
-    void setDequeueCompleteCallback(MessageCallback& cb);
-    void resetDequeueCompleteCallback();
+    QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value);
+    QPID_BROKER_EXTERN bool isExcluded(const std::vector<std::string>& excludes) const;
+    QPID_BROKER_EXTERN void addTraceId(const std::string& id);
+    QPID_BROKER_EXTERN void clearTrace();
+    QPID_BROKER_EXTERN uint8_t getPriority() const;
+    QPID_BROKER_EXTERN std::string getPropertyAsString(const std::string& key) const;
+    QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const;
+    void processProperties(MapHandler&) const;
+
+    QPID_BROKER_EXTERN uint64_t getContentSize() const;
+
+    Encoding& getEncoding();
+    const Encoding& getEncoding() const;
+    QPID_BROKER_EXTERN operator bool() const;
 
-    uint8_t getPriority() const;
     bool getIsManagementMessage() const;
     void setIsManagementMessage(bool b);
-  private:
-    MessageAdapter& getAdapter() const;
-    void allDequeuesComplete();
 
-    mutable sys::Mutex lock;
-    framing::FrameSet frames;
-    mutable boost::shared_ptr<Exchange> exchange;
-    mutable uint64_t persistenceId;
-    bool redelivered;
-    bool loaded;
-    bool staged;
-    bool forcePersistentPolicy; // used to force message as durable, via a broker policy
-    ConnectionToken* publisher;
-    mutable MessageAdapter* adapter;
-    qpid::sys::AbsTime expiration;
-    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+    QPID_BROKER_EXTERN qpid::framing::SequenceNumber getSequence() const;
+    QPID_BROKER_EXTERN void setSequence(const qpid::framing::SequenceNumber&);
 
-    static TransferAdapter TRANSFER;
+    MessageState getState() const;
+    void setState(MessageState);
 
-    mutable boost::intrusive_ptr<Message> empty;
+    QPID_BROKER_EXTERN qpid::types::Variant getAnnotation(const std::string& key) const;
+    QPID_BROKER_EXTERN const qpid::types::Variant::Map& getAnnotations() const;
+    std::string getUserId() const;
 
-    sys::Monitor callbackLock;
-    MessageCallback* dequeueCallback;
-    bool inCallback;
+    QPID_BROKER_EXTERN std::string getContent() const;//TODO: may be better to get rid of this...
 
-    uint32_t requiredCredit;
+    QPID_BROKER_EXTERN boost::intrusive_ptr<AsyncCompletion> getIngressCompletion() const;
+    QPID_BROKER_EXTERN boost::intrusive_ptr<PersistableMessage> getPersistentContext() const;
+  private:
+    boost::intrusive_ptr<Encoding> encoding;
+    boost::intrusive_ptr<PersistableMessage> persistentContext;
+    int deliveryCount;
+    ConnectionToken* publisher;
+    qpid::sys::AbsTime expiration;
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+    uint64_t timestamp;
+    qpid::types::Variant::Map annotations;
     bool isManagementMessage;
-      mutable bool copyHeaderOnWrite;
+    MessageState state;
+    qpid::framing::SequenceNumber sequence;
 
-    /**
-     * Expects lock to be held
-     */
-    template <class T> T* getModifiableProperties() {
-        return getHeaderBody()->get<T>(true);
-    }
-    qpid::framing::AMQHeaderBody* getHeaderBody();
+    void annotationsChanged();
 };
 
+QPID_BROKER_EXTERN void encode(const Message&, std::string&);
+QPID_BROKER_EXTERN void decode(const std::string&, Message&);
+
 }}
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Fri Aug 10 12:04:27 2012
@@ -21,10 +21,11 @@
 #include "qpid/broker/MessageBuilder.h"
 
 #include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
 
 using boost::intrusive_ptr;
 using namespace qpid::broker;
@@ -36,8 +37,7 @@ namespace
     const std::string QPID_MANAGEMENT("qpid.management");
 }
 
-MessageBuilder::MessageBuilder(MessageStore* const _store) :
-    state(DORMANT), store(_store) {}
+MessageBuilder::MessageBuilder() : state(DORMANT) {} // A new builder starts in the DORMANT state; start() moves it to METHOD.
 
 void MessageBuilder::handle(AMQFrame& frame)
 {
@@ -45,6 +45,7 @@ void MessageBuilder::handle(AMQFrame& fr
     switch(state) {
     case METHOD:
         checkType(METHOD_BODY, type);
+        exchange = frame.castBody<qpid::framing::MessageTransferBody>()->getDestination();
         state = HEADER;
         break;
     case HEADER:
@@ -55,7 +56,9 @@ void MessageBuilder::handle(AMQFrame& fr
             header.setBof(false);
             header.setEof(false);
             message->getFrames().append(header);
-        } else if (type != HEADER_BODY) {
+        } else if (type == HEADER_BODY) {
+            frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setExchange(exchange);
+        } else {
             throw CommandInvalidException(
                 QPID_MSG("Invalid frame sequence for message, expected header or content got "
                          << type_str(type) << ")"));
@@ -73,14 +76,14 @@ void MessageBuilder::handle(AMQFrame& fr
 
 void MessageBuilder::end()
 {
+    message->computeRequiredCredit(); // done on the finished message before the reference is dropped below. NOTE(review): message is dereferenced unchecked, so end() without a preceding start() would hit a null pointer - confirm callers
     message = 0;
     state = DORMANT;
 }
 
 void MessageBuilder::start(const SequenceNumber& id)
 {
-    message = intrusive_ptr<Message>(new Message(id));
-    message->setStore(store);
+    message = intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer>(new qpid::broker::amqp_0_10::MessageTransfer(id)); // fresh transfer object constructed from the given id; replaces any transfer still held
     state = METHOD;
 }
 
@@ -112,3 +115,5 @@ void MessageBuilder::checkType(uint8_t e
                                                << type_str(expected) << " got " << type_str(actual) << ")"));
     }
 }
+
+boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> MessageBuilder::getMessage() { return message; } // Returns the transfer currently being built; end() resets it to null.

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Fri Aug 10 12:04:27 2012
@@ -30,21 +30,22 @@
 
 namespace qpid {
     namespace broker {
-        class Message;
-        class MessageStore;
+        namespace amqp_0_10 {
+        class MessageTransfer;
+        }
 
         class QPID_BROKER_CLASS_EXTERN MessageBuilder : public framing::FrameHandler{
         public:
-            QPID_BROKER_EXTERN MessageBuilder(MessageStore* const store);
+            QPID_BROKER_EXTERN MessageBuilder(); // takes no arguments; the builder starts out DORMANT
             QPID_BROKER_EXTERN void handle(framing::AMQFrame& frame);
-            boost::intrusive_ptr<Message> getMessage() { return message; }
+            boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessage(); // defined out of line in MessageBuilder.cpp, where MessageTransfer is a complete type
             QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id);
             void end();
         private:
             enum State {DORMANT, METHOD, HEADER, CONTENT};
             State state;
-            boost::intrusive_ptr<Message> message;
-            MessageStore* const store;
+            boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> message; // transfer under construction; set by start(), cleared by end()
+            std::string exchange; // destination read from the transfer method frame in handle(), later written into the header's DeliveryProperties
 
             void checkType(uint8_t expected, uint8_t actual);
         };

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.cpp Fri Aug 10 12:04:27 2012
@@ -19,218 +19,71 @@
  *
  */
 #include "qpid/broker/MessageDeque.h"
-#include "qpid/broker/QueuedMessage.h"
-#include "qpid/log/Statement.h"
 #include "assert.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
+namespace {
+Message padding(qpid::framing::SequenceNumber id) { // Builds a placeholder Message for the given sequence number; passed to the IndexedDeque by MessageDeque's constructor.
+    Message m; // default-constructed: no encoding or persistent context is supplied
+    m.setState(DELETED); // placeholder is marked DELETED from the outset
+    m.setSequence(id); // carries the sequence number it stands in for
+    return m; // returned by value
+}
+}
 
-MessageDeque::MessageDeque() : available(0), head(0) {}
+using qpid::framing::SequenceNumber;
 
-size_t MessageDeque::index(const framing::SequenceNumber& position)
+MessageDeque::MessageDeque() : messages(&padding) {} // Hands the padding() factory above to the underlying IndexedDeque.
+
+
+bool MessageDeque::deleted(const QueueCursor& cursor) // Thin wrapper: the work is done by the IndexedDeque member.
 {
-    //assuming a monotonic sequence, with no messages removed except
-    //from the ends of the deque, we can use the position to determin
-    //an index into the deque
-    if (messages.empty() || position < messages.front().position) return 0;
-    return position - messages.front().position;
-}
-
-bool MessageDeque::deleted(const QueuedMessage& m)
-{
-    size_t i = index(m.position);
-    if (i < messages.size()) {
-        QueuedMessage *qm = &messages[i];
-        if (qm->status != QueuedMessage::DELETED) {
-            qm->status = QueuedMessage::DELETED;
-            qm->payload = 0; // message no longer needed
-            clean();
-            return true;
-        }
-    }
-    return false;
+    return messages.deleted(cursor); // result is passed through unchanged
 }
 
-size_t MessageDeque::size()
+void MessageDeque::publish(const Message& added) // Thin wrapper: adds the message via the IndexedDeque member.
 {
-    return available;
+    messages.publish(added); // no return value is propagated
 }
 
-QueuedMessage* MessageDeque::releasePtr(const QueuedMessage& message)
+Message* MessageDeque::release(const QueueCursor& cursor) // Thin wrapper: the work is done by the IndexedDeque member.
 {
-    size_t i = index(message.position);
-    if (i < messages.size()) {
-        QueuedMessage& m = messages[i];
-        if (m.status == QueuedMessage::ACQUIRED) {
-            if (head > i) head = i;
-            m.status = QueuedMessage::AVAILABLE;
-            ++available;
-            return &messages[i];
-        }
-    } else {
-        assert(0);
-        QPID_LOG(error, "Failed to release message at " << message.position << " " << message.payload->getFrames().getContent() << "; no such message (index=" << i << ", size=" << messages.size() << ")");
-    }
-    return 0;
-}
-
-void MessageDeque::release(const QueuedMessage& message) { releasePtr(message); }
-
-bool MessageDeque::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
-{
-    if (position < messages.front().position) return false;
-    size_t i = index(position);
-    if (i < messages.size()) {
-        QueuedMessage& temp = messages[i];
-        if (temp.status == QueuedMessage::AVAILABLE) {
-            temp.status = QueuedMessage::ACQUIRED;
-            --available;
-            message = temp;
-            return true;
-        }
-    }
-    return false;
-}
-
-bool MessageDeque::find(const framing::SequenceNumber& position, QueuedMessage& message)
-{
-    size_t i = index(position);
-    if (i < messages.size()) {
-        message = messages[i];
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool MessageDeque::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
-{
-    //get first message that is greater than position
-    size_t i = index(position + 1);
-    while (i < messages.size()) {
-        QueuedMessage& m = messages[i++];
-        if (m.status == QueuedMessage::AVAILABLE || (!unacquired && m.status == QueuedMessage::ACQUIRED)) {
-            message = m;
-            return true;
-        }
-    }
-    return false;
-}
-
-bool MessageDeque::consume(QueuedMessage& message)
-{
-    while (head < messages.size()) {
-        QueuedMessage& i = messages[head++];
-        if (i.status == QueuedMessage::AVAILABLE) {
-            i.status = QueuedMessage::ACQUIRED;
-            --available;
-            message = i;
-            return true;
-        }
-    }
-    return false;
+    return messages.release(cursor); // pointer from IndexedDeque::release is passed through; presumably null when nothing was released - TODO confirm
 }
 
-namespace {
-QueuedMessage padding(uint32_t pos) {
-    return QueuedMessage(0, 0, pos, QueuedMessage::DELETED);
+Message* MessageDeque::next(QueueCursor& cursor) // Thin wrapper: the work is done by the IndexedDeque member.
+{
+    return messages.next(cursor); // cursor is taken by non-const reference, so the callee may update it - presumably advancing it; TODO confirm
 }
-} // namespace
 
-QueuedMessage* MessageDeque::pushPtr(const QueuedMessage& added) {
-    //add padding to prevent gaps in sequence, which break the index
-    //calculation (needed for queue replication)
-    while (messages.size() && (added.position - messages.back().position) > 1)
-        messages.push_back(padding(messages.back().position + 1));
-    messages.push_back(added);
-    messages.back().status = QueuedMessage::AVAILABLE;
-    if (head >= messages.size()) head = messages.size() - 1;
-    ++available;
-    clean();  // QPID-4046: let producer help clean the backlog of deleted messages
-    return &messages.back();
-}
-
-bool MessageDeque::push(const QueuedMessage& added, QueuedMessage& /*not needed*/) {
-    pushPtr(added);
-    return false; // adding a message never causes one to be removed for deque
-}
-
-void MessageDeque::updateAcquired(const QueuedMessage& acquired)
-{
-    // Pad the front of the queue if necessary
-    while (messages.size() && (acquired.position < messages.front().position))
-        messages.push_front(padding(uint32_t(messages.front().position) - 1));
-    size_t i = index(acquired.position);
-    if (i < messages.size()) {  // Replace an existing padding message
-        assert(messages[i].status == QueuedMessage::DELETED);
-        messages[i] = acquired;
-        messages[i].status = QueuedMessage::ACQUIRED;
-    }
-    else {                      // Push to the back
-        // Pad the back of the queue if necessary
-        while (messages.size() && (acquired.position - messages.back().position) > 1)
-            messages.push_back(padding(messages.back().position + 1));
-        assert(!messages.size() || (acquired.position - messages.back().position) == 1);
-        messages.push_back(acquired);
-        messages.back().status = QueuedMessage::ACQUIRED;
-    }
+size_t MessageDeque::size() // Reports whatever size the IndexedDeque member reports.
+{
+    return messages.size(); // NOTE(review): whether this counts only available messages is decided inside IndexedDeque - confirm
 }
 
-namespace {
-bool isNotDeleted(const QueuedMessage& qm) { return qm.status != QueuedMessage::DELETED; }
-} // namespace
+Message* MessageDeque::find(const framing::SequenceNumber& position, QueueCursor* cursor) // Lookup by sequence number; thin wrapper over the IndexedDeque member.
+{
+    return messages.find(position, cursor); // cursor is an optional out-pointer as far as this wrapper is concerned; it is passed on untouched
+}
 
-void MessageDeque::setPosition(const framing::SequenceNumber& n) {
-    size_t i = index(n+1);
-    if (i >= messages.size()) return; // Nothing to do.
-
-    // Assertion to verify the precondition: no messaages after n.
-    assert(std::find_if(messages.begin()+i, messages.end(), &isNotDeleted) ==
-           messages.end());
-    messages.erase(messages.begin()+i, messages.end());
-    if (head >= messages.size()) head = messages.size() - 1;
-    // Re-count the available messages
-    available = 0;
-    for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->status == QueuedMessage::AVAILABLE) ++available;
-    }
-}
-
-void MessageDeque::clean()
-{
-    // QPID-4046: If a queue has multiple consumers, then it is possible for a large
-    // collection of deleted messages to build up.  Limit the number of messages cleaned
-    // up on each call to clean().
-    size_t count = 0;
-    while (messages.size() && messages.front().status == QueuedMessage::DELETED && count < 10) {
-        messages.pop_front();
-        count += 1;
-    }
-    head = (head > count) ? head - count : 0;
+Message* MessageDeque::find(const QueueCursor& cursor) // Lookup by cursor; thin wrapper over the IndexedDeque member.
+{
+    return messages.find(cursor); // pointer from IndexedDeque::find is passed through unchanged
 }
 
 void MessageDeque::foreach(Functor f)
 {
-    for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->status == QueuedMessage::AVAILABLE) {
-            f(*i);
-        }
-    }
-}
-
-void MessageDeque::removeIf(Predicate p)
-{
-    for (Deque::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->status == QueuedMessage::AVAILABLE && p(*i)) {
-            //Use special status for this as messages are not yet
-            //dequeued, but should not be considered on the queue
-            //either (used for purging and moving)
-            i->status = QueuedMessage::REMOVED;
-            --available;
-        }
-    }
-    clean();
+    messages.foreach(f); // hands the functor to the IndexedDeque member; which message states it visits is decided there - TODO confirm
+}
+
+void MessageDeque::resetCursors() // Thin wrapper: the work is done by the IndexedDeque member.
+{
+    messages.resetCursors(); // no state of MessageDeque itself is touched here
 }
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDeque.h Fri Aug 10 12:04:27 2012
@@ -22,8 +22,7 @@
  *
  */
 #include "qpid/broker/Messages.h"
-#include "qpid/broker/QueuedMessage.h"
-#include <deque>
+#include "qpid/broker/IndexedDeque.h"
 
 namespace qpid {
 namespace broker {
@@ -36,31 +35,20 @@ class MessageDeque : public Messages
   public:
     MessageDeque();
     size_t size();
-    bool deleted(const QueuedMessage&);
-    void release(const QueuedMessage&);
-    bool acquire(const framing::SequenceNumber&, QueuedMessage&);
-    bool find(const framing::SequenceNumber&, QueuedMessage&);
-    bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
-    bool consume(QueuedMessage&);
-    bool push(const QueuedMessage& added, QueuedMessage& removed);
-    void updateAcquired(const QueuedMessage& acquired);
-    void setPosition(const framing::SequenceNumber&);
+    bool deleted(const QueueCursor&);
+    void publish(const Message& added);
+    Message* next(QueueCursor&);
+    Message* release(const QueueCursor& cursor);
+    Message* find(const QueueCursor&);
+    Message* find(const framing::SequenceNumber&, QueueCursor*);
+
     void foreach(Functor);
-    void removeIf(Predicate);
 
-    // For use by other Messages implementations that use MessageDeque as a FIFO index
-    // and keep pointers to its elements in their own indexing strctures.
-    void clean();
-    QueuedMessage* releasePtr(const QueuedMessage&);
-    QueuedMessage* pushPtr(const QueuedMessage& added);
+    void resetCursors();
 
   private:
-    typedef std::deque<QueuedMessage> Deque;
+    typedef IndexedDeque<Message> Deque;
     Deque messages;
-    size_t available;
-    size_t head;
-
-    size_t index(const framing::SequenceNumber&);
 };
 }} // namespace qpid::broker
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageDistributor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDistributor.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageDistributor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageDistributor.h Fri Aug 10 12:04:27 2012
@@ -21,51 +21,28 @@
  * under the License.
  *
  */
-
+#include "qpid/types/Variant.h"
 /** Abstraction used by Queue to determine the next "most desirable" message to provide to
  * a particular consuming client
  */
 
-
-#include "qpid/broker/Consumer.h"
-
 namespace qpid {
 namespace broker {
 
-struct QueuedMessage;
+class Message;
 
 class MessageDistributor
 {
  public:
     virtual ~MessageDistributor() {};
 
-    /** Locking Note: all methods assume the caller is holding the Queue::messageLock
-     * during the method call.
-     */
-
-    /** Determine the next message available for consumption by the consumer
-     * @param consumer the consumer that needs a message to consume
-     * @param next set to the next message that the consumer may consume.
-     * @return true if message is available and next is set
-     */
-    virtual bool nextConsumableMessage( Consumer::shared_ptr& consumer,
-                                        QueuedMessage& next ) = 0;
-
-    /** Allow the comsumer to take ownership of the given message.
+    /**
+     * Determine whether the named consumer can take ownership of the specified message.
      * @param consumer the name of the consumer that is attempting to acquire the message
-     * @param qm the message to be acquired, previously returned from nextConsumableMessage()
+     * @param target the message to be acquired
      * @return true if ownership is permitted, false if ownership cannot be assigned.
      */
-    virtual bool allocate( const std::string& consumer,
-                           const QueuedMessage& target) = 0;
-
-    /** Determine the next message available for browsing by the consumer
-     * @param consumer the consumer that is browsing the queue
-     * @param next set to the next message that the consumer may browse.
-     * @return true if a message is available and next is returned
-     */
-    virtual bool nextBrowsableMessage( Consumer::shared_ptr& consumer,
-                                       QueuedMessage& next ) = 0;
+    virtual bool acquire(const std::string& consumer, Message& target) = 0;
 
     /** hook to add any interesting management state to the status map */
     virtual void query(qpid::types::Variant::Map&) const = 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp Fri Aug 10 12:04:27 2012
@@ -1,4 +1,4 @@
-/*
+ /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,10 +20,16 @@
  */
 
 #include "qpid/broker/MessageGroupManager.h"
-
-#include "qpid/broker/Queue.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Messages.h"
+#include "qpid/broker/MessageDeque.h"
+#include "qpid/broker/QueueSettings.h"
+#include "qpid/framing/Array.h"
+#include "qpid/framing/DeliveryProperties.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
+#include "qpid/framing/TypeCode.h"
+#include "qpid/types/Variant.h"
 #include "qpid/log/Statement.h"
 #include "qpid/types/Variant.h"
 
@@ -75,24 +81,16 @@ void MessageGroupManager::disown( GroupS
     freeGroups[state.members.front().position] = &state;
 }
 
-MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm )
+MessageGroupManager::GroupState& MessageGroupManager::findGroup( const Message& m )
 {
-    uint32_t thisMsg = qm.position.getValue();
+    uint32_t thisMsg = m.getSequence().getValue();
     if (cachedGroup && lastMsg == thisMsg) {
         hits++;
         return *cachedGroup;
     }
 
-    std::string group = defaultGroupId;
-    const qpid::framing::FieldTable* headers = qm.payload->getApplicationHeaders();
-    if (headers) {
-        qpid::framing::FieldTable::ValuePtr id = headers->get( groupIdHeader );
-        if (id && id->convertsTo<std::string>()) {
-            std::string tmp = id->get<std::string>();
-            if (!tmp.empty())   // empty group is reserved
-                group = tmp;
-        }
-    }
+    std::string group = m.getPropertyAsString(groupIdHeader);
+    if (group.empty()) group = defaultGroupId; //empty group is reserved
 
     if (cachedGroup && group == lastGroup) {
         hits++;
@@ -112,48 +110,48 @@ MessageGroupManager::GroupState& Message
 }
 
 
-void MessageGroupManager::enqueued( const QueuedMessage& qm )
+void MessageGroupManager::enqueued( const Message& m )
 {
     // @todo KAG optimization - store reference to group state in QueuedMessage
     // issue: const-ness??
-    GroupState& state = findGroup(qm);
-    GroupState::MessageState mState(qm.position);
+    GroupState& state = findGroup(m);
+    GroupState::MessageState mState(m.getSequence());
     state.members.push_back(mState);
     uint32_t total = state.members.size();
     QPID_LOG( trace, "group queue " << qName <<
               ": added message to group id=" << state.group << " total=" << total );
     if (total == 1) {
         // newly created group, no owner
-        assert(freeGroups.find(qm.position) == freeGroups.end());
-        freeGroups[qm.position] = &state;
+        assert(freeGroups.find(m.getSequence()) == freeGroups.end());
+        freeGroups[m.getSequence()] = &state;
     }
 }
 
 
-void MessageGroupManager::acquired( const QueuedMessage& qm )
+void MessageGroupManager::acquired( const Message& m )
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    GroupState& state = findGroup(qm);
-    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
-    assert(m != state.members.end());
-    m->acquired = true;
+    GroupState& state = findGroup(m);
+    GroupState::MessageFifo::iterator gm = state.findMsg(m.getSequence());
+    assert(gm != state.members.end());
+    gm->acquired = true;
     state.acquired += 1;
     QPID_LOG( trace, "group queue " << qName <<
               ": acquired message in group id=" << state.group << " acquired=" << state.acquired );
 }
 
 
-void MessageGroupManager::requeued( const QueuedMessage& qm )
+void MessageGroupManager::requeued( const Message& m )
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    GroupState& state = findGroup(qm);
+    GroupState& state = findGroup(m);
     assert( state.acquired != 0 );
     state.acquired -= 1;
-    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
-    assert(m != state.members.end());
-    m->acquired = false;
+    GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence());
+    assert(i != state.members.end());
+    i->acquired = false;
     if (state.acquired == 0 && state.owned()) {
         QPID_LOG( trace, "group queue " << qName <<
                   ": consumer name=" << state.owner << " released group id=" << state.group);
@@ -164,14 +162,14 @@ void MessageGroupManager::requeued( cons
 }
 
 
-void MessageGroupManager::dequeued( const QueuedMessage& qm )
+void MessageGroupManager::dequeued( const Message& m )
 {
     // @todo KAG  avoid lookup: retrieve direct reference to group state from QueuedMessage
     // issue: const-ness??
-    GroupState& state = findGroup(qm);
-    GroupState::MessageFifo::iterator m = state.findMsg(qm.position);
-    assert(m != state.members.end());
-    if (m->acquired) {
+    GroupState& state = findGroup(m);
+    GroupState::MessageFifo::iterator i = state.findMsg(m.getSequence());
+    assert(i != state.members.end());
+    if (i->acquired) {
         assert( state.acquired != 0 );
         state.acquired -= 1;
     }
@@ -179,7 +177,7 @@ void MessageGroupManager::dequeued( cons
     // special case if qm is first (oldest) message in the group:
     // may need to re-insert it back on the freeGroups list, as the index will change
     bool reFreeNeeded = false;
-    if (m == state.members.begin()) {
+    if (i == state.members.begin()) {
         if (!state.owned()) {
             // will be on the freeGroups list if mgmt is dequeueing rather than a consumer!
             // if on freelist, it is indexed by first member, which is about to be removed!
@@ -188,7 +186,7 @@ void MessageGroupManager::dequeued( cons
         }
         state.members.pop_front();
     } else {
-        state.members.erase(m);
+        state.members.erase(i);
     }
 
     uint32_t total = state.members.size();
@@ -206,6 +204,12 @@ void MessageGroupManager::dequeued( cons
         QPID_LOG( trace, "group queue " << qName <<
                   ": consumer name=" << state.owner << " released group id=" << state.group);
         disown(state);
+        MessageDeque* md = dynamic_cast<MessageDeque*>(&messages);
+        if (md) {
+            md->resetCursors();
+        } else {
+            QPID_LOG(warning, "Could not reset cursors for message group, unexpected container type");
+        }
     } else if (reFreeNeeded) {
         disown(state);
     }
@@ -215,55 +219,27 @@ MessageGroupManager::~MessageGroupManage
 {
     QPID_LOG( debug, "group queue " << qName << " cache results: hits=" << hits << " misses=" << misses );
 }
-bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
+
+bool MessageGroupManager::acquire(const std::string& consumer, Message& m)
 {
-    if (!messages.size())
-        return false;
+    if (m.getState() == AVAILABLE) {
+        // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
+        GroupState& state = findGroup(m);
 
-    next.position = c->getPosition();
-    if (!freeGroups.empty()) {
-        const framing::SequenceNumber& nextFree = freeGroups.begin()->first;
-        if (nextFree <= next.position) {  // take oldest free
-            next.position = nextFree;
-            --next.position;
+        if (!state.owned()) {
+            own( state, consumer );
+            QPID_LOG( trace, "group queue " << qName <<
+                      ": consumer name=" << consumer << " has acquired group id=" << state.group);
         }
-    }
-
-    while (messages.browse( next.position, next, true )) {
-        GroupState& group = findGroup(next);
-        if (!group.owned()) {
-            //TODO: make acquire more efficient when we already have the message in question
-            if (group.members.front().position == next.position && messages.acquire(next.position, next)) {    // only take from head!
-                return true;
-            }
-            QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group
-                     << "'s head message still pending. pos=" << group.members.front().position);
-        } else if (group.owner == c->getName() && messages.acquire(next.position, next)) {
+        if (state.owner == consumer) {
+            m.setState(ACQUIRED);
             return true;
+        } else {
+            return false;
         }
+    } else {
+        return false;
     }
-    return false;
-}
-
-
-bool MessageGroupManager::allocate(const std::string& consumer, const QueuedMessage& qm)
-{
-    // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage
-    GroupState& state = findGroup(qm);
-
-    if (!state.owned()) {
-        own( state, consumer );
-        QPID_LOG( trace, "group queue " << qName <<
-                  ": consumer name=" << consumer << " has acquired group id=" << state.group);
-        return true;
-    }
-    return state.owner == consumer;
-}
-
-bool MessageGroupManager::nextBrowsableMessage( Consumer::shared_ptr& c, QueuedMessage& next )
-{
-    // browse: allow access to any available msg, regardless of group ownership (?ok?)
-    return messages.browse(c->getPosition(), next, false);
 }
 
 void MessageGroupManager::query(qpid::types::Variant::Map& status) const
@@ -296,11 +272,9 @@ void MessageGroupManager::query(qpid::ty
         // set the timestamp to the arrival timestamp of the oldest (HEAD) message, if present
         info[GROUP_TIMESTAMP] = 0;
         if (g->second.members.size() != 0) {
-            QueuedMessage qm;
-            if (messages.find(g->second.members.front().position, qm) &&
-                qm.payload &&
-                qm.payload->hasProperties<framing::DeliveryProperties>()) {
-                info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp();
+            Message* m = messages.find(g->second.members.front().position, 0);
+            if (m && m->getTimestamp()) {
+                info[GROUP_TIMESTAMP] = m->getTimestamp();
             }
         }
         info[GROUP_CONSUMER] = g->second.owner;
@@ -313,33 +287,13 @@ void MessageGroupManager::query(qpid::ty
 
 boost::shared_ptr<MessageGroupManager> MessageGroupManager::create( const std::string& qName,
                                                                     Messages& messages,
-                                                                    const qpid::framing::FieldTable& settings )
+                                                                    const QueueSettings& settings )
 {
-    boost::shared_ptr<MessageGroupManager> empty;
-
-    if (settings.isSet(qpidMessageGroupKey)) {
-
-        // @todo: remove once "sticky" consumers are supported - see QPID-3347
-        if (!settings.isSet(qpidSharedGroup)) {
-            QPID_LOG( error, "Only shared groups are supported in this version of the broker. Use '--shared-groups' in qpid-config." );
-            return empty;
-        }
-
-        std::string headerKey = settings.getAsString(qpidMessageGroupKey);
-        if (headerKey.empty()) {
-            QPID_LOG( error, "A Message Group header key must be configured, queue=" << qName);
-            return empty;
-        }
-        unsigned int timestamp = settings.getAsInt(qpidMessageGroupTimestamp);
-
-        boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( headerKey, qName, messages, timestamp ) );
-
-        QPID_LOG( debug, "Configured Queue '" << qName <<
-                  "' for message grouping using header key '" << headerKey << "'" <<
-                  " (timestamp=" << timestamp << ")");
-        return manager;
-    }
-    return empty;
+    boost::shared_ptr<MessageGroupManager> manager( new MessageGroupManager( settings.groupKey, qName, messages, settings.addTimestamp ) );
+    QPID_LOG( debug, "Configured Queue '" << qName <<
+              "' for message grouping using header key '" << settings.groupKey << "'" <<
+              " (timestamp=" << settings.addTimestamp << ")");
+    return manager;
 }
 
 std::string MessageGroupManager::defaultGroupId;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageGroupManager.h Fri Aug 10 12:04:27 2012
@@ -24,8 +24,10 @@
 
 /* for managing message grouping on Queues */
 
+#include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/broker/MessageDistributor.h"
+#include "qpid/framing/SequenceNumber.h"
 #include "qpid/sys/unordered_map.h"
 
 #include <deque>
@@ -34,6 +36,7 @@ namespace qpid {
 namespace broker {
 
 class QueueObserver;
+struct QueueSettings;
 class MessageDistributor;
 class Messages;
 
@@ -76,11 +79,7 @@ class MessageGroupManager : public State
     GroupFifo freeGroups;   // ordered by oldest free msg
     //Consumers consumers;    // index: consumer name
 
-    static const std::string qpidMessageGroupKey;
-    static const std::string qpidSharedGroup;   // if specified, one group can be consumed by multiple receivers
-    static const std::string qpidMessageGroupTimestamp;
-
-    GroupState& findGroup( const QueuedMessage& qm );
+    GroupState& findGroup( const Message& m );
     unsigned long hits, misses; // for debug
     uint32_t lastMsg;
     std::string lastGroup;
@@ -91,11 +90,14 @@ class MessageGroupManager : public State
     void disown( GroupState& state );
 
  public:
+    static const std::string qpidMessageGroupKey;
+    static const std::string qpidSharedGroup;   // if specified, one group can be consumed by multiple receivers
+    static const std::string qpidMessageGroupTimestamp;
 
     static QPID_BROKER_EXTERN void setDefaults(const std::string& groupId);
     static boost::shared_ptr<MessageGroupManager> create( const std::string& qName,
                                                           Messages& messages,
-                                                          const qpid::framing::FieldTable& settings );
+                                                          const QueueSettings& settings );
 
     MessageGroupManager(const std::string& header, const std::string& _qName,
                         Messages& container, unsigned int _timestamp=0 )
@@ -106,22 +108,20 @@ class MessageGroupManager : public State
     virtual ~MessageGroupManager();
 
     // QueueObserver iface
-    void enqueued( const QueuedMessage& qm );
-    void acquired( const QueuedMessage& qm );
-    void requeued( const QueuedMessage& qm );
-    void dequeued( const QueuedMessage& qm );
+    void enqueued( const Message& qm );
+    void acquired( const Message& qm );
+    void requeued( const Message& qm );
+    void dequeued( const Message& qm );
     void consumerAdded( const Consumer& ) {};
     void consumerRemoved( const Consumer& ) {};
     void getState(qpid::framing::FieldTable& state ) const;
     void setState(const qpid::framing::FieldTable&);
 
     // MessageDistributor iface
-    bool nextConsumableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
-    bool allocate(const std::string& c, const QueuedMessage& qm);
-    bool nextBrowsableMessage(Consumer::shared_ptr& c, QueuedMessage& next);
+    bool acquire(const std::string& c, Message& );
     void query(qpid::types::Variant::Map&) const;
 
-    bool match(const qpid::types::Variant::Map*, const QueuedMessage&) const;
+    bool match(const qpid::types::Variant::Map*, const Message&) const;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.cpp Fri Aug 10 12:04:27 2012
@@ -19,7 +19,8 @@
  *
  */
 #include "qpid/broker/MessageMap.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/QueueCursor.h"
 #include "qpid/log/Statement.h"
 #include <algorithm>
 
@@ -29,29 +30,17 @@ namespace {
 const std::string EMPTY;
 }
 
-bool MessageMap::deleted(const QueuedMessage& message)
-{
-    Ordering::iterator i = messages.find(message.position);
-    if (i != messages.end()) {
-        erase(i);
-        return true;
-    } else {
-        return false;
-    }
-}
 
-std::string MessageMap::getKey(const QueuedMessage& message)
+std::string MessageMap::getKey(const Message& message)
 {
-    const framing::FieldTable* ft = message.payload->getApplicationHeaders();
-    if (ft) return ft->getAsString(key);
-    else return EMPTY;
+    return message.getPropertyAsString(key);
 }
 
 size_t MessageMap::size()
 {
     size_t count(0);
     for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->second.status == QueuedMessage::AVAILABLE) ++count;
+        if (i->second.getState() == AVAILABLE) ++count;
     }
     return count;
 }
@@ -61,116 +50,103 @@ bool MessageMap::empty()
     return size() == 0;//TODO: more efficient implementation
 }
 
-void MessageMap::release(const QueuedMessage& message)
-{
-    Ordering::iterator i = messages.find(message.position);
-    if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) {
-        i->second.status = QueuedMessage::AVAILABLE;
-    }
-}
-
-bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
+bool MessageMap::deleted(const QueueCursor& cursor)
 {
-    Ordering::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
-        i->second.status = QueuedMessage::ACQUIRED;
-        message = i->second;
+    Ordering::iterator i = messages.find(cursor.position);
+    if (i != messages.end()) {
+        erase(i);
         return true;
     } else {
         return false;
     }
 }
 
-bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message)
+Message* MessageMap::find(const QueueCursor& cursor)
 {
-    Ordering::iterator i = messages.find(position);
-    if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) {
-        message = i->second;
-        return true;
-    } else {
-        return false;
-    }
+    if (cursor.valid) return find(cursor.position, 0);
+    else return 0;
 }
 
-bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired)
+Message* MessageMap::find(const framing::SequenceNumber& position, QueueCursor* cursor)
 {
-    Ordering::iterator i = messages.lower_bound(position+1);
-    if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE  || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) {
-        message = i->second;
-        return true;
+    Ordering::iterator i = messages.lower_bound(position);
+    if (i != messages.end()) {
+        if (cursor) cursor->setPosition(i->first, version);
+        if (i->first == position) return &(i->second);
+        else return 0;
     } else {
-        return false;
+        //there is no message whose sequence is greater than position,
+        //i.e. haven't got there yet
+        if (cursor) cursor->setPosition(position, version);
+        return 0;
     }
 }
 
-bool MessageMap::consume(QueuedMessage& message)
+Message* MessageMap::next(QueueCursor& cursor)
 {
-    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->second.status == QueuedMessage::AVAILABLE) {
-            i->second.status = QueuedMessage::ACQUIRED;
-            message = i->second;
-            return true;
+    Ordering::iterator i;
+    if (!cursor.valid) i = messages.begin(); //start with oldest message
+    else i = messages.upper_bound(cursor.position); //get first message that is greater than position
+
+    while (i != messages.end()) {
+        Message& m = i->second;
+        cursor.setPosition(m.getSequence(), version);
+        if (cursor.check(m)) {
+            return &m;
+        } else {
+            ++i;
         }
     }
-    return false;
+    return 0;
 }
 
-const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update)
+const Message& MessageMap::replace(const Message& original, const Message& update)
 {
-    messages.erase(original.position);
-    messages[update.position] = update;
-    return update;
+    messages.erase(original.getSequence());
+    std::pair<Ordering::iterator, bool> i = messages.insert(Ordering::value_type(update.getSequence(), update));
+    i.first->second.setState(AVAILABLE);
+    return i.first->second;
 }
 
-bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed)
+void MessageMap::publish(const Message& added)
+{
+    Message dummy;
+    update(added, dummy);
+}
+
+bool MessageMap::update(const Message& added, Message& removed)
 {
     std::pair<Index::iterator, bool> result = index.insert(Index::value_type(getKey(added), added));
     if (result.second) {
         //there was no previous message for this key; nothing needs to
         //be removed, just add the message into its correct position
-        QueuedMessage& a = messages[added.position];
-        a = added;
-        a.status = QueuedMessage::AVAILABLE;
-        QPID_LOG(debug, "Added message " << a);
+        messages.insert(Ordering::value_type(added.getSequence(), added)).first->second.setState(AVAILABLE);
         return false;
     } else {
         //there is already a message with that key which needs to be replaced
         removed = result.first->second;
         result.first->second = replace(result.first->second, added);
-        result.first->second.status = QueuedMessage::AVAILABLE;
-        QPID_LOG(debug, "Displaced message " << removed << " with " << result.first->second << ": " << result.first->first);
+        result.first->second.setState(AVAILABLE);
+        QPID_LOG(debug, "Displaced message at " << removed.getSequence() << " with " << result.first->second.getSequence() << ": " << result.first->first);
         return true;
     }
 }
 
-void MessageMap::setPosition(const framing::SequenceNumber& seq) {
-    // Nothing to do, just assert that the precondition is respected and there
-    // are no undeleted messages after seq.
-    (void) seq; assert(messages.empty() || (--messages.end())->first <= seq);
-}
-
-void MessageMap::foreach(Functor f)
+Message* MessageMap::release(const QueueCursor& cursor)
 {
-    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
-        if (i->second.status == QueuedMessage::AVAILABLE) f(i->second);
+    Ordering::iterator i = messages.find(cursor.position);
+    if (i != messages.end()) {
+        i->second.setState(AVAILABLE);
+        return &i->second;
+    } else {
+        return 0;
     }
 }
 
-void MessageMap::removeIf(Predicate p)
+void MessageMap::foreach(Functor f)
 {
-    for (Ordering::iterator i = messages.begin(); i != messages.end();) {
-        if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) {
-            index.erase(getKey(i->second));
-            //Note: Removing from messages means that the subsequent
-            //call to deleted() for the same message will return
-            //false. At present that is not a problem. If this were
-            //changed to hold onto the message until dequeued
-            //(e.g. with REMOVED state), then the erase() below would
-            //need to take that into account.
-            messages.erase(i++);
-        } else {
-            ++i;
-        }
+    for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) {
+        if (i->second.getState() == AVAILABLE) f(i->second);
     }
 }
 
@@ -180,6 +156,6 @@ void MessageMap::erase(Ordering::iterato
     messages.erase(i);
 }
 
-MessageMap::MessageMap(const std::string& k) : key(k) {}
+MessageMap::MessageMap(const std::string& k) : key(k), version(0) {}
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageMap.h Fri Aug 10 12:04:27 2012
@@ -6,7 +6,7 @@
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
-o * regarding copyright ownership.  The ASF licenses this file
+ * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
@@ -22,6 +22,7 @@ o * regarding copyright ownership.  The 
  *
  */
 #include "qpid/broker/Messages.h"
+#include "qpid/broker/Message.h"
 #include "qpid/framing/SequenceNumber.h"
 #include <map>
 #include <string>
@@ -38,32 +39,31 @@ class MessageMap : public Messages
 {
   public:
     MessageMap(const std::string& key);
-    virtual ~MessageMap() {}
 
     size_t size();
     bool empty();
 
-    virtual bool deleted(const QueuedMessage&);
-    void release(const QueuedMessage&);
-    virtual bool acquire(const framing::SequenceNumber&, QueuedMessage&);
-    bool find(const framing::SequenceNumber&, QueuedMessage&);
-    virtual bool browse(const framing::SequenceNumber&, QueuedMessage&, bool);
-    bool consume(QueuedMessage&);
-    virtual bool push(const QueuedMessage& added, QueuedMessage& removed);
-    void setPosition(const framing::SequenceNumber&);
+    bool deleted(const QueueCursor&);
+    void publish(const Message& added);//use update instead to get replaced message
+    Message* next(QueueCursor&);
+    Message* release(const QueueCursor& cursor);
+    Message* find(const QueueCursor&);
+    Message* find(const framing::SequenceNumber&, QueueCursor*);
 
     void foreach(Functor);
-    virtual void removeIf(Predicate);
+
+    bool update(const Message& added, Message& removed);
 
   protected:
-    typedef std::map<std::string, QueuedMessage> Index;
-    typedef std::map<framing::SequenceNumber, QueuedMessage> Ordering;
+    typedef std::map<std::string, Message> Index;
+    typedef std::map<framing::SequenceNumber, Message> Ordering;
     const std::string key;
     Index index;
     Ordering messages;
+    int32_t version;
 
-    std::string getKey(const QueuedMessage&);
-    virtual const QueuedMessage& replace(const QueuedMessage&, const QueuedMessage&);
+    std::string getKey(const Message&);
+    virtual const Message& replace(const Message&, const Message&);
     void erase(Ordering::iterator);
 };
 }} // namespace qpid::broker



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org