You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/14 16:09:09 UTC
svn commit: r575689 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
tests/
Author: gsim
Date: Fri Sep 14 07:09:08 2007
New Revision: 575689
URL: http://svn.apache.org/viewvc?rev=575689&view=rev
Log:
Handle asynchronous enqueue of messages.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Sep 14 07:09:08 2007
@@ -178,6 +178,7 @@
qpid/broker/ExchangeRegistry.cpp \
qpid/broker/FanOutExchange.cpp \
qpid/broker/HeadersExchange.cpp \
+ qpid/broker/IncomingExecutionContext.cpp \
qpid/broker/Message.cpp \
qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
@@ -278,6 +279,7 @@
qpid/broker/Daemon.h \
qpid/broker/DeliveryRecord.h \
qpid/broker/HeadersExchange.h \
+ qpid/broker/IncomingExecutionContext.h \
qpid/broker/MessageStore.h \
qpid/broker/PersistableExchange.h \
qpid/broker/PersistableMessage.h \
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp?rev=575689&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp Fri Sep 14 07:09:08 2007
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "IncomingExecutionContext.h"
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace broker {
+
+using qpid::framing::AccumulatedAck;
+using qpid::framing::SequenceNumber;
+using qpid::framing::SequenceNumberSet;
+
+void IncomingExecutionContext::noop()
+{
+ complete(next());
+}
+
+void IncomingExecutionContext::flush()
+{
+ for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ) {
+ if ((*i)->isEnqueueComplete()) {
+ complete((*i)->getCommandId());
+ i = incomplete.erase(i);
+ } else {
+ i++;
+ }
+ }
+ window.lwm = completed.mark;
+}
+
+void IncomingExecutionContext::sync()
+{
+ while (completed.mark < window.hwm) {
+ wait();
+ }
+}
+
+void IncomingExecutionContext::sync(const SequenceNumber& point)
+{
+ while (!isComplete(point)) {
+ wait();
+ }
+}
+
+/**
+ * Every call to next() should be followed by either a call to
+ * complete() - in the case of commands, which are always synchronous
+ * - or track() - in the case of messages which may be asynchronously
+ * stored.
+ */
+SequenceNumber IncomingExecutionContext::next()
+{
+ return ++window.hwm;
+}
+
+void IncomingExecutionContext::complete(const SequenceNumber& command)
+{
+ completed.update(command, command);
+}
+
+void IncomingExecutionContext::track(Message::shared_ptr msg)
+{
+ if (msg->isEnqueueComplete()) {
+ complete(msg->getCommandId());
+ } else {
+ incomplete.push_back(msg);
+ }
+}
+
+bool IncomingExecutionContext::isComplete(const SequenceNumber& command)
+{
+ if (command > window.hwm) {
+ throw Exception(QPID_MSG("Bad sync request: point exceeds last command received ["
+ << command.getValue() << " > " << window.hwm.getValue() << "]"));
+ }
+
+ return completed.covers(command);
+}
+
+
+const SequenceNumber& IncomingExecutionContext::getMark()
+{
+ return completed.mark;
+}
+
+SequenceNumberSet IncomingExecutionContext::getRange()
+{
+ SequenceNumberSet range;
+ completed.collectRanges(range);
+ return range;
+}
+
+void IncomingExecutionContext::wait()
+{
+ check();
+ incomplete.front()->waitForEnqueueComplete();
+ flush();
+}
+
+/**
+ * This is a check of internal state consistency.
+ */
+void IncomingExecutionContext::check()
+{
+ if (incomplete.empty()) {
+ if (window.hwm != completed.mark) {
+ //can only happen if there is a call to next() without a
+ //corresponding call to complete() or track() - or if
+ //there is a logical error in flush() or
+ //AccumulatedAck::update()
+ throw Exception(QPID_MSG("Completion tracking error: window.hwm="
+ << window.hwm.getValue() << ", completed.mark="
+ << completed.mark.getValue()));
+ }
+ }
+}
+
+}}
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h?rev=575689&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h Fri Sep 14 07:09:08 2007
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _IncomingExecutionContext_
+#define _IncomingExecutionContext_
+
+#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "Message.h"
+
+namespace qpid {
+namespace broker {
+
+class IncomingExecutionContext
+{
+ typedef std::list<Message::shared_ptr> Messages;
+ framing::Window window;
+ framing::AccumulatedAck completed;
+ Messages incomplete;
+
+ bool isComplete(const framing::SequenceNumber& command);
+ void check();
+ void wait();
+public:
+ void noop();
+ void flush();
+ void sync();
+ void sync(const framing::SequenceNumber& point);
+ framing::SequenceNumber next();
+ void complete(const framing::SequenceNumber& command);
+ void track(Message::shared_ptr);
+
+ const framing::SequenceNumber& getMark();
+ framing::SequenceNumberSet getRange();
+
+};
+
+
+}}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Sep 14 07:09:08 2007
@@ -57,6 +57,8 @@
const ConnectionToken* getPublisher() const { return publisher; }
void setPublisher(ConnectionToken* p) { publisher = p; }
+ const framing::SequenceNumber& getCommandId() { return frames.getId(); }
+
uint64_t contentSize() const;
std::string getRoutingKey() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri Sep 14 07:09:08 2007
@@ -26,6 +26,7 @@
#include <boost/shared_ptr.hpp>
#include "Persistable.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Monitor.h"
namespace qpid {
namespace broker {
@@ -36,31 +37,23 @@
*/
class PersistableMessage : public Persistable
{
+ sys::Monitor asyncEnqueueLock;
-
- /**
- * Needs to be set false on Message construction, then
- * set once the broker has taken responsibility for the
- * message. For transient, once enqueued, for durable, once
- * stored.
- */
- bool enqueueCompleted;
-
/**
- * Counts the number of times the message has been processed
- * async - thus when it == 0 the broker knows it has ownership
- * -> an async store can increment this counter if it writes a
- * copy to each queue, and case use this counter to know when all
- * the write are complete
- */
- int asyncCounter;
+ * Tracks the number of outstanding asynchronous enqueue
+ * operations. When the message is enqueued asynchronously the
+ * count is incremented; when that enqueue completes it is
+ * decremented. Thus when it is 0, there are no outstanding
+ * enqueues.
+ */
+ int asyncEnqueueCounter;
/**
- * Needs to be set false on Message construction, then
- * set once the dequeueis complete, it gets set
- * For transient, once dequeued, for durable, once
- * dequeue record has been stored.
- */
+ * Needs to be set false on Message construction, then set
+ * once the dequeue is complete: for transient messages, once
+ * dequeued; for durable messages, once the dequeue record has
+ * been stored.
+ */
bool dequeueCompleted;
public:
@@ -73,23 +66,36 @@
virtual ~PersistableMessage() {};
- PersistableMessage():
- enqueueCompleted(false),
- asyncCounter(0),
- dequeueCompleted(false){};
+ PersistableMessage(): asyncEnqueueCounter(0), dequeueCompleted(false) {}
- inline bool isEnqueueComplete() {return enqueueCompleted;};
+ inline void waitForEnqueueComplete() {
+ sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ while (asyncEnqueueCounter > 0) {
+ asyncEnqueueLock.wait();
+ }
+ }
+
+ inline bool isEnqueueComplete() {
+ sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ return asyncEnqueueCounter == 0;
+ }
+
inline void enqueueComplete() {
- if (asyncCounter<=1) {
- asyncCounter =0;
- enqueueCompleted = true;
- }else{
- asyncCounter--;
- }
- };
- inline void enqueueAsync() {asyncCounter++;};
- inline bool isDequeueComplete() {return dequeueCompleted;};
- inline void dequeueComplete() {dequeueCompleted = true;};
+ sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ if (asyncEnqueueCounter > 0) {
+ if (--asyncEnqueueCounter == 0) {
+ asyncEnqueueLock.notify();
+ }
+ }
+ }
+
+ inline void enqueueAsync() {
+ sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ asyncEnqueueCounter++;
+ }
+
+ inline bool isDequeueComplete() { return dequeueCompleted; }
+ inline void dequeueComplete() { dequeueCompleted = true; }
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Sep 14 07:09:08 2007
@@ -97,27 +97,29 @@
}
}
-void SemanticHandler::flush()
+void SemanticHandler::sendCompletion()
{
- //flush doubles as a sync to begin with - send an execution.complete
if (isOpen()) {
+ SequenceNumber mark = incoming.getMark();
+ SequenceNumberSet range = incoming.getRange();
Mutex::ScopedLock l(outLock);
- ChannelAdapter::send(ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()));
+ ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range));
}
}
+void SemanticHandler::flush()
+{
+ incoming.flush();
+ sendCompletion();
+}
void SemanticHandler::sync()
{
- //for now, just treat as flush; will need to get more clever when we deal with async publication
- flush();
+ incoming.sync();
+ sendCompletion();
}
void SemanticHandler::noop()
{
- //Do nothing...
- //
- //is this an L3 control? or is it an L4 command?
- //if the former, of what use is it?
- //if the latter it may contain a synch request... but its odd to have it in this class
+ incoming.noop();
}
void SemanticHandler::result(uint32_t /*command*/, const std::string& /*data*/)
@@ -127,17 +129,18 @@
void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
{
- ++(incoming.lwm);
+ SequenceNumber id = incoming.next();
InvocationVisitor v(&adapter);
method->accept(v);
- //TODO: need to account for async store operations and interleaving
- ++(incoming.hwm);
+ incoming.complete(id);
if (!v.wasHandled()) {
throw ConnectionException(540, "Not implemented");
} else if (v.hasResult()) {
- ChannelAdapter::send(ExecutionResultBody(getVersion(), incoming.lwm.getValue(), v.getResult()));
+ ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult()));
}
+ //TODO: if (method->isSync()) { incoming.sync(id); sendCompletion(); }
+ //TODO: if window gets too large send unsolicited completion
}
void SemanticHandler::handleL3(framing::AMQMethodBody* method)
@@ -151,16 +154,16 @@
{
Message::shared_ptr msg(msgBuilder.getMessage());
if (!msg) {//start of frameset will be indicated by frame flags
- msgBuilder.start(++(incoming.lwm));
+ msgBuilder.start(incoming.next());
msg = msgBuilder.getMessage();
}
msgBuilder.handle(frame);
if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
msg->setPublisher(&connection);
- session.handle(msg);
+ session.handle(msg);
msgBuilder.end();
- //TODO: need to account for async store operations and interleaving
- ++(incoming.hwm);
+ incoming.track(msg);
+ //TODO: if (msg->getMethod().isSync()) { incoming.sync(msg->getCommandId()); sendCompletion(); }
}
}
@@ -172,11 +175,8 @@
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
- //SequenceNumber copy(outgoing.hwm);
- //++copy;
MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax());
return outgoing.hwm;
- //return outgoing.hwm.getValue();
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Fri Sep 14 07:09:08 2007
@@ -25,6 +25,7 @@
#include "BrokerAdapter.h"
#include "DeliveryAdapter.h"
#include "MessageBuilder.h"
+#include "IncomingExecutionContext.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/AMQP_ServerOperations.h"
@@ -53,7 +54,7 @@
Session& session;
Connection& connection;
BrokerAdapter adapter;
- framing::Window incoming;
+ IncomingExecutionContext incoming;
framing::Window outgoing;
sys::Mutex outLock;
MessageBuilder msgBuilder;
@@ -64,6 +65,8 @@
void handleL3(framing::AMQMethodBody* method);
void handleCommand(framing::AMQMethodBody* method);
void handleContent(framing::AMQFrame& frame);
+
+ void sendCompletion();
//ChannelAdapter virtual methods:
void handleMethod(framing::AMQMethodBody* method);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=575689&r1=575688&r2=575689&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Sep 14 07:09:08 2007
@@ -91,6 +91,7 @@
//Test basic delivery:
Message::shared_ptr msg1 = message("e", "A");
+ msg1->enqueueAsync();//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
@@ -107,6 +108,7 @@
void testAsyncMessageCount(){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
Message::shared_ptr msg1 = message("e", "A");
+ msg1->enqueueAsync();//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);