You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/11/04 20:52:50 UTC

svn commit: r711365 - in /incubator/qpid/trunk/qpid/cpp: rubygen/ rubygen/framing.0-10/ src/qpid/broker/ src/qpid/cluster/ src/qpid/framing/ src/tests/

Author: aconway
Date: Tue Nov  4 11:52:49 2008
New Revision: 711365

URL: http://svn.apache.org/viewvc?rev=711365&view=rev
Log:

constants.rb: generate type code constants for AMQP types. Useful with Array.

framing/Array:
 - added some std::vector-like functions & typedefs.
 - use TypeCode enums, human-readable ostream << operator.

Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
    incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb
    incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb Tue Nov  4 11:52:49 2008
@@ -165,7 +165,7 @@
   # The root <amqp> element.
   def root() @root ||=parent ? parent.root : self; end
 
- def to_s() "#<#{self.class}(#{fqname})>"; end
+  def to_s() "#<#{self.class}(#{fqname})>"; end
   def inspect() to_s; end
 
   # Text of doc child if there is one.
@@ -181,6 +181,21 @@
     return self if is_a? AmqpClass
     return parent && parent.containing_class
   end
+
+  # 0-10 array domains are missing element type information, add it here.
+  ArrayTypes={
+    "str16-array" => "str-16",
+    "amqp-host-array" => "connection.amqp-host-url",
+    "command-fragments" => "session.command-fragment",
+    "in-doubt" => "dtx.xid",
+    "tx-publish" => "str-8",
+    "queues" => "str-8"
+  }
+
+  def array_type(name)
+    return  ArrayTypes[name] if ArrayTypes[name]
+    raise "Missing ArrayType entry for " + name
+  end
   
 end
 
@@ -204,14 +219,6 @@
   amqp_child_reader :choice
 end
 
-# 0-10 array domains are missing element type information, add it here.
-ArrayTypes={
-  "str16-array" => "str-16",
-  "amqp-host-array" => "connection.amqp-host-url",
-  "command-fragments" => "session.command-fragment",
-  "in-doubt" => "dtx.xid"
-}
-
 class AmqpDomain < AmqpElement
   def initialize(xml, parent)
     super

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/cppgen.rb Tue Nov  4 11:52:49 2008
@@ -147,7 +147,7 @@
 class AmqpElement
   # convert my amqp type_ attribute to a C++ type.
   def amqp2cpp()
-    return "ArrayDomain<#{ArrayTypes[name].amqp2cpp}> " if type_=="array" 
+    return "ArrayDomain<#{array_type(name).amqp2cpp}> " if type_=="array"
     return type_.amqp2cpp
   end
 

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb Tue Nov  4 11:52:49 2008
@@ -46,6 +46,62 @@
     }
   end
 
