You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2012/12/17 12:22:58 UTC

svn commit: r1422853 [2/3] - in /qpid/branches/java-broker-config-qpid-4390: ./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/ qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf2/ qpid/cpp/bindings/qmf2/examples/cpp/ qpid...

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/generate.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/generate.py?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/generate.py (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/generate.py Mon Dec 17 11:22:49 2012
@@ -307,11 +307,22 @@ class Generator:
   def testGenLogs (self, variables):
     return variables["genLogs"]
 
+  def testInBroker (self, variables):
+    return variables['genForBroker']
+
   def genDisclaimer (self, stream, variables):
     prefix = variables["commentPrefix"]
     stream.write (prefix + " This source file was created by a code generator.\n")
     stream.write (prefix + " Please do not edit.")
 
+  def genExternClass (self, stream, variables):
+    if variables['genForBroker']:
+      stream.write("QPID_BROKER_CLASS_EXTERN")
+
+  def genExternMethod (self, stream, variables):
+    if variables['genForBroker']:
+      stream.write("QPID_BROKER_EXTERN")
+
   def fileExt (self, path):
     dot = path.rfind (".")
     if dot == -1:

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/schema.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/schema.py?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/schema.py (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/schema.py Mon Dec 17 11:22:49 2012
@@ -1476,8 +1476,11 @@ class SchemaClass:
 
   def genMethodIdDeclarations (self, stream, variables):
     number = 1
+    ext = ""
+    if variables['genForBroker']:
+        ext = "QPID_BROKER_EXTERN "
     for method in self.methods:
-      stream.write ("    QPID_BROKER_EXTERN static const uint32_t METHOD_" + method.getName().upper() +\
+      stream.write ("    " + ext + "static const uint32_t METHOD_" + method.getName().upper() +\
                     " = %d;\n" % number)
       number = number + 1
 
@@ -1520,8 +1523,12 @@ class SchemaClass:
   def genParentRefAssignment (self, stream, variables):
     for config in self.properties:
       if config.isParentRef == 1:
-        stream.write (config.getName () + \
-                      " = _parent->GetManagementObject ()->getObjectId ();")
+        if variables['genForBroker']:
+          stream.write (config.getName () + \
+                        " = _parent->GetManagementObjectShared()->getObjectId ();")
+        else:
+          stream.write (config.getName () + \
+                        " = _parent->GetManagementObject()->getObjectId ();")
         return
 
   def genSchemaMD5 (self, stream, variables):

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Class.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Class.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Class.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Class.h Mon Dec 17 11:22:49 2012
@@ -24,7 +24,9 @@
 /*MGEN:Root.Disclaimer*/
 
 #include "qpid/management/ManagementObject.h"
+/*MGEN:IF(Root.InBroker)*/
 #include "qmf/BrokerImportExport.h"
+/*MGEN:ENDIF*/
 #include <limits>
 
 namespace qpid {
@@ -36,7 +38,7 @@ namespace qpid {
 namespace qmf {
 /*MGEN:Class.OpenNamespaces*/
 
-QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
+/*MGEN:Root.ExternClass*/ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
 {
   private:
 
@@ -79,22 +81,22 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Cl
   public:
     typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr;
 
-    QPID_BROKER_EXTERN static void writeSchema(std::string& schema);
-    QPID_BROKER_EXTERN void mapEncodeValues(::qpid::types::Variant::Map& map,
+    /*MGEN:Root.ExternMethod*/ static void writeSchema(std::string& schema);
+    /*MGEN:Root.ExternMethod*/ void mapEncodeValues(::qpid::types::Variant::Map& map,
                                           bool includeProperties=true,
                                           bool includeStatistics=true);
-    QPID_BROKER_EXTERN void mapDecodeValues(const ::qpid::types::Variant::Map& map);
-    QPID_BROKER_EXTERN void doMethod(std::string&           methodName,
+    /*MGEN:Root.ExternMethod*/ void mapDecodeValues(const ::qpid::types::Variant::Map& map);
+    /*MGEN:Root.ExternMethod*/ void doMethod(std::string&           methodName,
                                    const ::qpid::types::Variant::Map& inMap,
                                    ::qpid::types::Variant::Map& outMap,
                                    const std::string& userId);
-    QPID_BROKER_EXTERN std::string getKey() const;
+    /*MGEN:Root.ExternMethod*/ std::string getKey() const;
 /*MGEN:IF(Root.GenQMFv1)*/
-    QPID_BROKER_EXTERN uint32_t writePropertiesSize() const;
-    QPID_BROKER_EXTERN void readProperties(const std::string& buf);
-    QPID_BROKER_EXTERN void writeProperties(std::string& buf) const;
-    QPID_BROKER_EXTERN void writeStatistics(std::string& buf, bool skipHeaders = false);
-    QPID_BROKER_EXTERN void doMethod(std::string& methodName,
+    /*MGEN:Root.ExternMethod*/ uint32_t writePropertiesSize() const;
+    /*MGEN:Root.ExternMethod*/ void readProperties(const std::string& buf);
+    /*MGEN:Root.ExternMethod*/ void writeProperties(std::string& buf) const;
+    /*MGEN:Root.ExternMethod*/ void writeStatistics(std::string& buf, bool skipHeaders = false);
+    /*MGEN:Root.ExternMethod*/ void doMethod(std::string& methodName,
                                    const std::string& inBuf,
                                    std::string& outBuf,
                                    const std::string& userId);
@@ -107,15 +109,15 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Cl
     bool hasInst() { return false; }
 /*MGEN:ENDIF*/
 
-    QPID_BROKER_EXTERN /*MGEN:Class.NameCap*/(
+    /*MGEN:Root.ExternMethod*/ /*MGEN:Class.NameCap*/(
         ::qpid::management::ManagementAgent* agent,
         ::qpid::management::Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/);
 
-    QPID_BROKER_EXTERN ~/*MGEN:Class.NameCap*/();
+    /*MGEN:Root.ExternMethod*/ ~/*MGEN:Class.NameCap*/();
 
     /*MGEN:Class.SetGeneralReferenceDeclaration*/
 
-    QPID_BROKER_EXTERN static void registerSelf(
+    /*MGEN:Root.ExternMethod*/ static void registerSelf(
         ::qpid::management::ManagementAgent* agent);
 
     std::string& getPackageName() const { return packageName; }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Event.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Event.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Event.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Event.h Mon Dec 17 11:22:49 2012
@@ -24,36 +24,38 @@
 /*MGEN:Root.Disclaimer*/
 
 #include "qpid/management/ManagementEvent.h"
+/*MGEN:IF(Root.InBroker)*/
 #include "qmf/BrokerImportExport.h"
+/*MGEN:ENDIF*/
 
 namespace qmf {
 /*MGEN:Event.OpenNamespaces*/
 
-QPID_BROKER_CLASS_EXTERN class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent
+/*MGEN:Root.ExternClass*/ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent
 {
   private:
     static void writeSchema (std::string& schema);
     static uint8_t md5Sum[MD5_LEN];
-    QPID_BROKER_EXTERN static std::string packageName;
-    QPID_BROKER_EXTERN static std::string eventName;
+    /*MGEN:Root.ExternMethod*/ static std::string packageName;
+    /*MGEN:Root.ExternMethod*/ static std::string eventName;
 
 /*MGEN:Event.ArgDeclarations*/
 
   public:
     writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
 
-    QPID_BROKER_EXTERN Event/*MGEN:Event.NameCap*/(/*MGEN:Event.ConstructorArgs*/);
-    QPID_BROKER_EXTERN ~Event/*MGEN:Event.NameCap*/() {};
+    /*MGEN:Root.ExternMethod*/ Event/*MGEN:Event.NameCap*/(/*MGEN:Event.ConstructorArgs*/);
+    /*MGEN:Root.ExternMethod*/ ~Event/*MGEN:Event.NameCap*/() {};
 
     static void registerSelf(::qpid::management::ManagementAgent* agent);
     std::string& getPackageName() const { return packageName; }
     std::string& getEventName() const { return eventName; }
     uint8_t* getMd5Sum() const { return md5Sum; }
     uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; }
-    QPID_BROKER_EXTERN void encode(std::string& buffer) const;
-    QPID_BROKER_EXTERN void mapEncode(::qpid::types::Variant::Map& map) const;
+    /*MGEN:Root.ExternMethod*/ void encode(std::string& buffer) const;
+    /*MGEN:Root.ExternMethod*/ void mapEncode(::qpid::types::Variant::Map& map) const;
 
-    QPID_BROKER_EXTERN static bool match(const std::string& evt, const std::string& pkg);
+    /*MGEN:Root.ExternMethod*/ static bool match(const std::string& evt, const std::string& pkg);
     static std::pair<std::string,std::string> getFullName() {
         return std::make_pair(packageName, eventName);
     }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Package.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Package.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Package.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Package.h Mon Dec 17 11:22:49 2012
@@ -24,7 +24,9 @@
 /*MGEN:Root.Disclaimer*/
 
 #include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h"
+/*MGEN:IF(Root.InBroker)*/
 #include "qmf/BrokerImportExport.h"
+/*MGEN:ENDIF*/
 
 namespace qmf {
 /*MGEN:Class.OpenNamespaces*/
@@ -32,8 +34,8 @@ namespace qmf {
 class Package
 {
   public:
-    QPID_BROKER_EXTERN Package (::qpid::management::ManagementAgent* agent);
-    QPID_BROKER_EXTERN ~Package () {}
+    /*MGEN:Root.ExternMethod*/ Package (::qpid::management::ManagementAgent* agent);
+    /*MGEN:Root.ExternMethod*/ ~Package () {}
 };
 
 }/*MGEN:Class.CloseNamespaces*/

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/CMakeLists.txt:r1411034-1415148

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/Makefile.am?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/Makefile.am Mon Dec 17 11:22:49 2012
@@ -146,7 +146,8 @@ libqpidclient_la_CXXFLAGS = $(AM_CXXFLAG
 
 qpidd_LDADD =					\
   libqpidbroker.la				\
-  libqpidcommon.la
+  libqpidcommon.la              \
+  -lboost_program_options
 
 posix_qpidd_src = posix/QpiddBroker.cpp
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/posix/QpiddBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/posix/QpiddBroker.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/posix/QpiddBroker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/posix/QpiddBroker.cpp Mon Dec 17 11:22:49 2012
@@ -144,7 +144,7 @@ struct QpiddDaemon : public Daemon {
         uint16_t port=brokerPtr->getPort(options->daemon.transport);
         ready(port);            // Notify parent.
         if (options->parent->broker.enableMgmt && (options->parent->broker.port == 0 || options->daemon.transport != TCP)) {
-            boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port);
+            boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port);
         }
         brokerPtr->run();
     }
@@ -200,7 +200,7 @@ int QpiddBroker::execute (QpiddOptions *
             uint16_t port = brokerPtr->getPort(myOptions->daemon.transport);
             cout << port << endl;
             if (options->broker.enableMgmt) {
-                boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port);
+                boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port);
             }
         }
         brokerPtr->run();

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:r1411034-1415148

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/acl:r1411034-1415148

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.cpp Mon Dec 17 11:22:49 2012
@@ -317,7 +317,7 @@ Acl::~Acl(){
     broker->getConnectionObservers().remove(connectionCounter);
 }
 
-ManagementObject::shared_ptr Acl::GetManagementObject(void) const
+ManagementObject::shared_ptr Acl::GetManagementObjectShared(void) const
 {
     return mgmtObject;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.h Mon Dec 17 11:22:49 2012
@@ -117,7 +117,7 @@ private:
     bool readAclFile(std::string& aclFile, std::string& errorText);
     Manageable::status_t lookup       (management::Args& args, std::string& text);
     Manageable::status_t lookupPublish(management::Args& args, std::string& text);
-    virtual qpid::management::ManagementObject::shared_ptr GetManagementObject(void) const;
+    virtual qpid::management::ManagementObject::shared_ptr GetManagementObjectShared(void) const;
     virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
 
 };

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.cpp Mon Dec 17 11:22:49 2012
@@ -540,5 +540,6 @@ CharSequence Decoder::readRawUuid()
 }
 
 size_t Decoder::getPosition() const { return position; }
+size_t Decoder::getSize() const { return size; }
 void Decoder::resetSize(size_t s) { size = s; }
 }} // namespace qpid::amqp

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.h Mon Dec 17 11:22:49 2012
@@ -71,6 +71,7 @@ class Decoder
     QPID_COMMON_EXTERN void advance(size_t);
     QPID_COMMON_EXTERN size_t getPosition() const;
     QPID_COMMON_EXTERN void resetSize(size_t size);
+    QPID_COMMON_EXTERN size_t getSize() const;
 
   private:
     const char* const start;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Sasl.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Sasl.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Sasl.cpp Mon Dec 17 11:22:49 2012
@@ -58,29 +58,35 @@ void Sasl::endFrame(void* frame)
 
 std::size_t Sasl::read(const char* data, size_t available)
 {
-    Decoder decoder(data, available);
-    //read frame-header
-    uint32_t frameSize = decoder.readUInt();
-    QPID_LOG(trace, "Reading SASL frame of size " << frameSize);
-    decoder.resetSize(frameSize);
-    uint8_t dataOffset = decoder.readUByte();
-    uint8_t frameType = decoder.readUByte();
-    if (frameType != 0x01) {
-        QPID_LOG(error, "Expected SASL frame; got type " << frameType);
-    }
-    uint16_t ignored = decoder.readUShort();
-    if (ignored) {
-        QPID_LOG(info, "Got non null bytes at end of SASL frame header");
-    }
+    size_t consumed = 0;
+    while (available - consumed > 4/*framesize*/) {
+        Decoder decoder(data+consumed, available-consumed);
+        //read frame-header
+        uint32_t frameSize = decoder.readUInt();
+        if (frameSize > decoder.getSize()) break;//don't have all the data for this frame yet
+
+        QPID_LOG(trace, "Reading SASL frame of size " << frameSize);
+        decoder.resetSize(frameSize);
+        uint8_t dataOffset = decoder.readUByte();
+        uint8_t frameType = decoder.readUByte();
+        if (frameType != 0x01) {
+            QPID_LOG(error, "Expected SASL frame; got type " << frameType);
+        }
+        uint16_t ignored = decoder.readUShort();
+        if (ignored) {
+            QPID_LOG(info, "Got non null bytes at end of SASL frame header");
+        }
 
-    //body is at offset 4*dataOffset from the start
-    size_t skip = dataOffset*4 - 8;
-    if (skip) {
-        QPID_LOG(info, "Offset for sasl frame was not as expected");
-        decoder.advance(skip);
+        //body is at offset 4*dataOffset from the start
+        size_t skip = dataOffset*4 - 8;
+        if (skip) {
+            QPID_LOG(info, "Offset for sasl frame was not as expected");
+            decoder.advance(skip);
+        }
+        decoder.read(*this);
+        consumed += decoder.getPosition();
     }
-    decoder.read(*this);
-    return decoder.getPosition();
+    return consumed;
 }
 
 std::size_t Sasl::write(char* data, size_t size)

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1411034-1415148

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp Mon Dec 17 11:22:49 2012
@@ -298,7 +298,7 @@ uint32_t Bridge::encodedSize() const
         + 2;              // sync
 }
 
-management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
+management::ManagementObject::shared_ptr Bridge::GetManagementObjectShared (void) const
 {
     return mgmtObject;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h Mon Dec 17 11:22:49 2012
@@ -73,7 +73,7 @@ class Bridge : public PersistableConfig,
 
     bool isDetached() const { return detached; }
 
-    management::ManagementObject::shared_ptr GetManagementObject() const;
+    management::ManagementObject::shared_ptr GetManagementObjectShared() const;
     management::Manageable::status_t ManagementMethod(uint32_t methodId,
                                                       management::Args& args,
                                                       std::string& text);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp Mon Dec 17 11:22:49 2012
@@ -234,7 +234,7 @@ Broker::Broker(const Broker::Options& co
         systemObject = System::shared_ptr(system);
 
         mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"));
-        mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
+        mgmtObject->set_systemRef(system->GetManagementObjectShared()->getObjectId());
         mgmtObject->set_port(conf.port);
         mgmtObject->set_workerThreads(conf.workerThreads);
         mgmtObject->set_connBacklog(conf.connectionBacklog);
@@ -454,7 +454,7 @@ Broker::~Broker() {
     QPID_LOG(notice, "Shut down");
 }
 
-ManagementObject::shared_ptr Broker::GetManagementObject(void) const
+ManagementObject::shared_ptr Broker::GetManagementObjectShared(void) const
 {
     return mgmtObject;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h Mon Dec 17 11:22:49 2012
@@ -235,7 +235,7 @@ class Broker : public sys::Runnable, pub
     SessionManager& getSessionManager() { return sessionManager; }
     const std::string& getFederationTag() const { return federationTag; }
 
-    QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const;
+    QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared() const;
     QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const;
     QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(
         uint32_t methodId, management::Args& args, std::string& text);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp Mon Dec 17 11:22:49 2012
@@ -402,7 +402,7 @@ SessionHandler& Connection::getChannel(C
     return *ptr_map_ptr(i);
 }
 
-ManagementObject::shared_ptr Connection::GetManagementObject(void) const
+ManagementObject::shared_ptr Connection::GetManagementObjectShared(void) const
 {
     return mgmtObject;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h Mon Dec 17 11:22:49 2012
@@ -112,7 +112,7 @@ class Connection : public sys::Connectio
     void closeChannel(framing::ChannelId channel);
 
     // Manageable entry points
-    management::ManagementObject::shared_ptr GetManagementObject (void) const;
+    management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
     management::Manageable::status_t
         ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Dec 17 11:22:49 2012
@@ -177,7 +177,7 @@ Exchange::Exchange (const string& _name,
             mgmtExchange->set_autoDelete(false);
             agent->addObject(mgmtExchange, 0, durable);
             if (broker)
-                brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
+                brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared());
         }
     }
 }
@@ -198,7 +198,7 @@ Exchange::Exchange(const string& _name, 
             mgmtExchange->set_arguments(ManagementAgent::toMap(args));
             agent->addObject(mgmtExchange, 0, durable);
             if (broker)
-                brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
+                brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared());
         }
     }
 
@@ -227,7 +227,7 @@ void Exchange::setAlternate(Exchange::sh
     alternate = _alternate;
     if (mgmtExchange != 0) {
         if (alternate.get() != 0)
-            mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId());
+            mgmtExchange->set_altExchange(alternate->GetManagementObjectShared()->getObjectId());
         else
             mgmtExchange->clr_altExchange();
     }
@@ -294,7 +294,7 @@ void Exchange::recoveryComplete(Exchange
     }
 }
 
-ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
+ManagementObject::shared_ptr Exchange::GetManagementObjectShared (void) const
 {
     return mgmtExchange;
 }
@@ -352,7 +352,7 @@ Exchange::Binding::Binding(const string&
 Exchange::Binding::~Binding ()
 {
     if (mgmtBinding != 0) {
-        _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
+        _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared());
         if (mo != 0)
             mo->dec_bindingCount();
         mgmtBinding->resourceDestroy ();
@@ -367,7 +367,7 @@ void Exchange::Binding::startManagement(
         if (broker != 0) {
             ManagementAgent* agent = broker->getManagementAgent();
             if (agent != 0) {
-                _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
+                _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared());
                 if (mo != 0) {
                     management::ObjectId queueId = mo->getObjectId();
 
@@ -383,7 +383,7 @@ void Exchange::Binding::startManagement(
     }
 }
 
-ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObjectShared () const
 {
     return mgmtBinding;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h Mon Dec 17 11:22:49 2012
@@ -58,7 +58,7 @@ public:
                 framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
         ~Binding();
         void startManagement();
-        management::ManagementObject::shared_ptr GetManagementObject() const;
+        management::ManagementObject::shared_ptr GetManagementObjectShared() const;
     };
 
 private:
@@ -210,7 +210,7 @@ public:
     static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
 
     // Manageable entry points
-    QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const;
+    QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared(void) const;
 
     // Federation hooks
     class DynamicBridge {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp Mon Dec 17 11:22:49 2012
@@ -292,8 +292,8 @@ void Link::opened() {
     Mutex::ScopedLock mutex(lock);
     if (!connection) return;
 
-    if (!hideManagement() && connection->GetManagementObject()) {
-        mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
+    if (!hideManagement() && connection->GetManagementObjectShared()) {
+        mgmtObject->set_connectionRef(connection->GetManagementObjectShared()->getObjectId());
     }
 
     // Get default URL from known-hosts if not already set
@@ -669,7 +669,7 @@ uint32_t Link::encodedSize() const
         + password.size() + 1;
 }
 
-ManagementObject::shared_ptr Link::GetManagementObject (void) const
+ManagementObject::shared_ptr Link::GetManagementObjectShared (void) const
 {
     return mgmtObject;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h Mon Dec 17 11:22:49 2012
@@ -183,7 +183,7 @@ class Link : public PersistableConfig, p
     static bool isEncodedLink(const std::string& key);
 
     // Manageable entry points
-    management::ManagementObject::shared_ptr GetManagementObject(void) const;
+    management::ManagementObject::shared_ptr GetManagementObjectShared(void) const;
     management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
 
     // manage the exchange owned by this link

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Protocol.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Protocol.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Protocol.cpp Mon Dec 17 11:22:49 2012
@@ -42,6 +42,7 @@ boost::intrusive_ptr<const qpid::broker:
     for (Protocols::const_iterator i = protocols.begin(); !transfer && i != protocols.end(); ++i) {
         transfer = i->second->translate(m);
     }
+    if (!transfer) throw new Exception("Could not convert message into 0-10");
     return transfer;
 }
 boost::shared_ptr<RecoverableMessage> ProtocolRegistry::recover(qpid::framing::Buffer& b)

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp Mon Dec 17 11:22:49 2012
@@ -198,7 +198,7 @@ Queue::Queue(const string& _name, const 
                 new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete));
             mgmtObject->set_arguments(settings.asMap());
             agent->addObject(mgmtObject, 0, store != 0);
-            brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject());
+            brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared());
             if (brokerMgmtObject)
                 brokerMgmtObject->inc_queueCount();
         }
@@ -1108,7 +1108,7 @@ void Queue::setPersistenceId(uint64_t _p
 {
     if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore)
     {
-        ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject();
+        ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObjectShared();
         if (childObj != 0)
             childObj->setReference(mgmtObject->getObjectId());
     }
@@ -1154,7 +1154,7 @@ void Queue::setAlternateExchange(boost::
     alternateExchange = exchange;
     if (mgmtObject) {
         if (exchange.get() != 0)
-            mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());
+            mgmtObject->set_altExchange(exchange->GetManagementObjectShared()->getObjectId());
         else
             mgmtObject->clr_altExchange();
     }
@@ -1258,7 +1258,7 @@ void Queue::setExternalQueueStore(Extern
     externalQueueStore = inst;
 
     if (inst) {
-        ManagementObject::shared_ptr childObj = inst->GetManagementObject();
+        ManagementObject::shared_ptr childObj = inst->GetManagementObjectShared();
         if (childObj != 0 && mgmtObject != 0)
             childObj->setReference(mgmtObject->getObjectId());
     }
@@ -1306,7 +1306,7 @@ void Queue::countLoadedFromDisk(uint64_t
 }
 
 
-ManagementObject::shared_ptr Queue::GetManagementObject (void) const
+ManagementObject::shared_ptr Queue::GetManagementObjectShared (void) const
 {
     return mgmtObject;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h Mon Dec 17 11:22:49 2012
@@ -340,7 +340,7 @@ class Queue : public boost::enable_share
     QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const;
 
     // Manageable entry points
-    QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject (void) const;
+    QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
     management::Manageable::status_t
     QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
     QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Dec 17 11:22:49 2012
@@ -78,7 +78,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
         if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
         if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
         broker = queue->getBroker();
-        queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject());
+        queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObjectShared());
         if (queueMgmtObj) {
             queueMgmtObj->set_flowStopped(isFlowControlActive());
         }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Dec 17 11:22:49 2012
@@ -303,14 +303,14 @@ Consumer(_name, type),
     deliveryCount(0),
     protocols(parent->getSession().getBroker().getProtocolRegistry())
 {
-    if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
+    if (parent != 0 && queue.get() != 0 && queue->GetManagementObjectShared() !=0)
     {
         ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
         qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
 
         if (agent != 0)
         {
-            mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
+            mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObjectShared()->getObjectId(), getTag(),
                                                                                !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)));
             agent->addObject (mgmtObject);
             mgmtObject->set_creditMode("WINDOW");
@@ -318,7 +318,7 @@ Consumer(_name, type),
     }
 }
 
-ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const
+ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObjectShared (void) const
 {
     return mgmtObject;
 }
@@ -398,7 +398,8 @@ ostream& operator<<(ostream& o, const Co
 void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
 {
     Credit original = credit;
-    credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+    boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
+    credit.consume(1, transfer->getRequiredCredit());
     QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
              << ", was " << original << " now " << credit);
 
@@ -406,9 +407,10 @@ void SemanticState::ConsumerImpl::alloca
 
 bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
 {
-    bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+    boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
+    bool enoughCredit = credit.check(1, transfer->getRequiredCredit());
     QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
-             <<  " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: "
+             <<  " credit for message of " << transfer->getRequiredCredit() << " bytes: "
              << credit);
     return enoughCredit;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h Mon Dec 17 11:22:49 2012
@@ -163,7 +163,7 @@ class SemanticState : private boost::non
 
         // manageable entry points
         QPID_BROKER_EXTERN management::ManagementObject::shared_ptr
-        GetManagementObject(void) const;
+        GetManagementObjectShared(void) const;
 
         QPID_BROKER_EXTERN management::Manageable::status_t
         ManagementMethod(uint32_t methodId, management::Args& args, std::string& text);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Dec 17 11:22:49 2012
@@ -65,7 +65,7 @@ SessionState::SessionState(
 }
 
 void SessionState::addManagementObject() {
-    if (GetManagementObject()) return; // Already added.
+    if (GetManagementObjectShared()) return; // Already added.
     Manageable* parent = broker.GetVhostObject ();
     if (parent != 0) {
         ManagementAgent* agent = getBroker().getManagementAgent();
@@ -127,7 +127,7 @@ void SessionState::attach(SessionHandler
     if (mgmtObject != 0)
     {
         mgmtObject->set_attached (1);
-        mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
+        mgmtObject->set_connectionRef (h.getConnection().GetManagementObjectShared()->getObjectId());
         mgmtObject->set_channelId (h.getChannel());
     }
     asyncCommandCompleter->attached();
@@ -148,7 +148,7 @@ void SessionState::giveReadCredit(int32_
         getConnection().outputTasks.giveReadCredit(credit);
 }
 
-ManagementObject::shared_ptr SessionState::GetManagementObject (void) const
+ManagementObject::shared_ptr SessionState::GetManagementObjectShared (void) const
 {
     return mgmtObject;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h Mon Dec 17 11:22:49 2012
@@ -110,7 +110,7 @@ class SessionState : public qpid::Sessio
                        const qpid::types::Variant::Map& annotations, bool sync);
 
     // Manageable entry points
-    management::ManagementObject::shared_ptr GetManagementObject (void) const;
+    management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
     management::Manageable::status_t
     ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h Mon Dec 17 11:22:49 2012
@@ -45,7 +45,7 @@ class System : public management::Manage
 
     System (std::string _dataDir, Broker* broker = 0);
 
-    management::ManagementObject::shared_ptr GetManagementObject (void) const
+    management::ManagementObject::shared_ptr GetManagementObjectShared (void) const
     { return mgmtObject; }
 
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h Mon Dec 17 11:22:49 2012
@@ -40,7 +40,7 @@ class Vhost : public management::Managea
 
     Vhost (management::Manageable* parentBroker, Broker* broker = 0);
 
-    management::ManagementObject::shared_ptr GetManagementObject (void) const
+    management::ManagementObject::shared_ptr GetManagementObjectShared (void) const
     { return mgmtObject; }
     void setFederationTag(const std::string& tag);
 };

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Mon Dec 17 11:22:49 2012
@@ -157,6 +157,7 @@ void Connection::process()
     QPID_LOG(trace, id << " process()");
     if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
         QPID_LOG_CAT(debug, model, id << " connection opened");
+        pn_connection_set_container(connection, broker.getFederationTag().c_str());
         pn_connection_open(connection);
     }
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp Mon Dec 17 11:22:49 2012
@@ -73,7 +73,7 @@ void ManagedConnection::setSaslSsf(int s
     connection->set_saslSsf(ssf);
 }
 
-qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const
+qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObjectShared() const
 {
     return connection;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h Mon Dec 17 11:22:49 2012
@@ -44,7 +44,7 @@ class ManagedConnection : public qpid::m
     std::string getUserid() const;
     void setSaslMechanism(const std::string&);
     void setSaslSsf(int);
-    qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+    qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
     bool isLocal(const ConnectionToken* t) const;
     void incomingMessageReceived();
     void outgoingMessageSent();

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp Mon Dec 17 11:22:49 2012
@@ -37,7 +37,7 @@ ManagedOutgoingLink::ManagedOutgoingLink
 {
     qpid::management::ManagementAgent* agent = broker.getManagementAgent();
     if (agent) {
-        subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id,
+        subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObjectShared()->getObjectId(), id,
                                                            false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map()));
         agent->addObject(subscription);
         subscription->set_creditMode("n/a");
@@ -48,7 +48,7 @@ ManagedOutgoingLink::~ManagedOutgoingLin
     if (subscription != 0) subscription->resourceDestroy();
 }
 
-qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const
+qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObjectShared() const
 {
     return subscription;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h Mon Dec 17 11:22:49 2012
@@ -39,7 +39,7 @@ class ManagedOutgoingLink : public qpid:
   public:
     ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic);
     virtual ~ManagedOutgoingLink();
-    qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+    qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
     void outgoingMessageSent();
     void outgoingMessageAccepted();
     void outgoingMessageRejected();

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp Mon Dec 17 11:22:49 2012
@@ -38,7 +38,7 @@ ManagedSession::ManagedSession(Broker& b
         session->set_attached(true);
         session->set_detachedLifespan(0);
         session->clr_expireTime();
-        session->set_connectionRef(parent.GetManagementObject()->getObjectId());
+        session->set_connectionRef(parent.GetManagementObjectShared()->getObjectId());
         agent->addObject(session);
     }
 }
@@ -48,7 +48,7 @@ ManagedSession::~ManagedSession()
     if (session) session->resourceDestroy();
 }
 
-qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObject() const
+qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObjectShared() const
 {
     return session;
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h Mon Dec 17 11:22:49 2012
@@ -40,7 +40,7 @@ class ManagedSession : public qpid::mana
   public:
     ManagedSession(Broker& broker, ManagedConnection& parent, const std::string id);
     virtual ~ManagedSession();
-    qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+    qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
     bool isLocal(const ConnectionToken* t) const;
     void incomingMessageReceived();
     void incomingMessageAccepted();

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.cpp Mon Dec 17 11:22:49 2012
@@ -94,6 +94,7 @@ Message::Message(size_t size) : data(siz
 
     applicationProperties.init();
     body.init();
+    footer.init();
 }
 char* Message::getData() { return &data[0]; }
 const char* Message::getData() const { return &data[0]; }
@@ -140,6 +141,10 @@ qpid::amqp::CharSequence Message::getBod
 {
     return body;
 }
+qpid::amqp::CharSequence Message::getFooter() const
+{
+    return footer;
+}
 
 void Message::scan()
 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.h Mon Dec 17 11:22:49 2012
@@ -63,6 +63,7 @@ class Message : public qpid::broker::Mes
     qpid::amqp::CharSequence getApplicationProperties() const;
     qpid::amqp::CharSequence getBareMessage() const;
     qpid::amqp::CharSequence getBody() const;
+    qpid::amqp::CharSequence getFooter() const;
 
     Message(size_t size);
     char* getData();

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Mon Dec 17 11:22:49 2012
@@ -145,6 +145,7 @@ void Outgoing::detached()
 bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
 {
     Record& r = deliveries[current++];
+    if (current >= deliveries.capacity()) current = 0;
     r.cursor = cursor;
     r.msg = msg;
     pn_delivery(link, r.tag);
@@ -161,7 +162,7 @@ void Outgoing::notify()
 
 bool Outgoing::accept(const qpid::broker::Message&)
 {
-    return canDeliver();
+    return true;
 }
 
 void Outgoing::setSubjectFilter(const std::string& f)

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp Mon Dec 17 11:22:49 2012
@@ -53,32 +53,33 @@ namespace amqp {
 class Target
 {
   public:
+    Target(pn_link_t* l) : credit(100), window(0), link(l) {}
     virtual ~Target() {}
-    virtual void flow() = 0;
+    bool flow();
+    bool needFlow();
     virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message
-  private:
+  protected:
+    const uint32_t credit;
+    uint32_t window;
+    pn_link_t* link;
 };
 
 class Queue : public Target
 {
   public:
-    Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : queue(q), link(l) {}
-    void flow();
+    Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : Target(l), queue(q) {}
     void handle(qpid::broker::Message& m);
   private:
     boost::shared_ptr<qpid::broker::Queue> queue;
-    pn_link_t* link;
 };
 
 class Exchange : public Target
 {
   public:
-    Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : exchange(e), link(l) {}
-    void flow();
+    Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : Target(l), exchange(e) {}
     void handle(qpid::broker::Message& m);
   private:
     boost::shared_ptr<qpid::broker::Exchange> exchange;
-    pn_link_t* link;
 };
 
 Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o)
@@ -169,11 +170,9 @@ void Session::attach(pn_link_t* link)
         if (node.queue) {
             boost::shared_ptr<Target> q(new Queue(node.queue, link));
             targets[link] = q;
-            q->flow();
         } else if (node.exchange) {
             boost::shared_ptr<Target> e(new Exchange(node.exchange, link));
             targets[link] = e;
-            e->flow();
         } else {
             pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
             throw qpid::Exception("Node not found: " + name);/*not-found*/
@@ -253,7 +252,7 @@ void Session::incoming(pn_link_t* link, 
         received->begin();
         Transfer t(delivery, shared_from_this());
         received->end(t);
-        target->second->flow();
+        if (target->second->needFlow()) out.activateOutput();
     }
 }
 void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery)
@@ -283,6 +282,9 @@ bool Session::dispatch()
             accepted(*i, true);
         }
     }
+    for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) {
+        if (t->second->flow()) output = true;
+    }
 
     return output;
 }
@@ -299,24 +301,32 @@ void Session::close()
     deleted = true;
 }
 
-void Queue::flow()
+void Queue::handle(qpid::broker::Message& message)
 {
-    pn_link_flow(link, 1);//TODO: proper flow control
+    queue->deliver(message);
+    --window;
 }
 
-void Queue::handle(qpid::broker::Message& message)
+void Exchange::handle(qpid::broker::Message& message)
 {
-    queue->deliver(message);
+    DeliverableMessage deliverable(message, 0);
+    exchange->route(deliverable);
+    --window;
 }
 
-void Exchange::flow()
+bool Target::flow()
 {
-    pn_link_flow(link, 1);//TODO: proper flow control
+    bool issue = window < credit;
+    if (issue) {
+        pn_link_flow(link, credit - window);//TODO: proper flow control
+        window = credit;
+    }
+    return issue;
 }
 
-void Exchange::handle(qpid::broker::Message& message)
+bool Target::needFlow()
 {
-    DeliverableMessage deliverable(message, 0);
-    exchange->route(deliverable);
+    return window <= (credit/2);
 }
+
 }}} // namespace qpid::broker::amqp

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Mon Dec 17 11:22:49 2012
@@ -215,6 +215,9 @@ void Translation::write(Outgoing& out)
         //write bare message
         qpid::amqp::CharSequence bareMessage = message->getBareMessage();
         if (bareMessage.size) out.write(bareMessage.data, bareMessage.size);
