You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2012/12/17 12:22:58 UTC
svn commit: r1422853 [2/3] - in /qpid/branches/java-broker-config-qpid-4390:
./ qpid/ qpid/bin/ qpid/cpp/ qpid/cpp/bindings/qmf/
qpid/cpp/bindings/qmf/python/ qpid/cpp/bindings/qmf/ruby/
qpid/cpp/bindings/qmf2/ qpid/cpp/bindings/qmf2/examples/cpp/ qpid...
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/generate.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/generate.py?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/generate.py (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/generate.py Mon Dec 17 11:22:49 2012
@@ -307,11 +307,22 @@ class Generator:
# Returns the "genLogs" entry of the template variables map.
def testGenLogs (self, variables):
return variables["genLogs"]
# Returns the 'genForBroker' entry of the template variables map.
# Presumably this backs the MGEN:IF(Root.InBroker) condition used in the
# header templates -- confirm against the template engine.
+ def testInBroker (self, variables):
+ return variables['genForBroker']
+
# Writes a two-line "generated file, do not edit" notice to stream. Each line
# starts with variables["commentPrefix"]; the second line is written without
# a trailing newline.
def genDisclaimer (self, stream, variables):
prefix = variables["commentPrefix"]
stream.write (prefix + " This source file was created by a code generator.\n")
stream.write (prefix + " Please do not edit.")
# Writes the macro name QPID_BROKER_CLASS_EXTERN to stream when
# variables['genForBroker'] is truthy; otherwise writes nothing.
+ def genExternClass (self, stream, variables):
+ if variables['genForBroker']:
+ stream.write("QPID_BROKER_CLASS_EXTERN")
+
# Writes the macro name QPID_BROKER_EXTERN to stream when
# variables['genForBroker'] is truthy; otherwise writes nothing.
+ def genExternMethod (self, stream, variables):
+ if variables['genForBroker']:
+ stream.write("QPID_BROKER_EXTERN")
+
def fileExt (self, path):
dot = path.rfind (".")
if dot == -1:
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/schema.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/schema.py?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/schema.py (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/schema.py Mon Dec 17 11:22:49 2012
@@ -1476,8 +1476,11 @@ class SchemaClass:
# Writes one "static const uint32_t METHOD_<NAME> = <n>;" line per entry of
# self.methods, where <NAME> is the upper-cased method name and <n> counts up
# from 1 in iteration order.
def genMethodIdDeclarations (self, stream, variables):
number = 1
# The QPID_BROKER_EXTERN prefix is now emitted only when generating for the
# broker (variables['genForBroker']); previously it was always emitted.
+ ext = ""
+ if variables['genForBroker']:
+ ext = "QPID_BROKER_EXTERN "
for method in self.methods:
- stream.write (" QPID_BROKER_EXTERN static const uint32_t METHOD_" + method.getName().upper() +\
+ stream.write (" " + ext + "static const uint32_t METHOD_" + method.getName().upper() +\
" = %d;\n" % number)
number = number + 1
@@ -1520,8 +1523,12 @@ class SchemaClass:
# For every property in self.properties whose isParentRef equals 1, writes a
# C++ assignment of the parent's object id to that property. When
# variables['genForBroker'] is truthy the generated code calls
# GetManagementObjectShared() on _parent; otherwise it calls
# GetManagementObject().
def genParentRefAssignment (self, stream, variables):
for config in self.properties:
if config.isParentRef == 1:
- stream.write (config.getName () + \
- " = _parent->GetManagementObject ()->getObjectId ();")
+ if variables['genForBroker']:
+ stream.write (config.getName () + \
+ " = _parent->GetManagementObjectShared()->getObjectId ();")
+ else:
+ stream.write (config.getName () + \
+ " = _parent->GetManagementObject()->getObjectId ();")
return
def genSchemaMD5 (self, stream, variables):
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Class.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Class.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Class.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Class.h Mon Dec 17 11:22:49 2012
@@ -24,7 +24,9 @@
/*MGEN:Root.Disclaimer*/
#include "qpid/management/ManagementObject.h"
+/*MGEN:IF(Root.InBroker)*/
#include "qmf/BrokerImportExport.h"
+/*MGEN:ENDIF*/
#include <limits>
namespace qpid {
@@ -36,7 +38,7 @@ namespace qpid {
namespace qmf {
/*MGEN:Class.OpenNamespaces*/
-QPID_BROKER_CLASS_EXTERN class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
+/*MGEN:Root.ExternClass*/ class /*MGEN:Class.NameCap*/ : public ::qpid::management::ManagementObject
{
private:
@@ -79,22 +81,22 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Cl
public:
typedef boost::shared_ptr</*MGEN:Class.NameCap*/> shared_ptr;
- QPID_BROKER_EXTERN static void writeSchema(std::string& schema);
- QPID_BROKER_EXTERN void mapEncodeValues(::qpid::types::Variant::Map& map,
+ /*MGEN:Root.ExternMethod*/ static void writeSchema(std::string& schema);
+ /*MGEN:Root.ExternMethod*/ void mapEncodeValues(::qpid::types::Variant::Map& map,
bool includeProperties=true,
bool includeStatistics=true);
- QPID_BROKER_EXTERN void mapDecodeValues(const ::qpid::types::Variant::Map& map);
- QPID_BROKER_EXTERN void doMethod(std::string& methodName,
+ /*MGEN:Root.ExternMethod*/ void mapDecodeValues(const ::qpid::types::Variant::Map& map);
+ /*MGEN:Root.ExternMethod*/ void doMethod(std::string& methodName,
const ::qpid::types::Variant::Map& inMap,
::qpid::types::Variant::Map& outMap,
const std::string& userId);
- QPID_BROKER_EXTERN std::string getKey() const;
+ /*MGEN:Root.ExternMethod*/ std::string getKey() const;
/*MGEN:IF(Root.GenQMFv1)*/
- QPID_BROKER_EXTERN uint32_t writePropertiesSize() const;
- QPID_BROKER_EXTERN void readProperties(const std::string& buf);
- QPID_BROKER_EXTERN void writeProperties(std::string& buf) const;
- QPID_BROKER_EXTERN void writeStatistics(std::string& buf, bool skipHeaders = false);
- QPID_BROKER_EXTERN void doMethod(std::string& methodName,
+ /*MGEN:Root.ExternMethod*/ uint32_t writePropertiesSize() const;
+ /*MGEN:Root.ExternMethod*/ void readProperties(const std::string& buf);
+ /*MGEN:Root.ExternMethod*/ void writeProperties(std::string& buf) const;
+ /*MGEN:Root.ExternMethod*/ void writeStatistics(std::string& buf, bool skipHeaders = false);
+ /*MGEN:Root.ExternMethod*/ void doMethod(std::string& methodName,
const std::string& inBuf,
std::string& outBuf,
const std::string& userId);
@@ -107,15 +109,15 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Cl
bool hasInst() { return false; }
/*MGEN:ENDIF*/
- QPID_BROKER_EXTERN /*MGEN:Class.NameCap*/(
+ /*MGEN:Root.ExternMethod*/ /*MGEN:Class.NameCap*/(
::qpid::management::ManagementAgent* agent,
::qpid::management::Manageable* coreObject/*MGEN:Class.ParentArg*//*MGEN:Class.ConstructorArgs*/);
- QPID_BROKER_EXTERN ~/*MGEN:Class.NameCap*/();
+ /*MGEN:Root.ExternMethod*/ ~/*MGEN:Class.NameCap*/();
/*MGEN:Class.SetGeneralReferenceDeclaration*/
- QPID_BROKER_EXTERN static void registerSelf(
+ /*MGEN:Root.ExternMethod*/ static void registerSelf(
::qpid::management::ManagementAgent* agent);
std::string& getPackageName() const { return packageName; }
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Event.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Event.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Event.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Event.h Mon Dec 17 11:22:49 2012
@@ -24,36 +24,38 @@
/*MGEN:Root.Disclaimer*/
#include "qpid/management/ManagementEvent.h"
+/*MGEN:IF(Root.InBroker)*/
#include "qmf/BrokerImportExport.h"
+/*MGEN:ENDIF*/
namespace qmf {
/*MGEN:Event.OpenNamespaces*/
-QPID_BROKER_CLASS_EXTERN class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent
+/*MGEN:Root.ExternClass*/ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent
{
private:
static void writeSchema (std::string& schema);
static uint8_t md5Sum[MD5_LEN];
- QPID_BROKER_EXTERN static std::string packageName;
- QPID_BROKER_EXTERN static std::string eventName;
+ /*MGEN:Root.ExternMethod*/ static std::string packageName;
+ /*MGEN:Root.ExternMethod*/ static std::string eventName;
/*MGEN:Event.ArgDeclarations*/
public:
writeSchemaCall_t getWriteSchemaCall(void) { return writeSchema; }
- QPID_BROKER_EXTERN Event/*MGEN:Event.NameCap*/(/*MGEN:Event.ConstructorArgs*/);
- QPID_BROKER_EXTERN ~Event/*MGEN:Event.NameCap*/() {};
+ /*MGEN:Root.ExternMethod*/ Event/*MGEN:Event.NameCap*/(/*MGEN:Event.ConstructorArgs*/);
+ /*MGEN:Root.ExternMethod*/ ~Event/*MGEN:Event.NameCap*/() {};
static void registerSelf(::qpid::management::ManagementAgent* agent);
std::string& getPackageName() const { return packageName; }
std::string& getEventName() const { return eventName; }
uint8_t* getMd5Sum() const { return md5Sum; }
uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; }
- QPID_BROKER_EXTERN void encode(std::string& buffer) const;
- QPID_BROKER_EXTERN void mapEncode(::qpid::types::Variant::Map& map) const;
+ /*MGEN:Root.ExternMethod*/ void encode(std::string& buffer) const;
+ /*MGEN:Root.ExternMethod*/ void mapEncode(::qpid::types::Variant::Map& map) const;
- QPID_BROKER_EXTERN static bool match(const std::string& evt, const std::string& pkg);
+ /*MGEN:Root.ExternMethod*/ static bool match(const std::string& evt, const std::string& pkg);
static std::pair<std::string,std::string> getFullName() {
return std::make_pair(packageName, eventName);
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Package.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Package.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Package.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/managementgen/qmfgen/templates/Package.h Mon Dec 17 11:22:49 2012
@@ -24,7 +24,9 @@
/*MGEN:Root.Disclaimer*/
#include "qpid//*MGEN:Class.AgentHeaderLocation*//ManagementAgent.h"
+/*MGEN:IF(Root.InBroker)*/
#include "qmf/BrokerImportExport.h"
+/*MGEN:ENDIF*/
namespace qmf {
/*MGEN:Class.OpenNamespaces*/
@@ -32,8 +34,8 @@ namespace qmf {
// Template for the generated Package class: declares a constructor taking a
// ManagementAgent* and an empty inline destructor. The hard-coded
// QPID_BROKER_EXTERN export macro is replaced by the MGEN:Root.ExternMethod
// template tag.
class Package
{
public:
- QPID_BROKER_EXTERN Package (::qpid::management::ManagementAgent* agent);
- QPID_BROKER_EXTERN ~Package () {}
+ /*MGEN:Root.ExternMethod*/ Package (::qpid::management::ManagementAgent* agent);
+ /*MGEN:Root.ExternMethod*/ ~Package () {}
};
}/*MGEN:Class.CloseNamespaces*/
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/CMakeLists.txt:r1411034-1415148
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/Makefile.am?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/Makefile.am Mon Dec 17 11:22:49 2012
@@ -146,7 +146,8 @@ libqpidclient_la_CXXFLAGS = $(AM_CXXFLAG
# Link inputs for qpidd; this change appends -lboost_program_options after
# libqpidcommon.la.
# NOTE(review): the Boost library name is hard-coded here -- confirm it matches
# how the rest of the build locates Boost.
qpidd_LDADD = \
libqpidbroker.la \
- libqpidcommon.la
+ libqpidcommon.la \
+ -lboost_program_options
posix_qpidd_src = posix/QpiddBroker.cpp
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/posix/QpiddBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/posix/QpiddBroker.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/posix/QpiddBroker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/posix/QpiddBroker.cpp Mon Dec 17 11:22:49 2012
@@ -144,7 +144,7 @@ struct QpiddDaemon : public Daemon {
uint16_t port=brokerPtr->getPort(options->daemon.transport);
ready(port); // Notify parent.
if (options->parent->broker.enableMgmt && (options->parent->broker.port == 0 || options->daemon.transport != TCP)) {
- boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port);
+ boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port);
}
brokerPtr->run();
}
@@ -200,7 +200,7 @@ int QpiddBroker::execute (QpiddOptions *
uint16_t port = brokerPtr->getPort(myOptions->daemon.transport);
cout << port << endl;
if (options->broker.enableMgmt) {
- boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port);
+ boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObjectShared())->set_port(port);
}
}
brokerPtr->run();
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:r1411034-1415148
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/acl:r1411034-1415148
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.cpp Mon Dec 17 11:22:49 2012
@@ -317,7 +317,7 @@ Acl::~Acl(){
broker->getConnectionObservers().remove(connectionCounter);
}
// Returns mgmtObject as a ManagementObject::shared_ptr. This change renames
// the accessor from GetManagementObject to GetManagementObjectShared.
-ManagementObject::shared_ptr Acl::GetManagementObject(void) const
+ManagementObject::shared_ptr Acl::GetManagementObjectShared(void) const
{
return mgmtObject;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/acl/Acl.h Mon Dec 17 11:22:49 2012
@@ -117,7 +117,7 @@ private:
bool readAclFile(std::string& aclFile, std::string& errorText);
Manageable::status_t lookup (management::Args& args, std::string& text);
Manageable::status_t lookupPublish(management::Args& args, std::string& text);
- virtual qpid::management::ManagementObject::shared_ptr GetManagementObject(void) const;
+ virtual qpid::management::ManagementObject::shared_ptr GetManagementObjectShared(void) const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.cpp Mon Dec 17 11:22:49 2012
@@ -540,5 +540,6 @@ CharSequence Decoder::readRawUuid()
}
// Returns the decoder's position member.
size_t Decoder::getPosition() const { return position; }
// Returns the decoder's size member (the value resetSize() overwrites).
+size_t Decoder::getSize() const { return size; }
// Overwrites the decoder's size member with s.
void Decoder::resetSize(size_t s) { size = s; }
}} // namespace qpid::amqp
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Decoder.h Mon Dec 17 11:22:49 2012
@@ -71,6 +71,7 @@ class Decoder
QPID_COMMON_EXTERN void advance(size_t);
QPID_COMMON_EXTERN size_t getPosition() const;
QPID_COMMON_EXTERN void resetSize(size_t size);
+ QPID_COMMON_EXTERN size_t getSize() const;
private:
const char* const start;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Sasl.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Sasl.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/amqp/Sasl.cpp Mon Dec 17 11:22:49 2012
@@ -58,29 +58,35 @@ void Sasl::endFrame(void* frame)
// Decodes each complete SASL frame found in data[0, available) and returns the
// total number of bytes consumed. A trailing frame whose declared size exceeds
// the remaining bytes is not consumed.
std::size_t Sasl::read(const char* data, size_t available)
{
- Decoder decoder(data, available);
- //read frame-header
- uint32_t frameSize = decoder.readUInt();
- QPID_LOG(trace, "Reading SASL frame of size " << frameSize);
- decoder.resetSize(frameSize);
- uint8_t dataOffset = decoder.readUByte();
- uint8_t frameType = decoder.readUByte();
- if (frameType != 0x01) {
- QPID_LOG(error, "Expected SASL frame; got type " << frameType);
- }
- uint16_t ignored = decoder.readUShort();
- if (ignored) {
- QPID_LOG(info, "Got non null bytes at end of SASL frame header");
- }
+ size_t consumed = 0;
+ while (available - consumed > 4/*framesize*/) {
+ Decoder decoder(data+consumed, available-consumed);
+ //read frame-header
+ uint32_t frameSize = decoder.readUInt();
+ if (frameSize > decoder.getSize()) break;//don't have all the data for this frame yet
+
+ //NOTE(review): a declared frameSize below the 8-byte header is not rejected here;
+ //confirm that Decoder bounds-checks reads after resetSize()
+ QPID_LOG(trace, "Reading SASL frame of size " << frameSize);
+ decoder.resetSize(frameSize);
+ uint8_t dataOffset = decoder.readUByte();
+ uint8_t frameType = decoder.readUByte();
+ if (frameType != 0x01) {
+ //cast so the type is logged as a number rather than as a raw character
+ QPID_LOG(error, "Expected SASL frame; got type " << static_cast<int>(frameType));
+ }
+ uint16_t ignored = decoder.readUShort();
+ if (ignored) {
+ QPID_LOG(info, "Got non null bytes at end of SASL frame header");
+ }
- //body is at offset 4*dataOffset from the start
- size_t skip = dataOffset*4 - 8;
- if (skip) {
- QPID_LOG(info, "Offset for sasl frame was not as expected");
- decoder.advance(skip);
+ //body is at offset 4*dataOffset from the start; the header read above is 8 bytes
+ const size_t bodyOffset = dataOffset*4;
+ if (bodyOffset < 8) {
+ //malformed offset: bodyOffset - 8 would wrap to a huge unsigned value, so
+ //report it and decode from the end of the header instead of advancing
+ QPID_LOG(error, "Invalid data offset in SASL frame: " << static_cast<int>(dataOffset));
+ } else if (bodyOffset > 8) {
+ QPID_LOG(info, "Offset for sasl frame was not as expected");
+ decoder.advance(bodyOffset - 8);
+ }
+ decoder.read(*this);
+ consumed += decoder.getPosition();
}
- decoder.read(*this);
- return decoder.getPosition();
+ return consumed;
}
std::size_t Sasl::write(char* data, size_t size)
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1411034-1415148
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.cpp Mon Dec 17 11:22:49 2012
@@ -298,7 +298,7 @@ uint32_t Bridge::encodedSize() const
+ 2; // sync
}
// Returns mgmtObject as a ManagementObject::shared_ptr. This change renames
// the accessor from GetManagementObject to GetManagementObjectShared.
-management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
+management::ManagementObject::shared_ptr Bridge::GetManagementObjectShared (void) const
{
return mgmtObject;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Bridge.h Mon Dec 17 11:22:49 2012
@@ -73,7 +73,7 @@ class Bridge : public PersistableConfig,
bool isDetached() const { return detached; }
- management::ManagementObject::shared_ptr GetManagementObject() const;
+ management::ManagementObject::shared_ptr GetManagementObjectShared() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId,
management::Args& args,
std::string& text);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.cpp Mon Dec 17 11:22:49 2012
@@ -234,7 +234,7 @@ Broker::Broker(const Broker::Options& co
systemObject = System::shared_ptr(system);
mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"));
- mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
+ mgmtObject->set_systemRef(system->GetManagementObjectShared()->getObjectId());
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
mgmtObject->set_connBacklog(conf.connectionBacklog);
@@ -454,7 +454,7 @@ Broker::~Broker() {
QPID_LOG(notice, "Shut down");
}
// Returns mgmtObject as a ManagementObject::shared_ptr. This change renames
// the accessor from GetManagementObject to GetManagementObjectShared.
-ManagementObject::shared_ptr Broker::GetManagementObject(void) const
+ManagementObject::shared_ptr Broker::GetManagementObjectShared(void) const
{
return mgmtObject;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Broker.h Mon Dec 17 11:22:49 2012
@@ -235,7 +235,7 @@ class Broker : public sys::Runnable, pub
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
- QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared() const;
QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const;
QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(
uint32_t methodId, management::Args& args, std::string& text);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.cpp Mon Dec 17 11:22:49 2012
@@ -402,7 +402,7 @@ SessionHandler& Connection::getChannel(C
return *ptr_map_ptr(i);
}
// Returns mgmtObject as a ManagementObject::shared_ptr. This change renames
// the accessor from GetManagementObject to GetManagementObjectShared.
-ManagementObject::shared_ptr Connection::GetManagementObject(void) const
+ManagementObject::shared_ptr Connection::GetManagementObjectShared(void) const
{
return mgmtObject;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Connection.h Mon Dec 17 11:22:49 2012
@@ -112,7 +112,7 @@ class Connection : public sys::Connectio
void closeChannel(framing::ChannelId channel);
// Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Dec 17 11:22:49 2012
@@ -177,7 +177,7 @@ Exchange::Exchange (const string& _name,
mgmtExchange->set_autoDelete(false);
agent->addObject(mgmtExchange, 0, durable);
if (broker)
- brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared());
}
}
}
@@ -198,7 +198,7 @@ Exchange::Exchange(const string& _name,
mgmtExchange->set_arguments(ManagementAgent::toMap(args));
agent->addObject(mgmtExchange, 0, durable);
if (broker)
- brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObjectShared());
}
}
@@ -227,7 +227,7 @@ void Exchange::setAlternate(Exchange::sh
alternate = _alternate;
if (mgmtExchange != 0) {
if (alternate.get() != 0)
- mgmtExchange->set_altExchange(alternate->GetManagementObject()->getObjectId());
+ mgmtExchange->set_altExchange(alternate->GetManagementObjectShared()->getObjectId());
else
mgmtExchange->clr_altExchange();
}
@@ -294,7 +294,7 @@ void Exchange::recoveryComplete(Exchange
}
}
// Returns mgmtExchange as a ManagementObject::shared_ptr. This change renames
// the accessor from GetManagementObject to GetManagementObjectShared.
-ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
+ManagementObject::shared_ptr Exchange::GetManagementObjectShared (void) const
{
return mgmtExchange;
}
@@ -352,7 +352,7 @@ Exchange::Binding::Binding(const string&
Exchange::Binding::~Binding ()
{
if (mgmtBinding != 0) {
- _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared());
if (mo != 0)
mo->dec_bindingCount();
mgmtBinding->resourceDestroy ();
@@ -367,7 +367,7 @@ void Exchange::Binding::startManagement(
if (broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObjectShared());
if (mo != 0) {
management::ObjectId queueId = mo->getObjectId();
@@ -383,7 +383,7 @@ void Exchange::Binding::startManagement(
}
}
// Returns mgmtBinding as a ManagementObject::shared_ptr. This change renames
// the accessor from GetManagementObject to GetManagementObjectShared.
-ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObjectShared () const
{
return mgmtBinding;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Exchange.h Mon Dec 17 11:22:49 2012
@@ -58,7 +58,7 @@ public:
framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
~Binding();
void startManagement();
- management::ManagementObject::shared_ptr GetManagementObject() const;
+ management::ManagementObject::shared_ptr GetManagementObjectShared() const;
};
private:
@@ -210,7 +210,7 @@ public:
static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
// Manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared(void) const;
// Federation hooks
class DynamicBridge {
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.cpp Mon Dec 17 11:22:49 2012
@@ -292,8 +292,8 @@ void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
- if (!hideManagement() && connection->GetManagementObject()) {
- mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
+ if (!hideManagement() && connection->GetManagementObjectShared()) {
+ mgmtObject->set_connectionRef(connection->GetManagementObjectShared()->getObjectId());
}
// Get default URL from known-hosts if not already set
@@ -669,7 +669,7 @@ uint32_t Link::encodedSize() const
+ password.size() + 1;
}
// Returns mgmtObject as a ManagementObject::shared_ptr. This change renames
// the accessor from GetManagementObject to GetManagementObjectShared.
-ManagementObject::shared_ptr Link::GetManagementObject (void) const
+ManagementObject::shared_ptr Link::GetManagementObjectShared (void) const
{
return mgmtObject;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Link.h Mon Dec 17 11:22:49 2012
@@ -183,7 +183,7 @@ class Link : public PersistableConfig, p
static bool isEncodedLink(const std::string& key);
// Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObject(void) const;
+ management::ManagementObject::shared_ptr GetManagementObjectShared(void) const;
management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
// manage the exchange owned by this link
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Protocol.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Protocol.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Protocol.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Protocol.cpp Mon Dec 17 11:22:49 2012
@@ -42,6 +42,7 @@ boost::intrusive_ptr<const qpid::broker:
for (Protocols::const_iterator i = protocols.begin(); !transfer && i != protocols.end(); ++i) {
transfer = i->second->translate(m);
}
+ if (!transfer) throw new Exception("Could not convert message into 0-10");
return transfer;
}
boost::shared_ptr<RecoverableMessage> ProtocolRegistry::recover(qpid::framing::Buffer& b)
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.cpp Mon Dec 17 11:22:49 2012
@@ -198,7 +198,7 @@ Queue::Queue(const string& _name, const
new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete));
mgmtObject->set_arguments(settings.asMap());
agent->addObject(mgmtObject, 0, store != 0);
- brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject());
+ brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared());
if (brokerMgmtObject)
brokerMgmtObject->inc_queueCount();
}
@@ -1108,7 +1108,7 @@ void Queue::setPersistenceId(uint64_t _p
{
if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore)
{
- ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject();
+ ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObjectShared();
if (childObj != 0)
childObj->setReference(mgmtObject->getObjectId());
}
@@ -1154,7 +1154,7 @@ void Queue::setAlternateExchange(boost::
alternateExchange = exchange;
if (mgmtObject) {
if (exchange.get() != 0)
- mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());
+ mgmtObject->set_altExchange(exchange->GetManagementObjectShared()->getObjectId());
else
mgmtObject->clr_altExchange();
}
@@ -1258,7 +1258,7 @@ void Queue::setExternalQueueStore(Extern
externalQueueStore = inst;
if (inst) {
- ManagementObject::shared_ptr childObj = inst->GetManagementObject();
+ ManagementObject::shared_ptr childObj = inst->GetManagementObjectShared();
if (childObj != 0 && mgmtObject != 0)
childObj->setReference(mgmtObject->getObjectId());
}
@@ -1306,7 +1306,7 @@ void Queue::countLoadedFromDisk(uint64_t
}
-ManagementObject::shared_ptr Queue::GetManagementObject (void) const
+ManagementObject::shared_ptr Queue::GetManagementObjectShared (void) const
{
return mgmtObject;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Queue.h Mon Dec 17 11:22:49 2012
@@ -340,7 +340,7 @@ class Queue : public boost::enable_share
QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const;
// Manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
management::Manageable::status_t
QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Dec 17 11:22:49 2012
@@ -78,7 +78,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
broker = queue->getBroker();
- queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject());
+ queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObjectShared());
if (queueMgmtObj) {
queueMgmtObj->set_flowStopped(isFlowControlActive());
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Dec 17 11:22:49 2012
@@ -303,14 +303,14 @@ Consumer(_name, type),
deliveryCount(0),
protocols(parent->getSession().getBroker().getProtocolRegistry())
{
- if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
+ if (parent != 0 && queue.get() != 0 && queue->GetManagementObjectShared() !=0)
{
ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
if (agent != 0)
{
- mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
+ mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObjectShared()->getObjectId(), getTag(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -318,7 +318,7 @@ Consumer(_name, type),
}
}
-ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const
+ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObjectShared (void) const
{
return mgmtObject;
}
@@ -398,7 +398,8 @@ ostream& operator<<(ostream& o, const Co
void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
{
Credit original = credit;
- credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+ boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
+ credit.consume(1, transfer->getRequiredCredit());
QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this)
<< ", was " << original << " now " << credit);
@@ -406,9 +407,10 @@ void SemanticState::ConsumerImpl::alloca
bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
{
- bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg));
+ boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
+ bool enoughCredit = credit.check(1, transfer->getRequiredCredit());
QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient")
- << " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: "
+ << " credit for message of " << transfer->getRequiredCredit() << " bytes: "
<< credit);
return enoughCredit;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h Mon Dec 17 11:22:49 2012
@@ -163,7 +163,7 @@ class SemanticState : private boost::non
// manageable entry points
QPID_BROKER_EXTERN management::ManagementObject::shared_ptr
- GetManagementObject(void) const;
+ GetManagementObjectShared(void) const;
QPID_BROKER_EXTERN management::Manageable::status_t
ManagementMethod(uint32_t methodId, management::Args& args, std::string& text);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Dec 17 11:22:49 2012
@@ -65,7 +65,7 @@ SessionState::SessionState(
}
void SessionState::addManagementObject() {
- if (GetManagementObject()) return; // Already added.
+ if (GetManagementObjectShared()) return; // Already added.
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = getBroker().getManagementAgent();
@@ -127,7 +127,7 @@ void SessionState::attach(SessionHandler
if (mgmtObject != 0)
{
mgmtObject->set_attached (1);
- mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_connectionRef (h.getConnection().GetManagementObjectShared()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
asyncCommandCompleter->attached();
@@ -148,7 +148,7 @@ void SessionState::giveReadCredit(int32_
getConnection().outputTasks.giveReadCredit(credit);
}
-ManagementObject::shared_ptr SessionState::GetManagementObject (void) const
+ManagementObject::shared_ptr SessionState::GetManagementObjectShared (void) const
{
return mgmtObject;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h Mon Dec 17 11:22:49 2012
@@ -110,7 +110,7 @@ class SessionState : public qpid::Sessio
const qpid::types::Variant::Map& annotations, bool sync);
// Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObject (void) const;
+ management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h Mon Dec 17 11:22:49 2012
@@ -45,7 +45,7 @@ class System : public management::Manage
System (std::string _dataDir, Broker* broker = 0);
- management::ManagementObject::shared_ptr GetManagementObject (void) const
+ management::ManagementObject::shared_ptr GetManagementObjectShared (void) const
{ return mgmtObject; }
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h Mon Dec 17 11:22:49 2012
@@ -40,7 +40,7 @@ class Vhost : public management::Managea
Vhost (management::Manageable* parentBroker, Broker* broker = 0);
- management::ManagementObject::shared_ptr GetManagementObject (void) const
+ management::ManagementObject::shared_ptr GetManagementObjectShared (void) const
{ return mgmtObject; }
void setFederationTag(const std::string& tag);
};
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Connection.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Connection.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Connection.cpp Mon Dec 17 11:22:49 2012
@@ -157,6 +157,7 @@ void Connection::process()
QPID_LOG(trace, id << " process()");
if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
QPID_LOG_CAT(debug, model, id << " connection opened");
+ pn_connection_set_container(connection, broker.getFederationTag().c_str());
pn_connection_open(connection);
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp Mon Dec 17 11:22:49 2012
@@ -73,7 +73,7 @@ void ManagedConnection::setSaslSsf(int s
connection->set_saslSsf(ssf);
}
-qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const
+qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObjectShared() const
{
return connection;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h Mon Dec 17 11:22:49 2012
@@ -44,7 +44,7 @@ class ManagedConnection : public qpid::m
std::string getUserid() const;
void setSaslMechanism(const std::string&);
void setSaslSsf(int);
- qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+ qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
bool isLocal(const ConnectionToken* t) const;
void incomingMessageReceived();
void outgoingMessageSent();
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp Mon Dec 17 11:22:49 2012
@@ -37,7 +37,7 @@ ManagedOutgoingLink::ManagedOutgoingLink
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id,
+ subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObjectShared()->getObjectId(), id,
false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map()));
agent->addObject(subscription);
subscription->set_creditMode("n/a");
@@ -48,7 +48,7 @@ ManagedOutgoingLink::~ManagedOutgoingLin
if (subscription != 0) subscription->resourceDestroy();
}
-qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const
+qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObjectShared() const
{
return subscription;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h Mon Dec 17 11:22:49 2012
@@ -39,7 +39,7 @@ class ManagedOutgoingLink : public qpid:
public:
ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic);
virtual ~ManagedOutgoingLink();
- qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+ qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
void outgoingMessageSent();
void outgoingMessageAccepted();
void outgoingMessageRejected();
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp Mon Dec 17 11:22:49 2012
@@ -38,7 +38,7 @@ ManagedSession::ManagedSession(Broker& b
session->set_attached(true);
session->set_detachedLifespan(0);
session->clr_expireTime();
- session->set_connectionRef(parent.GetManagementObject()->getObjectId());
+ session->set_connectionRef(parent.GetManagementObjectShared()->getObjectId());
agent->addObject(session);
}
}
@@ -48,7 +48,7 @@ ManagedSession::~ManagedSession()
if (session) session->resourceDestroy();
}
-qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObject() const
+qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObjectShared() const
{
return session;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h Mon Dec 17 11:22:49 2012
@@ -40,7 +40,7 @@ class ManagedSession : public qpid::mana
public:
ManagedSession(Broker& broker, ManagedConnection& parent, const std::string id);
virtual ~ManagedSession();
- qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+ qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
bool isLocal(const ConnectionToken* t) const;
void incomingMessageReceived();
void incomingMessageAccepted();
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.cpp Mon Dec 17 11:22:49 2012
@@ -94,6 +94,7 @@ Message::Message(size_t size) : data(siz
applicationProperties.init();
body.init();
+ footer.init();
}
char* Message::getData() { return &data[0]; }
const char* Message::getData() const { return &data[0]; }
@@ -140,6 +141,10 @@ qpid::amqp::CharSequence Message::getBod
{
return body;
}
+qpid::amqp::CharSequence Message::getFooter() const
+{
+ return footer;
+}
void Message::scan()
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Message.h Mon Dec 17 11:22:49 2012
@@ -63,6 +63,7 @@ class Message : public qpid::broker::Mes
qpid::amqp::CharSequence getApplicationProperties() const;
qpid::amqp::CharSequence getBareMessage() const;
qpid::amqp::CharSequence getBody() const;
+ qpid::amqp::CharSequence getFooter() const;
Message(size_t size);
char* getData();
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Mon Dec 17 11:22:49 2012
@@ -145,6 +145,7 @@ void Outgoing::detached()
bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
{
Record& r = deliveries[current++];
+ if (current >= deliveries.capacity()) current = 0;
r.cursor = cursor;
r.msg = msg;
pn_delivery(link, r.tag);
@@ -161,7 +162,7 @@ void Outgoing::notify()
bool Outgoing::accept(const qpid::broker::Message&)
{
- return canDeliver();
+ return true;
}
void Outgoing::setSubjectFilter(const std::string& f)
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Session.cpp Mon Dec 17 11:22:49 2012
@@ -53,32 +53,33 @@ namespace amqp {
class Target
{
public:
+ Target(pn_link_t* l) : credit(100), window(0), link(l) {}
virtual ~Target() {}
- virtual void flow() = 0;
+ bool flow();
+ bool needFlow();
virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message
- private:
+ protected:
+ const uint32_t credit;
+ uint32_t window;
+ pn_link_t* link;
};
class Queue : public Target
{
public:
- Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : queue(q), link(l) {}
- void flow();
+ Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : Target(l), queue(q) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Queue> queue;
- pn_link_t* link;
};
class Exchange : public Target
{
public:
- Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : exchange(e), link(l) {}
- void flow();
+ Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : Target(l), exchange(e) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
- pn_link_t* link;
};
Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o)
@@ -169,11 +170,9 @@ void Session::attach(pn_link_t* link)
if (node.queue) {
boost::shared_ptr<Target> q(new Queue(node.queue, link));
targets[link] = q;
- q->flow();
} else if (node.exchange) {
boost::shared_ptr<Target> e(new Exchange(node.exchange, link));
targets[link] = e;
- e->flow();
} else {
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
@@ -253,7 +252,7 @@ void Session::incoming(pn_link_t* link,
received->begin();
Transfer t(delivery, shared_from_this());
received->end(t);
- target->second->flow();
+ if (target->second->needFlow()) out.activateOutput();
}
}
void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery)
@@ -283,6 +282,9 @@ bool Session::dispatch()
accepted(*i, true);
}
}
+ for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) {
+ if (t->second->flow()) output = true;
+ }
return output;
}
@@ -299,24 +301,32 @@ void Session::close()
deleted = true;
}
-void Queue::flow()
+void Queue::handle(qpid::broker::Message& message)
{
- pn_link_flow(link, 1);//TODO: proper flow control
+ queue->deliver(message);
+ --window;
}
-void Queue::handle(qpid::broker::Message& message)
+void Exchange::handle(qpid::broker::Message& message)
{
- queue->deliver(message);
+ DeliverableMessage deliverable(message, 0);
+ exchange->route(deliverable);
+ --window;
}
-void Exchange::flow()
+bool Target::flow()
{
- pn_link_flow(link, 1);//TODO: proper flow control
+ bool issue = window < credit;
+ if (issue) {
+ pn_link_flow(link, credit - window);//TODO: proper flow control
+ window = credit;
+ }
+ return issue;
}
-void Exchange::handle(qpid::broker::Message& message)
+bool Target::needFlow()
{
- DeliverableMessage deliverable(message, 0);
- exchange->route(deliverable);
+ return window <= (credit/2);
}
+
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Mon Dec 17 11:22:49 2012
@@ -215,6 +215,9 @@ void Translation::write(Outgoing& out)
//write bare message
qpid::amqp::CharSequence bareMessage = message->getBareMessage();
if (bareMessage.size) out.write(bareMessage.data, bareMessage.size);
+ //write footer:
+ qpid::amqp::CharSequence footer = message->getFooter();
+ if (footer.size) out.write(footer.data, footer.size);
} else {
const qpid::broker::amqp_0_10::MessageTransfer* transfer = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding());
if (transfer) {
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Mon Dec 17 11:22:49 2012
@@ -116,11 +116,6 @@ void MessageTransfer::computeRequiredCre
requiredCredit = sum.getSize();
cachedRequiredCredit = true;
}
-uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg)
-{
- //TODO: may need to reflect annotations and other modifications in this also
- return get(msg).getRequiredCredit();
-}
qpid::framing::FrameSet& MessageTransfer::getFrames()
{
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Mon Dec 17 11:22:49 2012
@@ -109,7 +109,6 @@ class MessageTransfer : public qpid::bro
QPID_BROKER_EXTERN bool isLastQMFResponse(const std::string correlation) const;
static bool isImmediateDeliveryRequired(const qpid::broker::Message& message);
- static uint32_t getRequiredCredit(const qpid::broker::Message&);
static MessageTransfer& get(qpid::broker::Message& message) {
return *dynamic_cast<MessageTransfer*>(&message.getEncoding());
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.cpp Mon Dec 17 11:22:49 2012
@@ -151,6 +151,11 @@ void TCPConnector::socketClosed(AsynchIO
shutdownHandler->shutdown();
}
+void TCPConnector::connectAborted() {
+ connector->stop();
+ connectFailed("Connection timedout");
+}
+
void TCPConnector::abort() {
// Can't abort a closed connection
if (!closed) {
@@ -159,8 +164,7 @@ void TCPConnector::abort() {
aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1));
} else if (connector) {
// We're still connecting
- connector->stop();
- connectFailed("Connection timedout");
+ connector->requestCallback(boost::bind(&TCPConnector::connectAborted, this));
}
}
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/client/TCPConnector.h Mon Dec 17 11:22:49 2012
@@ -80,6 +80,7 @@ class TCPConnector : public Connector, p
void close();
void send(framing::AMQFrame& frame);
void abort();
+ void connectAborted();
void setInputHandler(framing::InputHandler* handler);
void setShutdownHandler(sys::ShutdownHandler* handler);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h Mon Dec 17 11:22:49 2012
@@ -71,7 +71,7 @@ class HaBroker : public management::Mana
void initialize();
// Implement Manageable.
- qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; }
+ qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObject; }
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp Mon Dec 17 11:22:49 2012
@@ -41,6 +41,16 @@ string Manageable::StatusText (status_t
return "??";
}
+ManagementObject* Manageable::GetManagementObject(void) const
+{
+ return 0;
+}
+
+ManagementObject::shared_ptr Manageable::GetManagementObjectShared() const
+{
+ return ManagementObject::shared_ptr();
+}
+
Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&)
{
return STATUS_UNKNOWN_METHOD;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Dec 17 11:22:49 2012
@@ -698,7 +698,7 @@ void ManagementAgent::periodicProcessing
//
if (publish) {
uint64_t uptime = sys::Duration(startTime, sys::now());
- boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
qpid::sys::MemStat::loadMemInfo(memstat.get());
}
@@ -1722,7 +1722,7 @@ void ManagementAgent::handleAttachReques
string label;
uint32_t requestedBrokerBank, requestedAgentBank;
uint32_t assignedBank;
- ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
+ ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObjectShared()->getObjectId();
Uuid systemId;
moveNewObjects();
@@ -1754,7 +1754,7 @@ void ManagementAgent::handleAttachReques
agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get()));
agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
- agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
+ agent->mgmtObject->set_registeredTo (broker->GetManagementObjectShared()->getObjectId());
agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data());
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
@@ -1831,7 +1831,7 @@ void ManagementAgent::handleGetQuery(Buf
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
}
@@ -1945,7 +1945,7 @@ void ManagementAgent::handleGetQuery(con
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
}
/*
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1411034-1415148
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Dec 17 11:22:49 2012
@@ -211,7 +211,7 @@ private:
ObjectId connectionRef;
qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
- ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
+ ManagementObject::shared_ptr GetManagementObjectShared (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
void mapEncode(qpid::types::Variant::Map& _map) const;
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1411034-1415148
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Mon Dec 17 11:22:49 2012
@@ -53,7 +53,8 @@ ConnectionContext::ConnectionContext(con
writeHeader(false),
readHeader(false),
haveOutput(false),
- state(DISCONNECTED)
+ state(DISCONNECTED),
+ codecSwitch(*this)
{
if (pn_transport_bind(engine, connection)) {
//error
@@ -149,7 +150,14 @@ void ConnectionContext::close()
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (state != CONNECTED) return;
if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i){
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ //wait for outstanding sends to settle
+ while (!i->second->settled()) {
+ QPID_LOG(debug, "Waiting for sends to settle before closing");
+ wait();//wait until message has been confirmed
+ }
+
+
if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
pn_session_close(i->second->session);
}
@@ -181,6 +189,7 @@ bool ConnectionContext::fetch(boost::sha
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (lnk->capacity) {
pn_link_flow(lnk->receiver, 1);//TODO: is this the right approach?
+ wakeupDriver();
}
return true;
} else {
@@ -188,12 +197,24 @@ bool ConnectionContext::fetch(boost::sha
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
pn_link_drain(lnk->receiver, 0);
wakeupDriver();
- while (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)) {
- QPID_LOG(notice, "Waiting for credit to be drained: " << (pn_link_credit((pn_link_t*) lnk->receiver) - pn_link_queued((pn_link_t*) lnk->receiver)));
+ while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
+ QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
wait();
}
+ if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
+ pn_link_flow(lnk->receiver, lnk->capacity);
+ }
+ }
+ if (get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE)) {
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (lnk->capacity) {
+ pn_link_flow(lnk->receiver, 1);
+ wakeupDriver();
+ }
+ return true;
+ } else {
+ return false;
}
- return get(ssn, lnk, message, qpid::messaging::Duration::IMMEDIATE);
}
}
@@ -322,6 +343,7 @@ void ConnectionContext::setCapacity(boos
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
receiver->setCapacity(capacity);
pn_link_flow((pn_link_t*) receiver->receiver, receiver->getCapacity());
+ wakeupDriver();
}
uint32_t ConnectionContext::getCapacity(boost::shared_ptr<ReceiverContext> receiver)
{
@@ -543,13 +565,48 @@ bool ConnectionContext::useSasl()
qpid::sys::Codec& ConnectionContext::getCodec()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- if (sasl.get()) {
- qpid::sys::Codec* c = sasl->getCodec();
- if (c) return *c;
- lock.notifyAll();
+ return codecSwitch;
+}
+
+ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {}
+std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ size_t decoded = 0;
+ if (parent.sasl.get() && !parent.sasl->authenticated()) {
+ decoded = parent.sasl->decode(buffer, size);
+ if (!parent.sasl->authenticated()) return decoded;
}
- return *this;
+ if (decoded < size) {
+ if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
+ else decoded += parent.decode(buffer+decoded, size-decoded);
+ }
+ return decoded;
}
+std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size)
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ size_t encoded = 0;
+ if (parent.sasl.get() && parent.sasl->canEncode()) {
+ encoded += parent.sasl->encode(buffer, size);
+ if (!parent.sasl->authenticated()) return encoded;
+ }
+ if (encoded < size) {
+ if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
+ else encoded += parent.encode(buffer+encoded, size-encoded);
+ }
+ return encoded;
+}
+bool ConnectionContext::CodecSwitch::canEncode()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ if (parent.sasl.get()) {
+ if (parent.sasl->canEncode()) return true;
+ else if (!parent.sasl->authenticated()) return false;
+ else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode();
+ }
+ return parent.canEncode();
+}
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Mon Dec 17 11:22:49 2012
@@ -123,6 +123,17 @@ class ConnectionContext : public qpid::s
CONNECTED
} state;
std::auto_ptr<Sasl> sasl;
+ class CodecSwitch : public qpid::sys::Codec
+ {
+ public:
+ CodecSwitch(ConnectionContext&);
+ std::size_t decode(const char* buffer, std::size_t size);
+ std::size_t encode(char* buffer, std::size_t size);
+ bool canEncode();
+ private:
+ ConnectionContext& parent;
+ };
+ CodecSwitch codecSwitch;
void wait();
void wakeupDriver();
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Mon Dec 17 11:22:49 2012
@@ -120,18 +120,20 @@ void ReceiverContext::configure(pn_termi
helper.setNodeProperties(source);
}
- //filter:
- pn_data_t* filter = pn_terminus_filter(source);
- pn_data_put_map(filter);
- pn_data_enter(filter);
- pn_data_put_symbol(filter, convert("subject"));
- //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved
- //pn_data_put_described(filter);
- //pn_data_enter(filter);
- //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
- pn_data_put_string(filter, convert(address.getSubject()));
- //pn_data_exit(filter);
- pn_data_exit(filter);
+ if (!address.getSubject().empty()) {
+ //filter:
+ pn_data_t* filter = pn_terminus_filter(source);
+ pn_data_put_map(filter);
+ pn_data_enter(filter);
+ pn_data_put_symbol(filter, convert("subject"));
+ //TODO: At present inserting described values into the map doesn't seem to work; correct this once resolved
+ //pn_data_put_described(filter);
+ //pn_data_enter(filter);
+ //pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
+ pn_data_put_string(filter, convert(address.getSubject()));
+ //pn_data_exit(filter);
+ pn_data_exit(filter);
+ }
}
bool ReceiverContext::isClosed() const
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.cpp Mon Dec 17 11:22:49 2012
@@ -58,7 +58,7 @@ std::size_t Sasl::encode(char* buffer, s
encoded += writeProtocolHeader(buffer, size);
writeHeader = !encoded;
}
- if (state == NONE && encoded < size) {
+ if (encoded < size) {
encoded += write(buffer + encoded, size - encoded);
}
haveOutput = (encoded == size);
@@ -135,14 +135,9 @@ void Sasl::outcome(uint8_t result)
context.activateOutput();
}
-qpid::sys::Codec* Sasl::getCodec()
+qpid::sys::Codec* Sasl::getSecurityLayer()
{
- switch (state) {
- case SUCCEEDED: return static_cast<qpid::sys::Codec*>(securityLayer.get());
- case FAILED: throw qpid::messaging::UnauthorizedAccess("Failed to authenticate");
- case NONE: return static_cast<qpid::sys::Codec*>(this);
- }
- return 0;
+ return securityLayer.get();
}
bool Sasl::authenticated()
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/Sasl.h Mon Dec 17 11:22:49 2012
@@ -47,7 +47,7 @@ class Sasl : public qpid::sys::Codec, qp
bool canEncode();
bool authenticated();
- qpid::sys::Codec* getCodec();
+ qpid::sys::Codec* getSecurityLayer();
std::string getAuthenticatedUsername();
private:
ConnectionContext& context;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Mon Dec 17 11:22:49 2012
@@ -80,7 +80,7 @@ const std::string& SenderContext::getTar
SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
{
- if (processUnsettled() < capacity) {
+ if (processUnsettled() < capacity && pn_link_credit(sender)) {
deliveries.push_back(Delivery(nextId++));
Delivery& delivery = deliveries.back();
delivery.encode(MessageImplAccess::get(message), address);
@@ -95,6 +95,7 @@ uint32_t SenderContext::processUnsettled
{
//remove accepted messages from front of deque
while (!deliveries.empty() && deliveries.front().accepted()) {
+ deliveries.front().settle();
deliveries.pop_front();
}
return deliveries.size();
@@ -336,7 +337,10 @@ bool SenderContext::Delivery::accepted()
{
return pn_delivery_remote_state(token) == PN_ACCEPTED;
}
-
+void SenderContext::Delivery::settle()
+{
+ pn_delivery_settle(token);
+}
void SenderContext::configure() const
{
configure(pn_link_target(sender));
@@ -350,4 +354,10 @@ void SenderContext::configure(pn_terminu
helper.setNodeProperties(target);
}
}
+
+bool SenderContext::settled()
+{
+ return processUnsettled() == 0;
+}
+
}}} // namespace qpid::messaging::amqp
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1422853&r1=1422852&r2=1422853&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Mon Dec 17 11:22:49 2012
@@ -53,6 +53,7 @@ class SenderContext
void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
void send(pn_link_t*);
bool accepted();
+ void settle();
private:
int32_t id;
pn_delivery_t* token;
@@ -69,6 +70,7 @@ class SenderContext
const std::string& getTarget() const;
Delivery* send(const qpid::messaging::Message& message);
void configure() const;
+ bool settled();
private:
friend class ConnectionContext;
typedef std::deque<Delivery> Deliveries;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org