+  def typecode_enum(t) "TYPE_CODE_#{t.name.shout}" end
+
+  def typecode_h_cpp
+    path="#{@dir}/TypeCode"
+    h_file(path) {
+      include("<iosfwd>")
+      namespace(@namespace) { 
+        scope("enum TypeCode {", "};") {
+          genl @amqp.types.map { |t| "#{typecode_enum t} = #{t.code}" if t.code }.compact.join(",\n")
+        }
+        genl <<EOS
+
+/** True if t is a valid TypeCode value */
+bool isTypeCode(uint8_t t);
+
+/** Throw exception if not a valid TypeCode */
+TypeCode typeCode(uint8_t);
+
+/**@return 0 if t is not a valid enum TypeCode value. */
+const char* typeName(TypeCode t);
+
+std::ostream& operator<<(std::ostream&, TypeCode);
+EOS
+      }
+    }
+
+    cpp_file(path) {
+      include(path);
+      include("qpid/Exception.h")
+      include("<ostream>")
+      namespace(@namespace) { 
+        scope("const char* typeName(TypeCode t) {") {
+          scope("switch (t) {") {
+            @amqp.types.each { |t| genl "case #{typecode_enum t}: return \"#{t.name}\";" if t.code }
+            genl "default: break;"
+          }
+          genl "return 0;";
+        }
+        genl <<EOS
+
+bool isTypeCode(uint8_t t) { return typeName(TypeCode(t)); }
+
+TypeCode typeCode(uint8_t t) {
+    if (!isTypeCode(t)) throw Exception(QPID_MSG("Invalid TypeCode " << t));
+    return TypeCode(t);
+}
+
+std::ostream& operator<<(std::ostream& o, TypeCode t) {
+    if (isTypeCode(t)) return o << typeName(t);
+    else return o << "Invalid TypeCode " << t;
+}
+EOS
+      }
+    }
+  end
+  
   def enum_h()
     h_file("#{@dir}/enum") {
       # Constants for enum domains.
@@ -134,6 +190,7 @@
     enum_h
     reply_exceptions_h
     reply_exceptions_cpp
+    typecode_h_cpp
   end
 end
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Nov  4 11:52:49 2008
@@ -101,11 +101,8 @@
     void readyToSend();
 
     // Used by cluster to create replica sessions.
-    template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); }
-    template <class F> void eachUnacked(F f) { semanticState.eachUnacked(f); }
-    SemanticState::ConsumerImpl& getConsumer(const string& dest) { return semanticState.find(dest); } 
+    SemanticState& getSemanticState() { return semanticState; }
     boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
-    void record(const DeliveryRecord& delivery) { semanticState.record(delivery); }
 
   private:
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Nov  4 11:52:49 2008
@@ -24,6 +24,7 @@
 
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxPublish.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AllInvoker.h"
@@ -35,6 +36,14 @@
 
 #include <boost/current_function.hpp>
 
+// FIXME aconway 2008-11-03:
+// 
+// Disproportionate amount of code here is dedicated to receiving a
+// brain-dump when joining a cluster and building initial
+// state. Should be separated out into its own classes.
+//
+
+
 namespace qpid {
 namespace cluster {
 
@@ -180,10 +189,16 @@
         delivered(mcastDecoder.frame);
 }
 
+broker::SessionState& Connection::sessionState() {
+    return *connection.getChannel(currentChannel).getSession();
+}
+
+broker::SemanticState& Connection::semanticState() {
+    return sessionState().getSemanticState();
+}
+
 void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) {
-    broker::SessionHandler& h = connection.getChannel(currentChannel);
-    broker::SessionState* s = h.getSession();
-    broker::SemanticState::ConsumerImpl& c = s->getConsumer(name);
+    broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
     c.setBlocked(blocked);
     if (notifyEnabled) c.enableNotify(); else c.disableNotify();
 }
@@ -197,9 +212,7 @@
     const SequenceSet& unknownCompleted,
     const SequenceSet& receivedIncomplete)
 {
-    broker::SessionHandler& h = connection.getChannel(currentChannel);
-    broker::SessionState* s = h.getSession();
-    s->setState(
+    sessionState().setState(
         replayStart,
         sendCommandPoint,
         sentIncomplete,
@@ -207,7 +220,7 @@
         received,
         unknownCompleted,
         receivedIncomplete);
-    QPID_LOG(debug, cluster << " received session state dump for " << s->getId());
+    QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId());
 }
     
 void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
@@ -234,6 +247,15 @@
     return self.first == cluster.getId() && self.second == 0;
 }
 
+broker::QueuedMessage Connection::getDumpMessage() {
+    // Get a message from the DUMP queue.
+    broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
+    if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue"));
+    broker::QueuedMessage m = dumpQueue->get();
+    if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
+    return m;
+}
+
 void Connection::deliveryRecord(const string& qname,
                                 const SequenceNumber& position,
                                 const string& tag,
@@ -245,15 +267,14 @@
                                 bool ended,
                                 bool windowing)
 {
-    broker::QueuedMessage m;
     broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
     if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
-    broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
-    if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue"));
-
+    broker::QueuedMessage m;
     if (!ended) {               // Has a message
-        if (acquired)           // Message at front of dump queue
+        if (acquired) {          // Message at front of dump queue
+            broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
             m = dumpQueue->get();
+        }
         else                    // Message at original position in original queue
             m = queue->find(position);
         if (!m.payload)
@@ -266,10 +287,7 @@
     if (completed) dr.complete();
     if (ended) dr.setEnded();   // Exsitance of message
 
-    broker::SessionHandler& h = connection.getChannel(currentChannel);
-    broker::SessionState* s = h.getSession();
-    assert(s);
-    s->record(dr);
+    semanticState().record(dr);
 }
 
 void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -286,7 +304,7 @@
     return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
 }
 