+        //write footer:
+        qpid::amqp::CharSequence footer = message->getFooter();
+        if (footer.size) out.write(footer.data, footer.size);
     } else {
         const qpid::broker::amqp_0_10::MessageTransfer* transfer = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding());
         if (transfer) {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Mon Dec 17 11:22:49 2012
@@ -116,11 +116,6 @@ void MessageTransfer::computeRequiredCre
     requiredCredit = sum.getSize();
     cachedRequiredCredit = true;
 }
-uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg)
-{
-    //TODO: may need to reflect annotations and other modifications in this also
-    return get(msg).getRequiredCredit();
-}
 
 qpid::framing::FrameSet& MessageTransfer::getFrames()
 {

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Mon Dec 17 11:22:49 2012
@@ -109,7 +109,6 @@ class MessageTransfer : public qpid::bro
     QPID_BROKER_EXTERN bool isLastQMFResponse(const std::string correlation) const;
 
     static bool isImmediateDeliveryRequired(const qpid::broker::Message& message);
-    static uint32_t getRequiredCredit(const qpid::broker::Message&);
     static MessageTransfer& get(qpid::broker::Message& message) {
         return *dynamic_cast<MessageTransfer*>(&message.getEncoding());
     }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp Mon Dec 17 11:22:49 2012
@@ -151,6 +151,11 @@ void TCPConnector::socketClosed(AsynchIO
         shutdownHandler->shutdown();
 }
 
+void TCPConnector::connectAborted() {
+    connector->stop();
+    connectFailed("Connection timedout");
+}
+
 void TCPConnector::abort() {
     // Can't abort a closed connection
     if (!closed) {
@@ -159,8 +164,7 @@ void TCPConnector::abort() {
             aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
         } else if (connector) {
             // We're still connecting
-            connector->stop();
-            connectFailed("Connection timedout");
+            connector->requestCallback(boost::bind(&TCPConnector::connectAborted, this));
         }
     }
 }

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h Mon Dec 17 11:22:49 2012
@@ -80,6 +80,7 @@ class TCPConnector : public Connector, p
     void close();
     void send(framing::AMQFrame& frame);
     void abort();
+    void connectAborted();
 
     void setInputHandler(framing::InputHandler* handler);
     void setShutdownHandler(sys::ShutdownHandler* handler);

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h Mon Dec 17 11:22:49 2012
@@ -71,7 +71,7 @@ class HaBroker : public management::Mana
     void initialize();
 
     // Implement Manageable.
-    qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; }
+    qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObject; }
     management::Manageable::status_t ManagementMethod (
         uint32_t methodId, management::Args& args, std::string& text);
 

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp Mon Dec 17 11:22:49 2012
@@ -41,6 +41,16 @@ string Manageable::StatusText (status_t 
     return "??";
 }
 
