You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/11/04 20:52:50 UTC
svn commit: r711365 - in /incubator/qpid/trunk/qpid/cpp: rubygen/
rubygen/framing.0-10/ src/qpid/broker/ src/qpid/cluster/ src/qpid/framing/
src/tests/
Author: aconway
Date: Tue Nov 4 11:52:49 2008
New Revision: 711365
URL: http://svn.apache.org/viewvc?rev=711365&view=rev
Log:
constants.rb: generate type code constants for AMQP types. Useful with Array.
framing/Array:
- added some std:::vector like functions & typedefs.
- use TypeCode enums, human readable ostream << operator.
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb
incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h
incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb Tue Nov 4 11:52:49 2008
@@ -165,7 +165,7 @@
# The root <amqp> element.
def root() @root ||=parent ? parent.root : self; end
- def to_s() "#<#{self.class}(#{fqname})>"; end
+ def to_s() "#<#{self.class}(#{fqname})>"; end
def inspect() to_s; end
# Text of doc child if there is one.
@@ -181,6 +181,21 @@
return self if is_a? AmqpClass
return parent && parent.containing_class
end
+
+ # 0-10 array domains are missing element type information, add it here.
+ ArrayTypes={
+ "str16-array" => "str-16",
+ "amqp-host-array" => "connection.amqp-host-url",
+ "command-fragments" => "session.command-fragment",
+ "in-doubt" => "dtx.xid",
+ "tx-publish" => "str-8",
+ "queues" => "str-8"
+ }
+
+ def array_type(name)
+ return ArrayTypes[name] if ArrayTypes[name]
+ raise "Missing ArrayType entry for " + name
+ end
end
@@ -204,14 +219,6 @@
amqp_child_reader :choice
end
-# 0-10 array domains are missing element type information, add it here.
-ArrayTypes={
- "str16-array" => "str-16",
- "amqp-host-array" => "connection.amqp-host-url",
- "command-fragments" => "session.command-fragment",
- "in-doubt" => "dtx.xid"
-}
-
class AmqpDomain < AmqpElement
def initialize(xml, parent)
super
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb Tue Nov 4 11:52:49 2008
@@ -147,7 +147,7 @@
class AmqpElement
# convert my amqp type_ attribute to a C++ type.
def amqp2cpp()
- return "ArrayDomain<#{ArrayTypes[name].amqp2cpp}> " if type_=="array"
+ return "ArrayDomain<#{array_type(name).amqp2cpp}> " if type_=="array"
return type_.amqp2cpp
end
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb Tue Nov 4 11:52:49 2008
@@ -46,6 +46,62 @@
}
end
+ def typecode_enum(t) "TYPE_CODE_#{t.name.shout}" end
+
+ def typecode_h_cpp
+ path="#{@dir}/TypeCode"
+ h_file(path) {
+ include("<iosfwd>")
+ namespace(@namespace) {
+ scope("enum TypeCode {", "};") {
+ genl @amqp.types.map { |t| "#{typecode_enum t} = #{t.code}" if t.code }.compact.join(",\n")
+ }
+ genl <<EOS
+
+/** True if t is a valid TypeCode value */
+bool isTypeCode(uint8_t t);
+
+/** Throw exception if not a valid TypeCode */
+TypeCode typeCode(uint8_t);
+
+/**@return 0 if t is not a valid enum TypeCode value. */
+const char* typeName(TypeCode t);
+
+std::ostream& operator<<(std::ostream&, TypeCode);
+EOS
+ }
+ }
+
+ cpp_file(path) {
+ include(path);
+ include("qpid/Exception.h")
+ include("<ostream>")
+ namespace(@namespace) {
+ scope("const char* typeName(TypeCode t) {") {
+ scope("switch (t) {") {
+ @amqp.types.each { |t| genl "case #{typecode_enum t}: return \"#{t.name}\";" if t.code }
+ genl "default: break;"
+ }
+ genl "return 0;";
+ }
+ genl <<EOS
+
+bool isTypeCode(uint8_t t) { return typeName(TypeCode(t)); }
+
+TypeCode typeCode(uint8_t t) {
+ if (!isTypeCode(t)) throw Exception(QPID_MSG("Invalid TypeCode " << t));
+ return TypeCode(t);
+}
+
+std::ostream& operator<<(std::ostream& o, TypeCode t) {
+ if (isTypeCode(t)) return o << typeName(t);
+ else return o << "Invalid TypeCode " << t;
+}
+EOS
+ }
+ }
+ end
+
def enum_h()
h_file("#{@dir}/enum") {
# Constants for enum domains.
@@ -134,6 +190,7 @@
enum_h
reply_exceptions_h
reply_exceptions_cpp
+ typecode_h_cpp
end
end
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Nov 4 11:52:49 2008
@@ -101,11 +101,8 @@
void readyToSend();
// Used by cluster to create replica sessions.
- template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); }
- template <class F> void eachUnacked(F f) { semanticState.eachUnacked(f); }
- SemanticState::ConsumerImpl& getConsumer(const string& dest) { return semanticState.find(dest); }
+ SemanticState& getSemanticState() { return semanticState; }
boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
- void record(const DeliveryRecord& delivery) { semanticState.record(delivery); }
private:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Nov 4 11:52:49 2008
@@ -24,6 +24,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxPublish.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -35,6 +36,14 @@
#include <boost/current_function.hpp>
+// FIXME aconway 2008-11-03:
+//
+// Disproportionate amount of code here is dedicated to receiving a
+// brain-dump when joining a cluster and building initial
+// state. Should be separated out into its own classes.
+//
+
+
namespace qpid {
namespace cluster {
@@ -180,10 +189,16 @@
delivered(mcastDecoder.frame);
}
+broker::SessionState& Connection::sessionState() {
+ return *connection.getChannel(currentChannel).getSession();
+}
+
+broker::SemanticState& Connection::semanticState() {
+ return sessionState().getSemanticState();
+}
+
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) {
- broker::SessionHandler& h = connection.getChannel(currentChannel);
- broker::SessionState* s = h.getSession();
- broker::SemanticState::ConsumerImpl& c = s->getConsumer(name);
+ broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
}
@@ -197,9 +212,7 @@
const SequenceSet& unknownCompleted,
const SequenceSet& receivedIncomplete)
{
- broker::SessionHandler& h = connection.getChannel(currentChannel);
- broker::SessionState* s = h.getSession();
- s->setState(
+ sessionState().setState(
replayStart,
sendCommandPoint,
sentIncomplete,
@@ -207,7 +220,7 @@
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state dump for " << s->getId());
+ QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId());
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
@@ -234,6 +247,15 @@
return self.first == cluster.getId() && self.second == 0;
}
+broker::QueuedMessage Connection::getDumpMessage() {
+ // Get a message from the DUMP queue.
+ broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
+ if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue"));
+ broker::QueuedMessage m = dumpQueue->get();
+ if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
+ return m;
+}
+
void Connection::deliveryRecord(const string& qname,
const SequenceNumber& position,
const string& tag,
@@ -245,15 +267,14 @@
bool ended,
bool windowing)
{
- broker::QueuedMessage m;
broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
- broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
- if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue"));
-
+ broker::QueuedMessage m;
if (!ended) { // Has a message
- if (acquired) // Message at front of dump queue
+ if (acquired) { // Message at front of dump queue
+ broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
m = dumpQueue->get();
+ }
else // Message at original position in original queue
m = queue->find(position);
if (!m.payload)
@@ -266,10 +287,7 @@
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
- broker::SessionHandler& h = connection.getChannel(currentChannel);
- broker::SessionState* s = h.getSession();
- assert(s);
- s->record(dr);
+ semanticState().record(dr);
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -286,7 +304,7 @@
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
-
+
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Nov 4 11:52:49 2008
@@ -40,6 +40,13 @@
namespace framing { class AMQFrame; }
+namespace broker {
+class SemanticState;
+class QueuedMessage;
+class TxBuffer;
+class TxAccept;
+}
+
namespace cluster {
class Cluster;
@@ -117,15 +124,17 @@
bool windowing);
void queuePosition(const std::string&, const framing::SequenceNumber&);
-
- private:
- bool catcUp;
+ private:
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
+ broker::SessionState& sessionState();
+ broker::SemanticState& semanticState();
+ broker::QueuedMessage getDumpMessage();
+
static NoOpConnectionOutputHandler discardHandler;
Cluster& cluster;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Tue Nov 4 11:52:49 2008
@@ -39,10 +39,12 @@
#include "qpid/framing/ClusterConnectionConsumerStateBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/TypeCode.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
+
namespace qpid {
namespace cluster {
@@ -103,7 +105,7 @@
// Dump exchange is used to route messages to the proper queue without modifying routing key.
session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true);
b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
-// Dump queue is used to transfer acquired messages that are no longer on their original queue.
+ // Dump queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true);
session.sync();
session.close();
@@ -154,7 +156,7 @@
session.exchangeUnbind(queue, DumpClient::DUMP);
}
- void dump(const broker::QueuedMessage& message) {
+ void dumpQueuedMessage(const broker::QueuedMessage& message) {
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
haveLastPos = true;
@@ -165,6 +167,10 @@
framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
sb.get()->send(transfer, message.payload->getFrames());
}
+
+ void dumpMessage(const boost::intrusive_ptr<broker::Message>& message) {
+ dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
+ }
};
@@ -178,7 +184,7 @@
arg::autoDelete=q->isAutoDelete(),
arg::arguments=q->getSettings());
MessageDumper dumper(q->getName(), session);
- q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1));
+ q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
}
@@ -217,11 +223,14 @@
// Re-create session state on remote connection.
// Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
- ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
- ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
+ QPID_LOG(debug, dumperId << " dumping consumers.");
+ ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
+
+ QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
+ ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
+ // Adjust for command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
- // Adjust for message in progress, will be sent after state update.
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
--received;
@@ -274,14 +283,22 @@
}
void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
- assert(dr.isEnded() || dr.getMessage().payload);
+ dumpDeliveryRecordMessage(dr);
+ dumpDeliveryRecord(dr);
+}
- if (!dr.isEnded() && dr.isAcquired()) {
+void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) {
+ // Dump the message associated with a dr if need be.
+ if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
// dumpees queue, put it on the dump queue for dumpee to pick up.
//
- MessageDumper(DUMP, shadowSession).dump(dr.getMessage());
+ MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
}
+}
+
+void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
+ // Assumes the associated message has already been dumped (if needed)
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Tue Nov 4 11:52:49 2008
@@ -44,6 +44,8 @@
class QueuedMessage;
class SessionHandler;
class DeliveryRecord;
+class SessionState;
+class SemanticState;
} // namespace broker
@@ -79,8 +81,9 @@
void dumpSession(broker::SessionHandler& s);
void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
void dumpUnacked(const broker::DeliveryRecord&);
-
- private:
+ void dumpDeliveryRecord(const broker::DeliveryRecord&);
+ void dumpDeliveryRecordMessage(const broker::DeliveryRecord&);
+
MemberId dumperId;
MemberId dumpeeId;
Url dumpeeUrl;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp Tue Nov 4 11:52:49 2008
@@ -28,20 +28,21 @@
namespace qpid {
namespace framing {
-Array::Array() : typeOctet(0xF0/*void*/) {}
+Array::Array() : type(TYPE_CODE_VOID) {}
-Array::Array(uint8_t type) : typeOctet(type) {}
+Array::Array(TypeCode t) : type(t) {}
+
+Array::Array(uint8_t t) : type(typeCode(t)) {}
Array::Array(const std::vector<std::string>& in)
{
- typeOctet = 0xA4;
+ type = TYPE_CODE_STR16;
for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) {
- ValuePtr value(new StringValue(*i));
+ ValuePtr value(new Str16Value(*i));
values.push_back(value);
}
}
-
uint32_t Array::encodedSize() const {
//note: size is only included when used as a 'top level' type
uint32_t len(4/*size*/ + 1/*type*/ + 4/*count*/);
@@ -55,18 +56,18 @@
return values.size();
}
-std::ostream& operator<<(std::ostream& out, const Array& t) {
- out << "{";
- for(Array::ValueVector::const_iterator i = t.values.begin(); i != t.values.end(); ++i) {
- if (i != t.values.begin()) out << ", ";
- out << *(i->get());
+std::ostream& operator<<(std::ostream& out, const Array& a) {
+ out << typeName(a.getType()) << "{";
+ for(Array::ValueVector::const_iterator i = a.values.begin(); i != a.values.end(); ++i) {
+ if (i != a.values.begin()) out << ", ";
+ (*i)->print(out);
}
return out << "}";
}
void Array::encode(Buffer& buffer) const{
buffer.putLong(encodedSize() - 4);//size added only when array is a top-level type
- buffer.putOctet(typeOctet);
+ buffer.putOctet(type);
buffer.putLong(count());
for (ValueVector::const_iterator i = values.begin(); i!=values.end(); ++i) {
(*i)->getData().encode(buffer);
@@ -81,11 +82,11 @@
<< size << " bytes but only " << available << " available"));
}
if (size) {
- typeOctet = buffer.getOctet();
+ type = TypeCode(buffer.getOctet());
uint32_t count = buffer.getLong();
FieldValue dummy;
- dummy.setType(typeOctet);
+ dummy.setType(type);
available = buffer.available();
if (available < count * dummy.getData().encodedSize()) {
throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected "
@@ -95,7 +96,7 @@
for (uint32_t i = 0; i < count; i++) {
ValuePtr value(new FieldValue);
- value->setType(typeOctet);
+ value->setType(type);
value->getData().decode(buffer);
values.push_back(ValuePtr(value));
}
@@ -104,7 +105,7 @@
bool Array::operator==(const Array& x) const {
- if (typeOctet != x.typeOctet) return false;
+ if (type != x.type) return false;
if (values.size() != x.values.size()) return false;
for (ValueVector::const_iterator i = values.begin(), j = x.values.begin(); i != values.end(); ++i, ++j) {
@@ -114,12 +115,13 @@
return true;
}
-void Array::add(ValuePtr value)
-{
- if (typeOctet != value->getType()) {
- throw IllegalArgumentException(QPID_MSG("Wrong type of value in Array, expected " << typeOctet << " but found " << value->getType()));
+void Array::insert(iterator i, ValuePtr value) {
+ if (type != value->getType()) {
+ // FIXME aconway 2008-10-31: put meaningful strings in this message.
+ throw Exception(QPID_MSG("Wrong type of value in Array, expected " << type
+ << " but found " << TypeCode(value->getType())));
}
- values.push_back(value);
+ values.insert(i, value);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h Tue Nov 4 11:52:49 2008
@@ -18,12 +18,12 @@
* under the License.
*
*/
-#include <iostream>
-#include <vector>
-#include <boost/shared_ptr.hpp>
-#include <map>
#include "amqp_types.h"
#include "FieldValue.h"
+#include "qpid/framing/TypeCode.h"
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <vector>
#ifndef _Array_
#define _Array_
@@ -38,6 +38,8 @@
public:
typedef boost::shared_ptr<FieldValue> ValuePtr;
typedef std::vector<ValuePtr> ValueVector;
+ typedef ValueVector::const_iterator const_iterator;
+ typedef ValueVector::iterator iterator;
uint32_t encodedSize() const;
void encode(Buffer& buffer) const;
@@ -47,11 +49,30 @@
bool operator==(const Array& other) const;
Array();
+ Array(TypeCode type);
Array(uint8_t type);
//creates a longstr array
Array(const std::vector<std::string>& in);
- void add(ValuePtr value);
+ TypeCode getType() const { return type; }
+
+ // std collection interface.
+ const_iterator begin() const { return values.begin(); }
+ const_iterator end() const { return values.end(); }
+ iterator begin() { return values.begin(); }
+ iterator end(){ return values.end(); }
+
+ ValuePtr front() const { return values.front(); }
+ ValuePtr back() const { return values.back(); }
+ size_t size() const { return values.size(); }
+
+ void insert(iterator i, ValuePtr value);
+ void erase(iterator i) { values.erase(i); }
+ void push_back(ValuePtr value) { values.insert(end(), value); }
+ void pop_back() { values.pop_back(); }
+
+ // Non-std interface
+ void add(ValuePtr value) { push_back(value); }
template <class T>
void collect(std::vector<T>& out) const
@@ -60,12 +81,9 @@
out.push_back((*i)->get<T>());
}
}
-
- ValueVector::const_iterator begin() const { return values.begin(); }
- ValueVector::const_iterator end() const { return values.end(); }
private:
- uint8_t typeOctet;
+ TypeCode type;
ValueVector values;
friend std::ostream& operator<<(std::ostream& out, const Array& body);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp Tue Nov 4 11:52:49 2008
@@ -109,10 +109,10 @@
*data == *v.data;
}
-StringValue::StringValue(const std::string& v) :
+Str8Value::Str8Value(const std::string& v) :
FieldValue(
- 0xA4,
- new VariableWidthValue<4>(
+ TYPE_CODE_STR8,
+ new VariableWidthValue<1>(
reinterpret_cast<const uint8_t*>(v.data()),
reinterpret_cast<const uint8_t*>(v.data()+v.size())))
{
@@ -168,4 +168,13 @@
{
}
+void FieldValue::print(std::ostream& out) const {
+ data->print(out);
+ out << TypeCode(typeOctet) << '(';
+ if (data->convertsToString()) out << data->getString();
+ else if (data->convertsToInt()) out << data->getInt();
+ else data->print(out);
+ out << ')';
+}
+
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h Tue Nov 4 11:52:49 2008
@@ -89,7 +89,8 @@
void decode(Buffer& buffer);
bool operator==(const FieldValue&) const;
bool operator!=(const FieldValue& v) const { return !(*this == v); }
- void print(std::ostream& out) const { out << "(0x" << std::hex << int(typeOctet) << ")"; data->print(out); }
+
+ void print(std::ostream& out) const;
template <typename T> bool convertsTo() const { return false; }
template <typename T> T get() const { throw InvalidConversionException(); }
@@ -239,12 +240,9 @@
void print(std::ostream& o) const { o << "[" << value << "]"; };
};
-/*
- * Basic string value encodes as iso-8859-15 with 32 bit length
- */
-class StringValue : public FieldValue {
+class Str8Value : public FieldValue {
public:
- StringValue(const std::string& v);
+ Str8Value(const std::string& v);
};
class Str16Value : public FieldValue {
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp Tue Nov 4 11:52:49 2008
@@ -24,17 +24,17 @@
using namespace qpid::framing;
-StringValue s("abc");
+Str16Value s("abc");
IntegerValue i(42);
//DecimalValue d(1234,2);
//FieldTableValue ft;
//EmptyValue e;
-QPID_AUTO_TEST_CASE(testStringValueEquals)
+QPID_AUTO_TEST_CASE(testStr16ValueEquals)
{
- BOOST_CHECK(StringValue("abc") == s);
- BOOST_CHECK(StringValue("foo") != s);
+ BOOST_CHECK(Str16Value("abc") == s);
+ BOOST_CHECK(Str16Value("foo") != s);
BOOST_CHECK(s != i);
BOOST_CHECK(s.convertsTo<std::string>() == true);
BOOST_CHECK(s.convertsTo<int>() == false);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Nov 4 11:52:49 2008
@@ -216,6 +216,51 @@
uint16_t channel;
};
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) {
+ ClusterFixture cluster(1, 1); // FIXME aconway 2008-11-04: local broker at index 1
+ Client c0(cluster[0], "c0");
+ c0.session.queueDeclare(arg::queue="q");
+ c0.session.messageTransfer(arg::content=Message("A", "q"));
+ c0.session.messageTransfer(arg::content=Message("B", "q"));
+
+ // Start a transaction that will commit.
+ Session commitSession = c0.connection.newSession("commit");
+ SubscriptionManager commitSubs(commitSession);
+ commitSession.txSelect();
+ commitSession.messageTransfer(arg::content=Message("a", "q"));
+ commitSession.messageTransfer(arg::content=Message("b", "q"));
+ BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A");
+
+ // Start a transaction that will roll back.
+ Session rollbackSession = c0.connection.newSession("rollback");
+ SubscriptionManager rollbackSubs(rollbackSession);
+ rollbackSession.txSelect();
+ rollbackSession.messageTransfer(arg::content=Message("1", "q"));
+ BOOST_CHECK_EQUAL(rollbackSubs.get("q", TIME_SEC).getData(), "B");
+
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+ // Add new member mid transaction.
+ cluster.add();
+ Client c1(cluster[1], "c1");
+
+ // More transactional work
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ rollbackSession.messageTransfer(arg::content=Message("2", "q"));
+ commitSession.messageTransfer(arg::content=Message("c", "q"));
+ rollbackSession.messageTransfer(arg::content=Message("3", "q"));
+
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+ // Commit/roll back.
+ commitSession.txCommit();
+ rollbackSession.txRollback();
+ // Verify queue status: just the comitted messages
+ BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b");
+ BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
+}
+
QPID_AUTO_TEST_CASE(testUnsupported) {
ScopedSuppressLogging sl;
ClusterFixture cluster(1);