-    
+
 
 }} // namespace qpid::cluster
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Nov  4 11:52:49 2008
@@ -40,6 +40,13 @@
 
 namespace framing { class AMQFrame; }
 
+namespace broker {
+class SemanticState;
+class QueuedMessage;
+class TxBuffer;
+class TxAccept;
+}
+
 namespace cluster {
 class Cluster;
 
@@ -117,15 +124,17 @@
                         bool windowing);
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
-    
-  private:
-    bool catcUp;
 
+  private:
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverClose();
     void deliverDoOutput(uint32_t requested);
     void sendDoOutput();
 
+    broker::SessionState& sessionState();
+    broker::SemanticState& semanticState();
+    broker::QueuedMessage getDumpMessage();
+
     static NoOpConnectionOutputHandler discardHandler;
 
     Cluster& cluster;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Tue Nov  4 11:52:49 2008
@@ -39,10 +39,12 @@
 #include "qpid/framing/ClusterConnectionConsumerStateBody.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/TypeCode.h"
 #include "qpid/log/Statement.h"
 #include "qpid/Url.h"
 #include <boost/bind.hpp>
 
+
 namespace qpid {
 namespace cluster {
 
@@ -103,7 +105,7 @@
     // Dump exchange is used to route messages to the proper queue without modifying routing key.
     session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true);
     b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
-// Dump queue is used to transfer acquired messages that are no longer on their original queue.
+    // Dump queue is used to transfer acquired messages that are no longer on their original queue.
     session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true);
     session.sync();
     session.close();
@@ -154,7 +156,7 @@
         session.exchangeUnbind(queue, DumpClient::DUMP);
     }
 
-    void dump(const broker::QueuedMessage& message) {
+    void dumpQueuedMessage(const broker::QueuedMessage& message) {
         if (!haveLastPos || message.position - lastPos != 1)  {
             ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
             haveLastPos = true;
@@ -165,6 +167,10 @@
             framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
         sb.get()->send(transfer, message.payload->getFrames());
     }
+
+    void dumpMessage(const boost::intrusive_ptr<broker::Message>& message) {
+        dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
+    }
 };
 
 
@@ -178,7 +184,7 @@
         arg::autoDelete=q->isAutoDelete(),
         arg::arguments=q->getSettings());
     MessageDumper dumper(q->getName(), session);
-    q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1));
+    q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
     q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
 }
 
@@ -217,11 +223,14 @@
     // Re-create session state on remote connection.
 
     // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
-    ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
-    ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
+    QPID_LOG(debug, dumperId << " dumping consumers.");
+    ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
+
+    QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
+    ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
 
+    //  Adjust for command counter for message in progress, will be sent after state update.
     boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
-    //  Adjust for message in progress, will be sent after state update.
     SequenceNumber received = ss->receiverGetReceived().command;
     if (inProgress)  
         --received;
@@ -274,14 +283,22 @@
 }
     
 void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
-    assert(dr.isEnded() || dr.getMessage().payload);
+    dumpDeliveryRecordMessage(dr);
+    dumpDeliveryRecord(dr);
+}
 
-    if (!dr.isEnded() && dr.isAcquired()) {
+void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) {
+    // Dump the message associated with a dr if need be.
+    if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
         // If the message is acquired then it is no longer on the
         // dumpees queue, put it on the dump queue for dumpee to pick up.
         //
-        MessageDumper(DUMP, shadowSession).dump(dr.getMessage());
+        MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
     }