+ManagementObject* Manageable::GetManagementObject(void) const
+{
+    return 0;
+}
+
+ManagementObject::shared_ptr Manageable::GetManagementObjectShared() const
+{
+    return ManagementObject::shared_ptr();
+}
+
 Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&)
 {
     return STATUS_UNKNOWN_METHOD;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Dec 17 11:22:49 2012
@@ -698,7 +698,7 @@ void ManagementAgent::periodicProcessing
     //
     if (publish) {
         uint64_t uptime = sys::Duration(startTime, sys::now());
-        boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+        boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
         qpid::sys::MemStat::loadMemInfo(memstat.get());
     }
 
@@ -1722,7 +1722,7 @@ void ManagementAgent::handleAttachReques
     string   label;
     uint32_t requestedBrokerBank, requestedAgentBank;
     uint32_t assignedBank;
-    ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
+    ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObjectShared()->getObjectId();
     Uuid     systemId;
 
     moveNewObjects();
@@ -1754,7 +1754,7 @@ void ManagementAgent::handleAttachReques
     agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get()));
     agent->mgmtObject->set_connectionRef(agent->connectionRef);
     agent->mgmtObject->set_label        (label);
-    agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
+    agent->mgmtObject->set_registeredTo (broker->GetManagementObjectShared()->getObjectId());
     agent->mgmtObject->set_systemId     ((const unsigned char*)systemId.data());
     agent->mgmtObject->set_brokerBank   (brokerBank);
     agent->mgmtObject->set_agentBank    (assignedBank);
