You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/13 19:29:18 UTC
svn commit: r575377 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/
cpp/src/qpid/client/ cpp/src/qpid/framing/ cpp/src/tests/ python/qpid/
Author: gsim
Date: Thu Sep 13 10:29:16 2007
New Revision: 575377
URL: http://svn.apache.org/viewvc?rev=575377&view=rev
Log:
Use frameset begin/end flags for determining frameset boundaries.
Set frameset & segment begin/end flags for content bearing methods (i.e. messages).
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
incubator/qpid/trunk/qpid/python/qpid/connection.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Thu Sep 13 10:29:16 2007
@@ -126,6 +126,7 @@
void Queue::requestDispatch(Consumer* c, bool sync){
if (!c || c->preAcquires()) {
if (sync) {
+ Mutex::ScopedLock locker(messageLock);
dispatch();
} else {
serializer.execute(dispatchCallback);
@@ -153,7 +154,9 @@
int start = next;
while(c){
next++;
- if(c->deliver(msg)) return true;
+ if(c->deliver(msg)) {
+ return true;
+ }
next = next % acquirers.size();
c = next == start ? 0 : acquirers[next];
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Thu Sep 13 10:29:16 2007
@@ -30,12 +30,15 @@
Queue::shared_ptr _queue,
const string _consumerTag,
const DeliveryId _id,
- bool _acquired) : msg(_msg),
- queue(_queue),
- consumerTag(_consumerTag),
- id(_id),
- acquired(_acquired),
- pull(false){}
+ bool _acquired, bool _confirmed) : msg(_msg),
+ queue(_queue),
+ consumerTag(_consumerTag),
+ id(_id),
+ acquired(_acquired),
+ confirmed(_confirmed),
+ pull(false)
+{
+}
DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
Queue::shared_ptr _queue,
@@ -44,11 +47,12 @@
consumerTag(""),
id(_id),
acquired(true),
+ confirmed(false),
pull(true){}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
- if (acquired) {
+ if (acquired && !confirmed) {
queue->dequeue(ctxt, msg.payload);
}
}
@@ -70,24 +74,30 @@
}
void DeliveryRecord::redeliver(Session* const session) const{
- if(pull){
- //if message was originally sent as response to get, we must requeue it
- requeue();
- }else{
- session->deliver(msg.payload, consumerTag, id);
+ if (!confirmed) {
+ if(pull){
+ //if message was originally sent as response to get, we must requeue it
+ requeue();
+ }else{
+ session->deliver(msg.payload, consumerTag, id);
+ }
}
}
void DeliveryRecord::requeue() const
{
- msg.payload->redeliver();
- queue->requeue(msg);
+ if (!confirmed) {
+ msg.payload->redeliver();
+ queue->requeue(msg);
+ }
}
void DeliveryRecord::release()
{
- queue->requeue(msg);
- acquired = false;
+ if (!confirmed) {
+ queue->requeue(msg);
+ acquired = false;
+ }
}
void DeliveryRecord::reject()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu Sep 13 10:29:16 2007
@@ -45,11 +45,12 @@
const std::string consumerTag;
const DeliveryId id;
bool acquired;
+ const bool confirmed;
const bool pull;
public:
DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag,
- const DeliveryId id, bool acquired);
+ const DeliveryId id, bool acquired, bool confirmed = false);
DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
void dequeue(TransactionContext* ctxt = 0) const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Sep 13 10:29:16 2007
@@ -144,7 +144,7 @@
if (isContentReleased()) {
//load content from store in chunks of maxContentSize
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
- uint64_t expectedSize(frames.getHeaders()->getContentLength());//TODO: how do we know how much data to load?
+ uint64_t expectedSize(frames.getHeaders()->getContentLength());
for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
{
uint64_t remaining = expectedSize - offset;
@@ -153,11 +153,22 @@
store->loadContent(*this, data, offset,
remaining > maxContentSize ? maxContentSize : remaining);
+ frame.setBof(false);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
+ if (remaining) {
+ frame.setEos(false);
+ frame.setEof(false);
+ }
out.handle(frame);
}
} else {
- SendContent f(out, channel, maxFrameSize);
+ Count c;
+ frames.map_if(c, TypeFilter(CONTENT_BODY));
+
+ SendContent f(out, channel, maxFrameSize, c.getCount());
frames.map_if(f, TypeFilter(CONTENT_BODY));
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp Thu Sep 13 10:29:16 2007
@@ -39,7 +39,7 @@
struct BaseToken : DeliveryToken
{
virtual ~BaseToken() {}
- virtual void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id) = 0;
+ virtual AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) = 0;
};
struct BasicGetToken : BaseToken
@@ -50,12 +50,11 @@
BasicGetToken(Queue::shared_ptr q) : queue(q) {}
- void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
{
- channel.send(BasicGetOkBody(
- channel.getVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
+ return AMQFrame(0, BasicGetOkBody(
+ ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
msg->getRoutingKey(), queue->getMessageCount()));
-
}
};
@@ -67,10 +66,10 @@
BasicConsumeToken(const string c) : consumer(c) {}
- void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId id)
+ AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
{
- channel.send(BasicDeliverBody(
- channel.getVersion(), consumer, id.getValue(),
+ return AMQFrame(0, BasicDeliverBody(
+ ProtocolVersion(), consumer, id.getValue(),
msg->getRedelivered(), msg->getExchangeName(), msg->getRoutingKey()));
}
@@ -85,16 +84,13 @@
MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) :
destination(d), confirmMode(c), acquireMode(a) {}
- void sendMethod(Message::shared_ptr msg, ChannelAdapter& channel, DeliveryId /*id*/)
+ AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId /*id*/)
{
- //TODO; need to figure out how the acquire mode gets
- //communicated (this is just a temporary solution)
- channel.send(MessageTransferBody(channel.getVersion(), 0, destination, confirmMode, acquireMode));
-
//may need to set the redelivered flag:
if (msg->getRedelivered()){
msg->getProperties<DeliveryProperties>()->setRedelivered(true);
}
+ return AMQFrame(0, MessageTransferBody(ProtocolVersion(), 0, destination, confirmMode, acquireMode));
}
};
@@ -127,11 +123,15 @@
//another may well have the wrong headers; however we will only
//have one content class for 0-10 proper
+ FrameHandler& handler = channel.getHandlers().out;
+
//send method
boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
- t->sendMethod(msg, channel, id);
+ AMQFrame method = t->sendMethod(msg, id);
+ method.setEof(false);
+ method.setChannel(channel.getId());
+ handler.handle(method);
- FrameHandler& handler = channel.getHandlers().out;
msg->sendHeader(handler, channel.getId(), framesize);
msg->sendContent(handler, channel.getId(), framesize);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Thu Sep 13 10:29:16 2007
@@ -172,10 +172,11 @@
DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
{
Mutex::ScopedLock l(outLock);
- SequenceNumber copy(outgoing.hwm);
- ++copy;
- MessageDelivery::deliver(msg, *this, copy.getValue(), token, connection.getFrameMax());
- return outgoing.hwm.getValue();
+ //SequenceNumber copy(outgoing.hwm);
+ //++copy;
+ MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax());
+ return outgoing.hwm;
+ //return outgoing.hwm.getValue();
}
void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Thu Sep 13 10:29:16 2007
@@ -268,8 +268,8 @@
DeliveryId deliveryTag =
parent->deliveryAdapter->deliver(msg.payload, token);
- if (ackExpected) {
- parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire));
+ if (windowing || ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, deliveryTag, acquire, !ackExpected));
}
}
return !blocked;
@@ -565,12 +565,14 @@
ack_iterator start = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
ack_iterator end = start;
- if (first == last) {
- //just acked single element (move end past it)
- ++end;
- } else {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ if (start != unacked.end()) {
+ if (first == last) {
+ //just acked single element (move end past it)
+ ++end;
+ } else {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ }
}
return AckRange(start, end);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckMode.h Thu Sep 13 10:29:16 2007
@@ -1,72 +1,25 @@
#ifndef _client_AckMode_h
#define _client_AckMode_h
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-namespace qpid {
-namespace client {
-
-/**
- * The available acknowledgements modes.
- *
- * \ingroup clientapi
- */
-enum AckMode {
- /** No acknowledgement will be sent, broker can
- discard messages as soon as they are delivered
- to a consumer using this mode. **/
- NO_ACK = 0,
- /** Each message will be automatically
- acknowledged as soon as it is delivered to the
- application **/
- AUTO_ACK = 1,
- /** Acknowledgements will be sent automatically,
- but not for each message. **/
- LAZY_ACK = 2,
- /** The application is responsible for explicitly
- acknowledging messages. **/
- CLIENT_ACK = 3
-};
-
-}} // namespace qpid::client
-
-
-
-#endif /*!_client_AckMode_h*/
-#ifndef _client_AckMode_h
-#define _client_AckMode_h
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
@@ -97,6 +50,4 @@
}} // namespace qpid::client
-
-
-#endif /*!_client_AckMode_h*/
+#endif
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ChannelHandler.cpp Thu Sep 13 10:29:16 2007
@@ -58,7 +58,7 @@
if (body->getMethod())
handleMethod(body->getMethod());
else
- throw new ConnectionException(504, "Channel not open.");
+ throw ConnectionException(504, "Channel not open for content.");
}
}
@@ -68,7 +68,7 @@
frame.setChannel(id);
out(frame);
} else if (getState() == CLOSED) {
- throw Exception("Channel not open");
+ throw Exception(QPID_MSG("Channel not open, can't send " << frame));
} else if (getState() == CLOSED_BY_PEER) {
throw ChannelException(code, text);
}
@@ -120,7 +120,7 @@
} //else just ignore it
break;
case CLOSED:
- throw ConnectionException(504, "Channel not opened.");
+ throw ConnectionException(504, "Channel is closed.");
default:
throw Exception("Unexpected state encountered in ChannelHandler!");
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Thu Sep 13 10:29:16 2007
@@ -69,7 +69,7 @@
void Channel::open(const Session& s)
{
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
active = true;
@@ -80,7 +80,7 @@
}
bool Channel::isOpen() const {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
return active;
}
@@ -146,7 +146,7 @@
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
+ c.count = 0;
}
uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
ScopedSync s(session, synch);
@@ -205,7 +205,7 @@
{
session.close();
{
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
active = false;
}
stop();
@@ -231,20 +231,18 @@
void Channel::dispatch(FrameSet& content, const std::string& destination)
{
- MessageListener* listener(0);
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(destination);
- if (i != consumers.end()) {
- Message msg;
- msg.populate(content);
- listener = i->second.listener;
- }
- }
- if (listener) {
+ ConsumerMap::iterator i = consumers.find(destination);
+ if (i != consumers.end()) {
Message msg;
msg.populate(content);
+ MessageListener* listener = i->second.listener;
listener->received(msg);
+ if (isOpen() && i->second.ackMode != CLIENT_ACK) {
+ bool send = i->second.ackMode == AUTO_ACK
+ || (prefetch && ++(i->second.count) > (prefetch / 2));
+ if (send) i->second.count = 0;
+ session.execution().completed(content.getId(), true, send);
+ }
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Thu Sep 13 10:29:16 2007
@@ -63,8 +63,7 @@
struct Consumer{
MessageListener* listener;
AckMode ackMode;
- int count;
- u_int64_t lastDeliveryTag;
+ uint32_t count;
};
typedef std::map<std::string, Consumer> ConsumerMap;
@@ -75,7 +74,7 @@
const bool transactional;
framing::ProtocolVersion version;
- sys::Mutex stopLock;
+ mutable sys::Mutex stopLock;
bool running;
ConsumerMap consumers;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.cpp Thu Sep 13 10:29:16 2007
@@ -25,14 +25,15 @@
using namespace qpid::framing;
using namespace boost;
-void Correlator::receive(AMQMethodBody* response)
+bool Correlator::receive(const AMQMethodBody* response)
{
if (listeners.empty()) {
- throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name
+ return false;
} else {
Listener l = listeners.front();
if (l) l(response);
listeners.pop();
+ return true;
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Correlator.h Thu Sep 13 10:29:16 2007
@@ -36,9 +36,9 @@
class Correlator
{
public:
- typedef boost::function<void(framing::AMQMethodBody*)> Listener;
+ typedef boost::function<void(const framing::AMQMethodBody*)> Listener;
- void receive(framing::AMQMethodBody*);
+ bool receive(const framing::AMQMethodBody*);
void listen(Listener l);
private:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Thu Sep 13 10:29:16 2007
@@ -62,19 +62,16 @@
{
AMQBody* body = frame.getBody();
if (!invoke(body, this)) {
- if (isContentFrame(frame)) {
- if (!arriving) {
- arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
- }
- arriving->append(frame);
- if (arriving->isComplete()) {
+ if (!arriving) {
+ arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
+ }
+ arriving->append(frame);
+ if (arriving->isComplete()) {
+ if (arriving->isContentBearing() || !correlation.receive(arriving->getMethod())) {
demux.handle(arriving);
- arriving.reset();
- }
- } else {
- ++incoming.hwm;
- correlation.receive(body->getMethod());
- }
+ }
+ arriving.reset();
+ }
}
}
@@ -168,11 +165,19 @@
SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l)
{
+ return send(command, l, false);
+}
+
+SequenceNumber ExecutionHandler::send(const AMQBody& command, CompletionTracker::ResultListener l, bool hasContent)
+{
SequenceNumber id = ++outgoing.hwm;
if(l) {
completion.listenForResult(id, l);
}
AMQFrame frame(0/*channel will be filled in be channel handler*/, command);
+ if (hasContent) {
+ frame.setEof(false);
+ }
out(frame);
return id;
}
@@ -180,7 +185,7 @@
SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodContent& content,
CompletionTracker::ResultListener l)
{
- SequenceNumber id = send(command, l);
+ SequenceNumber id = send(command, l, true);
sendContent(content);
return id;
}
@@ -188,14 +193,16 @@
void ExecutionHandler::sendContent(const MethodContent& content)
{
AMQFrame header(0, content.getHeader());
- out(header);
-
+ header.setBof(false);
u_int64_t data_length = content.getData().length();
if(data_length > 0){
+ header.setEof(false);
+ out(header);
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
AMQFrame frame(0, AMQContentBody(content.getData()));
+ frame.setBof(false);
out(frame);
}else{
u_int32_t offset = 0;
@@ -204,10 +211,20 @@
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(content.getData().substr(offset, length));
AMQFrame frame(0, AMQContentBody(frag));
- out(frame);
+ frame.setBof(false);
+ if (offset > 0) {
+ frame.setBos(false);
+ }
offset += length;
remaining = data_length - offset;
+ if (remaining) {
+ frame.setEos(false);
+ frame.setEof(false);
+ }
+ out(frame);
}
}
+ } else {
+ out(header);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Thu Sep 13 10:29:16 2007
@@ -59,6 +59,7 @@
void sendCompletion();
+ framing::SequenceNumber send(const framing::AMQBody&, CompletionTracker::ResultListener, bool hasContent);
void sendContent(const framing::MethodContent&);
public:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp Thu Sep 13 10:29:16 2007
@@ -35,7 +35,7 @@
return response.get();
}
-void FutureResponse::received(AMQMethodBody* r)
+void FutureResponse::received(const AMQMethodBody* r)
{
Monitor::ScopedLock l(lock);
response = *r;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.h Thu Sep 13 10:29:16 2007
@@ -36,7 +36,7 @@
framing::MethodHolder response;
public:
framing::AMQMethodBody* getResponse(SessionCore& session);
- void received(framing::AMQMethodBody* response);
+ void received(const framing::AMQMethodBody* response);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Thu Sep 13 10:29:16 2007
@@ -156,7 +156,9 @@
std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
{
- return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody()
+ return out << "Frame["
+ //<< "B=" << f.getBof() << "E=" << f.getEof() << "b=" << f.getBos() << "e=" << f.getEos() << "; "
+ << "channel=" << f.getChannel() << "; " << *f.getBody()
<< "]";
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Thu Sep 13 10:29:16 2007
@@ -74,6 +74,17 @@
void encode(Buffer& buffer) const;
bool decode(Buffer& buffer);
uint32_t size() const;
+
+ bool getBof() const { return bof; }
+ void setBof(bool isBof) { bof = isBof; }
+ bool getEof() const { return eof; }
+ void setEof(bool isEof) { eof = isEof; }
+
+ bool getBos() const { return bos; }
+ void setBos(bool isBos) { bos = isBos; }
+ bool getEos() const { return eos; }
+ void setEos(bool isEos) { eos = isEos; }
+
static uint32_t frameOverhead();
private:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Thu Sep 13 10:29:16 2007
@@ -38,20 +38,13 @@
bool FrameSet::isComplete() const
{
- //TODO: should eventually use the 0-10 frame header flags when available
+ return !parts.empty() && parts.back().getEof();
+}
+
+bool FrameSet::isContentBearing() const
+{
const AMQMethodBody* method = getMethod();
- if (!method) {
- return false;
- } else if (method->isContentBearing()) {
- const AMQHeaderBody* header = getHeaders();
- if (header) {
- return header->getContentLength() == getContentSize();
- } else {
- return false;
- }
- } else {
- return true;
- }
+ return method && method->isContentBearing();
}
const AMQMethodBody* FrameSet::getMethod() const
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Thu Sep 13 10:29:16 2007
@@ -50,6 +50,8 @@
void getContent(std::string&) const;
std::string getContent() const;
+ bool isContentBearing() const;
+
const AMQMethodBody* getMethod() const;
const AMQHeaderBody* getHeaders() const;
AMQHeaderBody* getHeaders();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp Thu Sep 13 10:29:16 2007
@@ -21,31 +21,47 @@
#include "SendContent.h"
-qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {}
+qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs, uint efc) : handler(h), channel(c),
+ maxFrameSize(mfs),
+ expectedFrameCount(efc), frameCount(0) {}
-void qpid::framing::SendContent::operator()(AMQFrame& f) const
+void qpid::framing::SendContent::operator()(const AMQFrame& f)
{
+ bool first = frameCount == 0;
+ bool last = ++frameCount == expectedFrameCount;
+
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
const AMQContentBody* body(f.castBody<AMQContentBody>());
if (body->size() > maxContentSize) {
uint32_t offset = 0;
for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) {
- sendFragment(*body, offset, maxContentSize);
+ sendFragment(*body, offset, maxContentSize, first && offset == 0, last && offset + maxContentSize == body->size());
offset += maxContentSize;
}
uint32_t remainder = body->size() % maxContentSize;
if (remainder) {
- sendFragment(*body, offset, remainder);
+ sendFragment(*body, offset, remainder, first && offset == 0, last);
}
} else {
AMQFrame copy(f);
+ setFlags(copy, first, last);
copy.setChannel(channel);
handler.handle(copy);
}
}
-void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const
+void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const
{
AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size)));
+ setFlags(fragment, first, last);
handler.handle(fragment);
}
+
+void qpid::framing::SendContent::setFlags(AMQFrame& f, bool first, bool last) const
+{
+ f.setBof(false);
+ f.setBos(first);
+ f.setEof(last);
+ f.setEos(last);
+}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h Thu Sep 13 10:29:16 2007
@@ -39,11 +39,14 @@
mutable FrameHandler& handler;
const uint16_t channel;
const uint16_t maxFrameSize;
+ uint expectedFrameCount;
+ uint frameCount;
- void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const;
+ void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const;
+ void setFlags(AMQFrame& f, bool first, bool last) const;
public:
- SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize);
- void operator()(AMQFrame& f) const;
+ SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize, uint frameCount);
+ void operator()(const AMQFrame& f);
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Thu Sep 13 10:29:16 2007
@@ -63,7 +63,10 @@
void TransferContent::populate(const FrameSet& frameset)
{
- header = *frameset.getHeaders();
+ const AMQHeaderBody* h = frameset.getHeaders();
+ if (h) {
+ header = *h;
+ }
frameset.getContent(data);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h Thu Sep 13 10:29:16 2007
@@ -49,6 +49,15 @@
uint64_t getSize() { return size; }
};
+class Count
+{
+ uint count;
+public:
+ Count() : count(0) {}
+ void operator()(const AMQFrame&) { count++; }
+ uint getCount() { return count; }
+};
+
class EncodeFrame
{
Buffer& buffer;
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp Thu Sep 13 10:29:16 2007
@@ -126,6 +126,10 @@
AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
AMQFrame header(0, AMQHeaderBody());
AMQFrame content(0, AMQContentBody(data));
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content.setBof(false);
header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data.size());
header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
@@ -156,6 +160,12 @@
AMQFrame header(0, AMQHeaderBody());
AMQFrame content1(0, AMQContentBody(data1));
AMQFrame content2(0, AMQContentBody(data2));
+ method.setEof(false);
+ header.setBof(false);
+ header.setEof(false);
+ content1.setBof(false);
+ content1.setEof(false);
+ content2.setBof(false);
header.castBody<AMQHeaderBody>()->get<MessageProperties>(true)->setContentLength(data1.size() + data2.size());
header.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setRoutingKey(key);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp Thu Sep 13 10:29:16 2007
@@ -114,6 +114,7 @@
channel.consume(control, "c1", &listener, AckMode(args.ackmode));
cout << "topic_listener: Consuming." << endl;
channel.run();
+ cout << "topic_listener: run returned, closing connection" << endl;
connection.close();
cout << "topic_listener: normal exit" << endl;
}
Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?rev=575377&r1=575376&r2=575377&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Thu Sep 13 10:29:16 2007
@@ -122,7 +122,13 @@
def write_0_10(self, frame):
c = self.codec
- c.encode_octet(0x0f) # TODO: currently fixed at ver=0, B=E=b=e=1
+ flags = 0
+ if frame.bof: flags |= 0x08
+ if frame.eof: flags |= 0x04
+ if frame.bos: flags |= 0x02
+ if frame.eos: flags |= 0x01
+
+ c.encode_octet(flags) # TODO: currently fixed at ver=0, B=E=b=e=1
c.encode_octet(self.spec.constants.byname[frame.type].id)
body = StringIO()
enc = codec.Codec(body, self.spec)
@@ -197,6 +203,10 @@
def init(self, args, kwargs):
self.channel = kwargs.pop("channel", 0)
self.subchannel = kwargs.pop("subchannel", 0)
+ self.bos = True
+ self.eos = True
+ self.bof = True
+ self.eof = True
def encode(self, enc): abstract
@@ -216,6 +226,7 @@
self.method = method
self.method_type = method
self.args = args
+ self.eof = not method.content
def encode(self, c):
c.encode_short(self.method.klass.id)
@@ -302,6 +313,8 @@
self.weight = weight
self.size = size
self.properties = properties
+ self.eof = size == 0
+ self.bof = False
def __getitem__(self, name):
return self.properties[name]
@@ -429,6 +442,8 @@
def __init__(self, content):
self.content = content
+ self.eof = True
+ self.bof = False
def encode(self, enc):
enc.write(self.content)