+}
+
+void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
+    // Assumes the associated message has already been dumped (if needed)
     ClusterConnectionProxy(shadowSession).deliveryRecord(
         dr.getQueue()->getName(),
         dr.getMessage().position,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Tue Nov  4 11:52:49 2008
@@ -44,6 +44,8 @@
 class QueuedMessage;
 class SessionHandler;
 class DeliveryRecord;
+class SessionState;
+class SemanticState;
 
 } // namespace broker
 
@@ -79,8 +81,9 @@
     void dumpSession(broker::SessionHandler& s);
     void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
     void dumpUnacked(const broker::DeliveryRecord&);
-    
-  private:
+    void dumpDeliveryRecord(const broker::DeliveryRecord&);
+    void dumpDeliveryRecordMessage(const broker::DeliveryRecord&);
+
     MemberId dumperId;
     MemberId dumpeeId;
     Url dumpeeUrl;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.cpp Tue Nov  4 11:52:49 2008
@@ -28,20 +28,21 @@
 namespace qpid {
 namespace framing {
 
-Array::Array() : typeOctet(0xF0/*void*/) {}
+Array::Array() : type(TYPE_CODE_VOID) {}
 
-Array::Array(uint8_t type) : typeOctet(type) {}
+Array::Array(TypeCode t) : type(t) {}
+
+Array::Array(uint8_t t) : type(typeCode(t)) {}
 
 Array::Array(const std::vector<std::string>& in)
 {
-    typeOctet = 0xA4;
+    type = TYPE_CODE_STR16;
     for (std::vector<std::string>::const_iterator i = in.begin(); i != in.end(); ++i) {
-        ValuePtr value(new StringValue(*i));
+        ValuePtr value(new Str16Value(*i));
         values.push_back(value);
     }
 } 
 
-
 uint32_t Array::encodedSize() const {
     //note: size is only included when used as a 'top level' type
     uint32_t len(4/*size*/ + 1/*type*/ + 4/*count*/);
@@ -55,18 +56,18 @@
     return values.size();
 }
 
-std::ostream& operator<<(std::ostream& out, const Array& t) {
-    out << "{";
-    for(Array::ValueVector::const_iterator i = t.values.begin(); i != t.values.end(); ++i) {
-        if (i != t.values.begin()) out << ", ";
-        out << *(i->get());
+std::ostream& operator<<(std::ostream& out, const Array& a) {
+    out << typeName(a.getType()) << "{";
+    for(Array::ValueVector::const_iterator i = a.values.begin(); i != a.values.end(); ++i) {
+        if (i != a.values.begin()) out << ", ";
+        (*i)->print(out);
     }
     return out << "}";
 }
 
 void Array::encode(Buffer& buffer) const{
     buffer.putLong(encodedSize() - 4);//size added only when array is a top-level type
-    buffer.putOctet(typeOctet);
+    buffer.putOctet(type);
     buffer.putLong(count());
     for (ValueVector::const_iterator i = values.begin(); i!=values.end(); ++i) {
     	(*i)->getData().encode(buffer);
@@ -81,11 +82,11 @@
                                             << size << " bytes but only " << available << " available"));
     }
     if (size) {
-        typeOctet = buffer.getOctet();
+        type = TypeCode(buffer.getOctet());
         uint32_t count = buffer.getLong();
         
         FieldValue dummy;
-        dummy.setType(typeOctet);
+        dummy.setType(type);
         available = buffer.available();
         if (available < count * dummy.getData().encodedSize()) {
             throw IllegalArgumentException(QPID_MSG("Not enough data for array, expected " 
@@ -95,7 +96,7 @@
         
         for (uint32_t i = 0; i < count; i++) {
             ValuePtr value(new FieldValue);
-            value->setType(typeOctet);
+            value->setType(type);
             value->getData().decode(buffer);
             values.push_back(ValuePtr(value));
         }    
@@ -104,7 +105,7 @@
 
 
 bool Array::operator==(const Array& x) const {
-    if (typeOctet != x.typeOctet) return false;
+    if (type != x.type) return false;
     if (values.size() != x.values.size()) return false;
 
     for (ValueVector::const_iterator i =  values.begin(), j = x.values.begin(); i != values.end(); ++i, ++j) {
@@ -114,12 +115,13 @@
     return true;
 }
 
-void Array::add(ValuePtr value)
-{
-    if (typeOctet != value->getType()) {
-        throw IllegalArgumentException(QPID_MSG("Wrong type of value in Array, expected " << typeOctet << " but found " << value->getType()));
+void Array::insert(iterator i, ValuePtr value) {
+    if (type != value->getType()) {
+        // FIXME aconway 2008-10-31:  put meaningful strings in this message.
+        throw Exception(QPID_MSG("Wrong type of value in Array, expected " << type
+                                 << " but found " << TypeCode(value->getType())));
     }
-    values.push_back(value);
+    values.insert(i, value);
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Array.h Tue Nov  4 11:52:49 2008
@@ -18,12 +18,12 @@
  * under the License.
  *
  */
-#include <iostream>
-#include <vector>
-#include <boost/shared_ptr.hpp>
-#include <map>
 #include "amqp_types.h"
 #include "FieldValue.h"
+#include "qpid/framing/TypeCode.h"
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <vector>
 
 #ifndef _Array_
 #define _Array_
@@ -38,6 +38,8 @@
   public:
     typedef boost::shared_ptr<FieldValue> ValuePtr;
     typedef std::vector<ValuePtr> ValueVector;
+    typedef ValueVector::const_iterator const_iterator;
+    typedef ValueVector::iterator iterator;
 
     uint32_t encodedSize() const;
     void encode(Buffer& buffer) const;
@@ -47,11 +49,30 @@
     bool operator==(const Array& other) const;
 
     Array();
+    Array(TypeCode type);
     Array(uint8_t type);
     //creates a longstr array
     Array(const std::vector<std::string>& in);
 
-    void add(ValuePtr value);
+    TypeCode getType() const { return type; }
+    
+    // std collection interface.
+    const_iterator begin() const { return values.begin(); }
+    const_iterator end() const { return values.end(); }
+    iterator begin() { return values.begin(); }
+    iterator end(){ return values.end(); }
+
+    ValuePtr front() const { return values.front(); }
+    ValuePtr back() const { return values.back(); }
+    size_t size() const { return values.size(); }
+
+    void insert(iterator i, ValuePtr value);
+    void erase(iterator i) { values.erase(i); }
+    void push_back(ValuePtr value) { values.insert(end(), value); }
+    void pop_back() { values.pop_back(); }
+    
+    // Non-std interface
+    void add(ValuePtr value) { push_back(value); }
 
     template <class T>
     void collect(std::vector<T>& out) const
@@ -60,12 +81,9 @@
             out.push_back((*i)->get<T>());
         }
     }
-    
-    ValueVector::const_iterator begin() const { return values.begin(); }
-    ValueVector::const_iterator end() const { return values.end(); }
 
   private:
-    uint8_t typeOctet;
+    TypeCode type;
     ValueVector values;
 
     friend std::ostream& operator<<(std::ostream& out, const Array& body);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp Tue Nov  4 11:52:49 2008
@@ -109,10 +109,10 @@
         *data == *v.data;
 }
 
-StringValue::StringValue(const std::string& v) :
+Str8Value::Str8Value(const std::string& v) :
     FieldValue(
-        0xA4,
-        new VariableWidthValue<4>(
+        TYPE_CODE_STR8,
+        new VariableWidthValue<1>(
             reinterpret_cast<const uint8_t*>(v.data()),
             reinterpret_cast<const uint8_t*>(v.data()+v.size())))
 {
@@ -168,4 +168,13 @@
 {
 }
 
+void FieldValue::print(std::ostream& out) const {
+    data->print(out);
+    out << TypeCode(typeOctet) << '(';
+    if (data->convertsToString()) out << data->getString();
+    else if (data->convertsToInt()) out << data->getInt();
+    else data->print(out);
+    out << ')';
+}
+
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.h Tue Nov  4 11:52:49 2008
@@ -89,7 +89,8 @@
     void decode(Buffer& buffer);
     bool operator==(const FieldValue&) const;
     bool operator!=(const FieldValue& v) const { return !(*this == v); }
-    void print(std::ostream& out) const { out << "(0x" << std::hex << int(typeOctet) << ")"; data->print(out); }
+    
+    void print(std::ostream& out) const;
     
     template <typename T> bool convertsTo() const { return false; }
     template <typename T> T get() const { throw InvalidConversionException(); }
@@ -239,12 +240,9 @@
     void print(std::ostream& o) const { o << "[" << value << "]"; };
 };
 
-/*
- * Basic string value encodes as iso-8859-15 with 32 bit length
- */ 
-class StringValue : public FieldValue {
+class Str8Value : public FieldValue {
   public:
-    StringValue(const std::string& v);
+    Str8Value(const std::string& v);
 };
 
 class Str16Value : public FieldValue {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/FieldValue.cpp Tue Nov  4 11:52:49 2008
@@ -24,17 +24,17 @@
 
 using namespace qpid::framing;
 
-StringValue s("abc");
+Str16Value s("abc");
 IntegerValue i(42);
 //DecimalValue d(1234,2);
 //FieldTableValue ft;
 //EmptyValue e;
 
-QPID_AUTO_TEST_CASE(testStringValueEquals)
+QPID_AUTO_TEST_CASE(testStr16ValueEquals)
 {
     
-    BOOST_CHECK(StringValue("abc") == s);
-    BOOST_CHECK(StringValue("foo") != s);
+    BOOST_CHECK(Str16Value("abc") == s);
+    BOOST_CHECK(Str16Value("foo") != s);
     BOOST_CHECK(s != i);
     BOOST_CHECK(s.convertsTo<std::string>() == true);
     BOOST_CHECK(s.convertsTo<int>() == false);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=711365&r1=711364&r2=711365&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Nov  4 11:52:49 2008
@@ -216,6 +216,51 @@
     uint16_t channel;
 };
 
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) {
+    ClusterFixture cluster(1, 1); // FIXME aconway 2008-11-04: local broker at index 1
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare(arg::queue="q");
+    c0.session.messageTransfer(arg::content=Message("A", "q"));
+    c0.session.messageTransfer(arg::content=Message("B", "q"));
+
+    // Start a transaction that will commit.
+    Session commitSession = c0.connection.newSession("commit");
+    SubscriptionManager commitSubs(commitSession);
+    commitSession.txSelect();
+    commitSession.messageTransfer(arg::content=Message("a", "q"));
+    commitSession.messageTransfer(arg::content=Message("b", "q"));
+    BOOST_CHECK_EQUAL(commitSubs.get("q", TIME_SEC).getData(), "A");
+
+    // Start a transaction that will roll back.
+    Session rollbackSession = c0.connection.newSession("rollback");
+    SubscriptionManager rollbackSubs(rollbackSession);
+    rollbackSession.txSelect();
+    rollbackSession.messageTransfer(arg::content=Message("1", "q"));
+    BOOST_CHECK_EQUAL(rollbackSubs.get("q", TIME_SEC).getData(), "B");
+
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+    // Add new member mid transaction.
+    cluster.add();            
+    Client c1(cluster[1], "c1");
+
+    // More transactional work
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+    rollbackSession.messageTransfer(arg::content=Message("2", "q"));
+    commitSession.messageTransfer(arg::content=Message("c", "q"));
+    rollbackSession.messageTransfer(arg::content=Message("3", "q"));
+
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);    
+    // Commit/roll back.
+    commitSession.txCommit();
+    rollbackSession.txRollback();
+    // Verify queue status: just the comitted messages
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "b");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
+}
+
 QPID_AUTO_TEST_CASE(testUnsupported) {
     ScopedSuppressLogging sl;
     ClusterFixture cluster(1);