@@ -1831,7 +1831,7 @@ void ManagementAgent::handleGetQuery(Buf
 
     if (className == "broker") {
         uint64_t uptime = sys::Duration(startTime, sys::now());
-        boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+        boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
     }
 
 
@@ -1945,7 +1945,7 @@ void ManagementAgent::handleGetQuery(con
 
     if (className == "broker") {
         uint64_t uptime = sys::Duration(startTime, sys::now());
-        boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+        boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
     }
 
     /*

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1411034-1415148

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Dec 17 11:22:49 2012
@@ -211,7 +211,7 @@ private:
         ObjectId          connectionRef;
         qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
         RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
-        ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
+        ManagementObject::shared_ptr GetManagementObjectShared (void) const { return mgmtObject; }
 
         virtual ~RemoteAgent ();
         void mapEncode(qpid::types::Variant::Map& _map) const;

Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1411034-1415148

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Dec 17 11:22:49 2012
@@ -53,7 +53,8 @@ ConnectionContext::ConnectionContext(con
       writeHeader(false),
       readHeader(false),
       haveOutput(false),
-      state(DISCONNECTED)
+      state(DISCONNECTED),
+      codecSwitch(*this)
 {
     if (pn_transport_bind(engine, connection)) {
         //error
@@ -149,7 +150,14 @@ void ConnectionContext::close()
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     if (state != CONNECTED) return;
     if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
-        for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i){
+        for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+            //wait for outstanding sends to settle
+            while (!i->second->settled()) {
+                QPID_LOG(debug, "Waiting for sends to settle before closing");
+                wait();//wait until message has been confirmed
+            }
+
+
             if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
                 pn_session_close(i->second->session);
             }
@@ -181,6 +189,7 @@ bool ConnectionContext::fetch(boost::sha
         qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
         if (lnk->capacity) {
             pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach?
+            wakeupDriver();
         }
         return true;
     } else {
@@ -188,12 +197,24 @@ bool ConnectionContext::fetch(boost::sha
             qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
             pn_link_drain(lnk->receiver, 0);
             wakeupDriver();
-            while (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)) {
-                QPID_LOG(notice, "Waiting for credit to be drained: " << (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)));
+            while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
+                QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
                 wait();
             }
+            if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
+                pn_link_flow(lnk->receiver, lnk->capacity);
+            }
+        }
+        if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
+            qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+            if (lnk->capacity) {
+                pn_link_flow(lnk->receiver, 1);
+                wakeupDriver();
+            }
+            return true;
+        } else {
+            return false;
         }
-        return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE);
     }
 }
 
@@ -322,6 +343,7 @@ void ConnectionContext::setCapacity(boos
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     receiver->setCapacity(capacity);
     pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity());
+    wakeupDriver();
 }
 uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver)
 {
@@ -543,13 +565,48 @@ bool ConnectionContext::useSasl()
 
 qpid::sys::Codec& ConnectionContext::getCodec()
 {
-    qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
-    if (sasl.get()) {
-        qpid::sys::Codec* c = sasl->getCodec();
-        if (c) return *c;
-        lock.notifyAll();
+    return codecSwitch;
+}
+
+ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {}
+std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size)
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+    size_t decoded = 0;
+    if (parent.sasl.get() && !parent.sasl->authenticated()) {
+        decoded = parent.sasl->decode(buffer, size);
+        if (!parent.sasl->authenticated()) return decoded;
     }
-    return *this;
+    if (decoded < size) {
+        if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
+        else decoded += parent.decode(buffer+decoded, size-decoded);
+    }
+    return decoded;
 }
+std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size)
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+    size_t encoded = 0;
+    if (parent.sasl.get() && parent.sasl->canEncode()) {
+        encoded += parent.sasl->encode(buffer, size);
+        if (!parent.sasl->authenticated()) return encoded;
+    }
+    if (encoded < size) {
+        if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
+        else encoded += parent.encode(buffer+encoded, size-encoded);
+    }
+    return encoded;
+}
+bool ConnectionContext::CodecSwitch::canEncode()
+{
+    qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+    if (parent.sasl.get()) {
+        if (parent.sasl->canEncode()) return true;
+        else if (!parent.sasl->authenticated()) return false;
+        else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode();
+    }
+    return parent.canEncode();
+}
+
 
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Mon Dec 17 11:22:49 2012
@@ -123,6 +123,17 @@ class ConnectionContext : public qpid::s
         CONNECTED
     } state;
     std::auto_ptr<Sasl> sasl;
+    class CodecSwitch : public qpid::sys::Codec
+    {
+      public:
+        CodecSwitch(ConnectionContext&);
+        std::size_t decode(const char* buffer, std::size_t size);
+        std::size_t encode(char* buffer, std::size_t size);
+        bool canEncode();
+      private:
+        ConnectionContext& parent;
+    };
+    CodecSwitch codecSwitch;
 
     void wait();
     void wakeupDriver();

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Mon Dec 17 11:22:49 2012
@@ -120,18 +120,20 @@ void ReceiverContext::configure(pn_termi
         helper.setNodeProperties(source);
     }
 
-    //filter:
-    pn_data_t* filter = pn_terminus_filter(source);
-    pn_data_put_map(filter);
-    pn_data_enter(filter);
-    pn_data_put_symbol(filter, convert("subject"));
-    //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved
-    //pn_data_put_described(filter);
-    //pn_data_enter(filter);
-    //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
-    pn_data_put_string(filter, convert(address.getSubject()));
-    //pn_data_exit(filter);
-    pn_data_exit(filter);
+    if (!address.getSubject().empty()) {
+        //filter:
+        pn_data_t* filter = pn_terminus_filter(source);
+        pn_data_put_map(filter);
+        pn_data_enter(filter);
+        pn_data_put_symbol(filter, convert("subject"));
+        //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved
+        //pn_data_put_described(filter);
+        //pn_data_enter(filter);
+        //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
+        pn_data_put_string(filter, convert(address.getSubject()));
+        //pn_data_exit(filter);
+        pn_data_exit(filter);
+    }
 }
 
 bool ReceiverContext::isClosed() const

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp Mon Dec 17 11:22:49 2012
@@ -58,7 +58,7 @@ std::size_t Sasl::encode(char* buffer, s
          encoded += writeProtocolHeader(buffer, size);
          writeHeader = !encoded;
     }
-    if (state == NONE && encoded < size) {
+    if (encoded < size) {
         encoded += write(buffer + encoded, size - encoded);
     }
     haveOutput = (encoded == size);
@@ -135,14 +135,9 @@ void Sasl::outcome(uint8_t result)
     context.activateOutput();
 }
 
-qpid::sys::Codec* Sasl::getCodec()
+qpid::sys::Codec* Sasl::getSecurityLayer()
 {
-    switch (state) {
-      case SUCCEEDED: return static_cast<qpid::sys::Codec*>(securityLayer.get());
-      case FAILED: throw qpid::messaging::UnauthorizedAccess("Failed to authenticate");
-      case NONE: return static_cast<qpid::sys::Codec*>(this);
-    }
-    return 0;
+    return securityLayer.get();
 }
 
 bool Sasl::authenticated()

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.h Mon Dec 17 11:22:49 2012
@@ -47,7 +47,7 @@ class Sasl : public qpid::sys::Codec, qp
     bool canEncode();
 
     bool authenticated();
-    qpid::sys::Codec* getCodec();
+    qpid::sys::Codec* getSecurityLayer();
     std::string getAuthenticatedUsername();
  private:
     ConnectionContext& context;

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Mon Dec 17 11:22:49 2012
@@ -80,7 +80,7 @@ const std::string& SenderContext::getTar
 
 SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
 {
-    if (processUnsettled() < capacity) {
+    if (processUnsettled() < capacity && pn_link_credit(sender)) {
         deliveries.push_back(Delivery(nextId++));
         Delivery& delivery = deliveries.back();
         delivery.encode(MessageImplAccess::get(message), address);
@@ -95,6 +95,7 @@ uint32_t SenderContext::processUnsettled
 {
     //remove accepted messages from front of deque
     while (!deliveries.empty() && deliveries.front().accepted()) {
+        deliveries.front().settle();
         deliveries.pop_front();
     }
     return deliveries.size();
@@ -336,7 +337,10 @@ bool SenderContext::Delivery::accepted()
 {
     return pn_delivery_remote_state(token) == PN_ACCEPTED;
 }
-
+void SenderContext::Delivery::settle()
+{
+    pn_delivery_settle(token);
+}
 void SenderContext::configure() const
 {
     configure(pn_link_target(sender));
@@ -350,4 +354,10 @@ void SenderContext::configure(pn_terminu
         helper.setNodeProperties(target);
     }
 }
+
+bool SenderContext::settled()
+{
+    return processUnsettled() == 0;
+}
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Mon Dec 17 11:22:49 2012
@@ -53,6 +53,7 @@ class SenderContext
         void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
         void send(pn_link_t*);
         bool accepted();
+        void settle();
       private:
         int32_t id;
         pn_delivery_t* token;
@@ -69,6 +70,7 @@ class SenderContext
     const std::string& getTarget() const;
     Delivery* send(const qpid::messaging::Message& message);
     void configure() const;
+    bool settled();
   private:
     friend class ConnectionContext;
     typedef std::deque<Delivery> Deliveries;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org