You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/10/22 00:05:07 UTC
svn commit: r1534394 [2/22] - in /qpid/branches/linearstore/qpid: ./ cpp/
cpp/bindings/qmf2/examples/python/ cpp/bindings/qmf2/python/
cpp/bindings/qpid/dotnet/ cpp/etc/ cpp/examples/ cpp/examples/messaging/
cpp/examples/qmf-agent/ cpp/include/qpid/ cp...
Propchange: qpid/branches/linearstore/qpid/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid:r1525057-1534385
Modified: qpid/branches/linearstore/qpid/cpp/bindings/qmf2/examples/python/agent.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/bindings/qmf2/examples/python/agent.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/bindings/qmf2/examples/python/agent.py (original)
+++ qpid/branches/linearstore/qpid/cpp/bindings/qmf2/examples/python/agent.py Mon Oct 21 22:04:51 2013
@@ -19,7 +19,7 @@
# under the License.
#
-import cqpid
+import qpid_messaging
from qmf2 import *
@@ -34,7 +34,7 @@ class ExampleAgent(AgentHandler):
##
## Create and open a messaging connection to a broker.
##
- self.connection = cqpid.Connection(url, "{reconnect:True}")
+ self.connection = qpid_messaging.Connection(url, "{reconnect:True}")
self.session = None
self.connection.open()
Modified: qpid/branches/linearstore/qpid/cpp/bindings/qmf2/examples/python/find_agents.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/bindings/qmf2/examples/python/find_agents.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/bindings/qmf2/examples/python/find_agents.py (original)
+++ qpid/branches/linearstore/qpid/cpp/bindings/qmf2/examples/python/find_agents.py Mon Oct 21 22:04:51 2013
@@ -17,7 +17,7 @@
# under the License.
#
-import cqpid
+import qpid_messaging
import qmf2
class FindAgents(qmf2.ConsoleHandler):
@@ -45,7 +45,7 @@ class FindAgents(qmf2.ConsoleHandler):
url = "localhost"
options = ""
-connection = cqpid.Connection(url, options)
+connection = qpid_messaging.Connection(url, options)
connection.open()
session = qmf2.ConsoleSession(connection)
Modified: qpid/branches/linearstore/qpid/cpp/bindings/qmf2/python/qmf2.py
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/bindings/qmf2/python/qmf2.py?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/bindings/qmf2/python/qmf2.py (original)
+++ qpid/branches/linearstore/qpid/cpp/bindings/qmf2/python/qmf2.py Mon Oct 21 22:04:51 2013
@@ -18,7 +18,7 @@
#
import cqmf2
-import cqpid
+import qpid_messaging
from threading import Thread
import time
@@ -124,7 +124,7 @@ class AgentHandler(Thread):
def run(self):
event = cqmf2.AgentEvent()
while self.__running:
- valid = self.__agent._impl.nextEvent(event, cqpid.Duration.SECOND)
+ valid = self.__agent._impl.nextEvent(event, qpid_messaging.Duration.SECOND)
if valid and self.__running:
if event.getType() == cqmf2.AGENT_METHOD:
self.method(event, event.getMethodName(), event.getArguments(), event.getArgumentSubtypes(),
@@ -156,7 +156,7 @@ class ConsoleHandler(Thread):
def run(self):
event = cqmf2.ConsoleEvent()
while self.__running:
- valid = self.__session._impl.nextEvent(event, cqpid.Duration.SECOND)
+ valid = self.__session._impl.nextEvent(event, qpid_messaging.Duration.SECOND)
if valid and self.__running:
if event.getType() == cqmf2.CONSOLE_AGENT_ADD:
self.agentAdded(Agent(event.getAgent()))
@@ -434,7 +434,7 @@ class Agent(object):
q_arg = q._impl
else:
q_arg = q
- dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout)
+ dur = qpid_messaging.Duration(qpid_messaging.Duration.SECOND.getMilliseconds() * timeout)
result = self._impl.query(q_arg, dur)
if result.getType() == cqmf2.CONSOLE_EXCEPTION:
raise Exception(Data(result.getData(0)))
@@ -449,7 +449,7 @@ class Agent(object):
def loadSchemaInfo(self, timeout=30):
"""
"""
- dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout)
+ dur = qpid_messaging.Duration(qpid_messaging.Duration.SECOND.getMilliseconds() * timeout)
self._impl.querySchema(dur)
def getPackages(self):
@@ -473,7 +473,7 @@ class Agent(object):
def getSchema(self, schemaId, timeout=30):
"""
"""
- dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout)
+ dur = qpid_messaging.Duration(qpid_messaging.Duration.SECOND.getMilliseconds() * timeout)
return Schema(self._impl.getSchema(schemaId._impl, dur))
## TODO: Async query
@@ -555,7 +555,7 @@ class Data(object):
return Agent(self._impl.getAgent())
def update(self, timeout=5):
- dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout)
+ dur = qpid_messaging.Duration(qpid_messaging.Duration.SECOND.getMilliseconds() * timeout)
agent = self._impl.getAgent()
query = cqmf2.Query(self._impl.getAddr())
result = agent.query(query, dur)
@@ -590,7 +590,7 @@ class Data(object):
timeout = 30
if '_timeout' in kwargs:
timeout = kwargs['_timeout']
- dur = cqpid.Duration(cqpid.Duration.SECOND.getMilliseconds() * timeout)
+ dur = qpid_messaging.Duration(qpid_messaging.Duration.SECOND.getMilliseconds() * timeout)
##
## Get the list of arguments from the schema, isolate those that are IN or IN_OUT,
Modified: qpid/branches/linearstore/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1 (original)
+++ qpid/branches/linearstore/qpid/cpp/bindings/qpid/dotnet/configure-windows.ps1 Mon Oct 21 22:04:51 2013
@@ -153,6 +153,8 @@ $global:vsVersion = ''
$global:cmakeGenerator = ''
$global:vsSubdir = ''
$global:cmakeCompiler = ''
+$global:cmakeCommandLine32 = ''
+$global:cmakeCommandLine64 = ''
#############################
# Select-Folder
@@ -325,7 +327,8 @@ function WriteDotnetBindingEnvSetupBat
[string] $nBits,
[string] $outfileName,
[string] $studioVersion,
- [string] $studioSubdir
+ [string] $studioSubdir,
+ [string] $cmakeLine
)
$out = @("@ECHO OFF
@@ -337,6 +340,8 @@ REM
REM > call $outfileName
REM >
REM
+REM The solution was generated with cmake command line:
+REM $cmakeLine
ECHO %PATH% | FINDSTR /I boost > NUL
IF %ERRORLEVEL% EQU 0 ECHO WARNING: Boost is defined in your path multiple times!
SET PATH=$boostRoot\lib;%PATH%
@@ -387,7 +392,7 @@ function SelectVisualStudioVersion {
$Form.width = 350
$Form.height = 150
- $Form.Text = ”Select Visual Studio Version”
+ $Form.Text = "Select Visual Studio Version"
$DropDown = new-object System.Windows.Forms.ComboBox
$DropDown.Location = new-object System.Drawing.Size(120,10)
@@ -502,10 +507,11 @@ if ($defined64) {
# 32-bit X86
#
if ($make32) {
- $env:BOOST_ROOT = "$boost32"
cd "$build32"
Write-Host "Running 32-bit CMake in $build32 ..."
- CMake -G "$global:cmakeGenerator" "-DGEN_DOXYGEN=No" "-DCMAKE_INSTALL_PREFIX=install_x86" "-DBoost_COMPILER=$global:cmakeCompiler" $cppDir
+ $global:cmakeCommandLine32 = "CMake -G ""$global:cmakeGenerator"" ""-DGEN_DOXYGEN=No"" ""-DCMAKE_INSTALL_PREFIX=install_x86"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost32"" $cppDir"
+ Write-Host "$global:cmakeCommadLine32"
+ CMake -G "$global:cmakeGenerator" "-DGEN_DOXYGEN=No" "-DCMAKE_INSTALL_PREFIX=install_x86" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost32" $cppDir
} else {
Write-Host "Skipped 32-bit CMake."
}
@@ -514,10 +520,11 @@ if ($make32) {
# 64-bit X64
#
if ($make64) {
- $env:BOOST_ROOT = "$boost64"
cd "$build64"
Write-Host "Running 64-bit CMake in $build64"
- CMake -G "$global:cmakeGenerator Win64" "-DGEN_DOXYGEN=No" "-DCMAKE_INSTALL_PREFIX=install_x64" "-DBoost_COMPILER=$global:cmakeCompiler" $cppDir
+ $global:cmakeCommandLine64 = "CMake -G ""$global:cmakeGenerator Win64"" ""-DGEN_DOXYGEN=No"" ""-DCMAKE_INSTALL_PREFIX=install_x64"" ""-DBoost_COMPILER=$global:cmakeCompiler"" ""-DBOOST_ROOT=$boost64"" $cppDir"
+ Write-Host "$global:cmakeCommadLine64"
+ CMake -G "$global:cmakeGenerator Win64" "-DGEN_DOXYGEN=No" "-DCMAKE_INSTALL_PREFIX=install_x64" "-DBoost_COMPILER=$global:cmakeCompiler" "-DBOOST_ROOT=$boost64" $cppDir
} else {
Write-Host "Skipped 64-bit CMake."
}
@@ -569,7 +576,8 @@ if ($defined32) {
-nBits "32" `
-outfileName "setenv-messaging-$global:vsSubdir-x86-32bit.bat" `
-studioVersion "$global:vsVersion" `
- -studioSubdir "$global:vsSubdir"
+ -studioSubdir "$global:vsSubdir" `
+ -cmakeLine "$global:cmakeCommandLine32"
} else {
Write-Host "Skipped writing 32-bit scripts."
@@ -620,7 +628,8 @@ if ($defined64) {
-nBits "64" `
-outfileName "setenv-messaging-$global:vsSubdir-x64-64bit.bat" `
-studioVersion "$global:vsVersion" `
- -studioSubdir "$global:vsSubdir"
+ -studioSubdir "$global:vsSubdir" `
+ -cmakeLine "$global:cmakeCommandLine64"
} else {
Write-Host "Skipped writing 64-bit scripts."
Modified: qpid/branches/linearstore/qpid/cpp/bld-winsdk.ps1
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/bld-winsdk.ps1?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/bld-winsdk.ps1 (original)
+++ qpid/branches/linearstore/qpid/cpp/bld-winsdk.ps1 Mon Oct 21 22:04:51 2013
@@ -356,7 +356,7 @@ if ($args.length -lt 3) {
exit
}
-$qpid_src = "qpid"
+$qpid_src = Split-Path -leaf $global:sourceDirectory
$boostRoot = $args[0]
$ver = $args[1]
$generator = ""
Modified: qpid/branches/linearstore/qpid/cpp/etc/qpidd.in
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/etc/qpidd.in?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/etc/qpidd.in (original)
+++ qpid/branches/linearstore/qpid/cpp/etc/qpidd.in Mon Oct 21 22:04:51 2013
@@ -63,8 +63,9 @@ if [ $RETVAL = 4 ]; then
fi
start() {
+ [[ $QPID_DATA_DIR ]] || QPID_DATA_DIR=/var/lib/qpidd
echo -n $"Starting Qpid AMQP daemon: "
- daemon --pidfile $pidfile --check $prog --user qpidd @sbindir@/$prog ${QPID_DATA_DIR:+--data-dir $QPID_DATA_DIR} --daemon $QPIDD_OPTIONS
+ daemon --pidfile $pidfile --check $prog --user qpidd @sbindir@/$prog --data-dir $QPID_DATA_DIR --daemon $QPIDD_OPTIONS
RETVAL=$?
echo
[ $RETVAL = 0 ] && touch $lockfile
Modified: qpid/branches/linearstore/qpid/cpp/examples/README.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/examples/README.txt?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/examples/README.txt (original)
+++ qpid/branches/linearstore/qpid/cpp/examples/README.txt Mon Oct 21 22:04:51 2013
@@ -221,9 +221,4 @@ On Windows:
C:\Program Files\qpidc-0.7\examples\request-response> server
C:\Program Files\qpidc-0.7\examples\request-response> client
-== qmf-console ==
-
-This directory contains examples which demonstrate integration with
-the Qpid Management Framework (QMF). Refer to the README.txt file
-within the directory for more information.
Modified: qpid/branches/linearstore/qpid/cpp/examples/examples.sln
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/examples/examples.sln?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/examples/examples.sln (original)
+++ qpid/branches/linearstore/qpid/cpp/examples/examples.sln Mon Oct 21 22:04:51 2013
@@ -32,14 +32,6 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C9
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "messaging_spout", "messaging\messaging_spout.vcproj", "{D3115AC9-91C4-4D79-BCAC-DE837C70F1EA}"
EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "qmf_console_console", "qmf-console\qmf-console_console.vcproj", "{490473E1-FECA-1BAD-2E13-3FFA2B8669C3}"
-EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "qmf_console_ping", "qmf-console\qmf-console_ping.vcproj", "{C1FFDE95-3442-49AE-9985-7EEE3D45B4A3}"
-EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "qmf_console_printevents", "qmf-console\qmf-console_printevents.vcproj", "{72C74624-FECA-1BAD-2E13-3FFA2B8669C3}"
-EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "qmf_console_queuestats", "qmf-console\qmf-console_queuestats.vcproj", "{B21825EA-FECA-1BAD-2E13-3FFA2B8669C3}"
-EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Win32 = Debug|Win32
@@ -96,22 +88,6 @@ Global
{D3115AC9-91C4-4D79-BCAC-DE837C70F1EA}.Release|Win32.Build.0 = Release|Win32
{D3115AC9-91C4-4D79-BCAC-DE837C70F1EA}.Release|x64.ActiveCfg = Release|x64
{D3115AC9-91C4-4D79-BCAC-DE837C70F1EA}.Release|x64.Build.0 = Release|x64
- {490473E1-FECA-1BAD-2E13-3FFA2B8669C3}.Debug|Win32.ActiveCfg = Debug|Win32
- {490473E1-FECA-1BAD-2E13-3FFA2B8669C3}.Debug|Win32.Build.0 = Debug|Win32
- {490473E1-FECA-1BAD-2E13-3FFA2B8669C3}.Release|Win32.ActiveCfg = Release|Win32
- {490473E1-FECA-1BAD-2E13-3FFA2B8669C3}.Release|Win32.Build.0 = Release|Win32
- {C1FFDE95-3442-49AE-9985-7EEE3D45B4A3}.Debug|Win32.ActiveCfg = Debug|Win32
- {C1FFDE95-3442-49AE-9985-7EEE3D45B4A3}.Debug|Win32.Build.0 = Debug|Win32
- {C1FFDE95-3442-49AE-9985-7EEE3D45B4A3}.Release|Win32.ActiveCfg = Release|Win32
- {C1FFDE95-3442-49AE-9985-7EEE3D45B4A3}.Release|Win32.Build.0 = Release|Win32
- {72C74624-FECA-1BAD-2E13-3FFA2B8669C3}.Debug|Win32.ActiveCfg = Debug|Win32
- {72C74624-FECA-1BAD-2E13-3FFA2B8669C3}.Debug|Win32.Build.0 = Debug|Win32
- {72C74624-FECA-1BAD-2E13-3FFA2B8669C3}.Release|Win32.ActiveCfg = Release|Win32
- {72C74624-FECA-1BAD-2E13-3FFA2B8669C3}.Release|Win32.Build.0 = Release|Win32
- {B21825EA-FECA-1BAD-2E13-3FFA2B8669C3}.Debug|Win32.ActiveCfg = Debug|Win32
- {B21825EA-FECA-1BAD-2E13-3FFA2B8669C3}.Debug|Win32.Build.0 = Debug|Win32
- {B21825EA-FECA-1BAD-2E13-3FFA2B8669C3}.Release|Win32.ActiveCfg = Release|Win32
- {B21825EA-FECA-1BAD-2E13-3FFA2B8669C3}.Release|Win32.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Modified: qpid/branches/linearstore/qpid/cpp/examples/messaging/hello_world.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/examples/messaging/hello_world.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/examples/messaging/hello_world.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/examples/messaging/hello_world.cpp Mon Oct 21 22:04:51 2013
@@ -33,9 +33,9 @@ int main(int argc, char** argv) {
std::string broker = argc > 1 ? argv[1] : "localhost:5672";
std::string address = argc > 2 ? argv[2] : "amq.topic";
std::string connectionOptions = argc > 3 ? argv[3] : "";
-
- Connection connection(broker, connectionOptions);
+
try {
+ Connection connection(broker, connectionOptions);
connection.open();
Session session = connection.createSession();
@@ -47,12 +47,11 @@ int main(int argc, char** argv) {
Message message = receiver.fetch(Duration::SECOND * 1);
std::cout << message.getContent() << std::endl;
session.acknowledge();
-
+
connection.close();
return 0;
} catch(const std::exception& error) {
std::cerr << error.what() << std::endl;
- connection.close();
- return 1;
+ return 1;
}
}
Modified: qpid/branches/linearstore/qpid/cpp/include/qpid/messaging/Handle.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/include/qpid/messaging/Handle.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/include/qpid/messaging/Handle.h (original)
+++ qpid/branches/linearstore/qpid/cpp/include/qpid/messaging/Handle.h Mon Oct 21 22:04:51 2013
@@ -53,14 +53,15 @@ template <class T> class Handle {
void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; }
+ private:
+ // Not implemented, subclasses must implement.
+ Handle(const Handle&);
+ Handle& operator=(const Handle&);
+
protected:
typedef T Impl;
QPID_MESSAGING_INLINE_EXTERN Handle() :impl() {}
- // Not implemented,subclasses must implement.
- QPID_MESSAGING_EXTERN Handle(const Handle&);
- QPID_MESSAGING_EXTERN Handle& operator=(const Handle&);
-
Impl* impl;
friend class PrivateImplRef<T>;
Modified: qpid/branches/linearstore/qpid/cpp/include/qpid/swig_perl_typemaps.i
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/include/qpid/swig_perl_typemaps.i?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/include/qpid/swig_perl_typemaps.i (original)
+++ qpid/branches/linearstore/qpid/cpp/include/qpid/swig_perl_typemaps.i Mon Oct 21 22:04:51 2013
@@ -192,8 +192,10 @@
}
%typemap (out) uint8_t, uint16_t, uint32_t, uint64_t {
- sv_setuv($result, (UV)$1);
- argvi++;
+ SV* tmp = sv_newmortal();
+ sv_setuv(tmp, (UV)$1);
+ $result = tmp;
+ argvi++;
}
%typemap (in) int8_t, int16_t, int32_t, int64_t {
@@ -206,8 +208,10 @@
}
%typemap (out) int8_t, int16_t, int32_t, int64_t {
- sv_setiv($result, (IV)$1);
- argvi++;
+ SV* tmp = sv_newmortal();
+ sv_setiv(tmp, (IV)$1);
+ $result = tmp;
+ argvi++;
}
%typemap(in) bool {
Propchange: qpid/branches/linearstore/qpid/cpp/src/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src:r1525057-1534385
Modified: qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt Mon Oct 21 22:04:51 2013
@@ -299,14 +299,14 @@ if (CMAKE_COMPILER_IS_GNUCXX)
if (CMAKE_SYSTEM_NAME STREQUAL SunOS)
set (CATCH_UNDEFINED "")
endif (CMAKE_SYSTEM_NAME STREQUAL SunOS)
- set (COMPILER_FLAGS "-fvisibility-inlines-hidden -Wl,--as-needed")
+ set (COMPILER_FLAGS "-fvisibility-inlines-hidden")
# gcc 4.1.2 on RHEL 5 needs -Wno-attributes to avoid an error that's fixed
# in later gcc versions.
execute_process(COMMAND ${CMAKE_CXX_COMPILER} -dumpversion
OUTPUT_VARIABLE GCC_VERSION)
if (GCC_VERSION VERSION_EQUAL 4.1.2)
- set (COMPILER_FLAGS "-Wl,--as-needed")
- message (STATUS "Cannot use -fvisibility=hidden on gcc 4.1.2")
+ message (STATUS "Cannot restrict library symbol export on gcc 4.1.2")
+ set (HIDE_SYMBOL_FLAGS "-fno-visibility-inlines-hidden")
else (GCC_VERSION VERSION_EQUAL 4.1.2)
set (HIDE_SYMBOL_FLAGS "-fno-visibility-inlines-hidden -fvisibility=hidden")
endif (GCC_VERSION VERSION_EQUAL 4.1.2)
@@ -890,10 +890,6 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
windows/SCM.cpp
)
- set (qpidmessaging_platform_SOURCES
- qpid/messaging/HandleInstantiator.cpp
- )
-
else (CMAKE_SYSTEM_NAME STREQUAL Windows)
# POSIX (Non-Windows) platforms have a lot of overlap in sources; the only
@@ -991,9 +987,6 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows
set (qpidd_platform_SOURCES
posix/QpiddBroker.cpp
)
-
- set (qpidmessaging_platform_SOURCES
- )
endif (CMAKE_SYSTEM_NAME STREQUAL Windows)
set (qpidcommon_SOURCES
@@ -1203,8 +1196,8 @@ install (DIRECTORY ../include/qpid
PATTERN ".svn" EXCLUDE)
install_pdb (qpidclient ${QPID_COMPONENT_CLIENT})
-set (qpidmessaging_SOURCES_hidden
- qpid/messaging/AddressParser.h
+set (qpidmessaging_SOURCES
+ ${amqpc_SOURCES}
qpid/messaging/AddressImpl.h
qpid/messaging/ConnectionImpl.h
qpid/messaging/ReceiverImpl.h
@@ -1228,16 +1221,8 @@ set (qpidmessaging_SOURCES_hidden
qpid/client/amqp0_10/SessionImpl.cpp
qpid/client/amqp0_10/SenderImpl.h
qpid/client/amqp0_10/SenderImpl.cpp
-)
-set_source_files_properties(
- ${qpidmessaging_SOURCES_hidden}
- PROPERTIES
- COMPILE_FLAGS "${HIDE_SYMBOL_FLAGS}")
-
-set (qpidmessaging_SOURCES
- ${qpidmessaging_platform_SOURCES}
- ${qpidmessaging_SOURCES_hidden}
qpid/messaging/Address.cpp
+ qpid/messaging/AddressParser.h
qpid/messaging/AddressParser.cpp # The functions in here are not in the public interface, but qmf uses them
qpid/messaging/Connection.cpp
qpid/messaging/Duration.cpp
@@ -1256,17 +1241,14 @@ set (qpidmessaging_SOURCES
qpid/messaging/amqp/EncodedMessage.h
qpid/messaging/amqp/EncodedMessage.cpp
)
-set_source_files_properties(
- ${qpidmessaging_SOURCES}
- PROPERTIES
- COMPILE_FLAGS "${HIDE_SYMBOL_FLAGS}")
add_msvc_version (qpidmessaging library dll)
add_library (qpidmessaging SHARED ${qpidmessaging_SOURCES})
-target_link_libraries (qpidmessaging qpidtypes qpidclient qpidcommon)
+target_link_libraries (qpidmessaging qpidtypes qpidclient qpidcommon ${PROTON_LIBRARIES})
set_target_properties (qpidmessaging PROPERTIES
LINK_FLAGS "${HIDE_SYMBOL_FLAGS} ${LINK_VERSION_SCRIPT_FLAG}"
+ COMPILE_FLAGS "${HIDE_SYMBOL_FLAGS}"
VERSION ${qpidmessaging_version}
SOVERSION ${qpidmessaging_version_major})
install (TARGETS qpidmessaging
@@ -1533,6 +1515,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DI
${CMAKE_CURRENT_BINARY_DIR}/config.h)
add_subdirectory(qpid/store)
add_subdirectory(tests)
+add_subdirectory(tests/legacystore)
# Support for pkg-config
Propchange: qpid/branches/linearstore/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/CMakeLists.txt:r1525057-1534385
Modified: qpid/branches/linearstore/qpid/cpp/src/amqp.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/amqp.cmake?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/amqp.cmake (original)
+++ qpid/branches/linearstore/qpid/cpp/src/amqp.cmake Mon Oct 21 22:04:51 2013
@@ -163,14 +163,22 @@ if (BUILD_AMQP)
qpid/messaging/amqp/TcpTransport.h
qpid/messaging/amqp/TcpTransport.cpp
)
- add_library (amqpc MODULE ${amqpc_SOURCES})
- target_link_libraries (amqpc qpidmessaging qpidtypes qpidclient qpidcommon ${PROTON_LIBRARIES})
- set_target_properties (amqpc PROPERTIES
- PREFIX ""
- LINK_FLAGS "${CATCH_UNDEFINED}")
- install (TARGETS amqpc
- DESTINATION ${QPIDC_MODULE_DIR}
- COMPONENT ${QPID_COMPONENT_CLIENT})
+ if (WIN32)
+ set(proton_dll "${PROTON_LIBRARY_DIRS}/${PROTON_LIBRARIES}.dll")
+ set(proton_dlld "${PROTON_LIBRARY_DIRS}/${PROTON_LIBRARIES}d.dll")
+ install (PROGRAMS ${proton_dll}
+ DESTINATION ${QPID_INSTALL_LIBDIR}
+ COMPONENT ${QPID_COMPONENT_COMMON}
+ CONFIGURATIONS Release|MinSizeRel|RelWithDebInfo)
+ install (PROGRAMS ${proton_dlld}
+ DESTINATION ${QPID_INSTALL_LIBDIR}
+ COMPONENT ${QPID_COMPONENT_COMMON}
+ CONFIGURATIONS Debug)
+ endif (WIN32)
+else (BUILD_AMQP)
+ # ensure that qpid build ignores proton
+ UNSET( amqpc_SOURCES )
+ UNSET( PROTON_LIBRARIES )
endif (BUILD_AMQP)
Modified: qpid/branches/linearstore/qpid/cpp/src/legacystore.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/legacystore.cmake?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/legacystore.cmake (original)
+++ qpid/branches/linearstore/qpid/cpp/src/legacystore.cmake Mon Oct 21 22:04:51 2013
@@ -146,12 +146,34 @@ if (BUILD_LEGACYSTORE)
)
target_link_libraries (legacystore
+ ${clock_gettime_LIB}
aio
uuid
qpidcommon qpidtypes qpidbroker
${DB_LIBRARY}
)
+ # For use in the store tests only
+ add_library (legacystore_shared SHARED
+ ${legacy_jrnl_SOURCES}
+ ${legacy_store_SOURCES}
+ ${legacy_qmf_SOURCES}
+ )
+
+ set_target_properties (legacystore_shared PROPERTIES
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
+ INCLUDE_DIRECTORIES "${legacy_include_DIRECTORIES}"
+ )
+
+ target_link_libraries (legacystore_shared
+ ${clock_gettime_LIB}
+ aio
+ uuid
+ qpidcommon qpidtypes qpidbroker
+ ${Boost_PROGRAM_OPTIONS_LIBRARY}
+ ${DB_LIBRARY}
+ )
+
install(TARGETS legacystore
DESTINATION ${QPIDD_MODULE_DIR}
COMPONENT ${QPID_COMPONENT_BROKER})
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/descriptors.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp/descriptors.h Mon Oct 21 22:04:51 2013
@@ -33,7 +33,7 @@ const std::string DELIVERY_ANNOTATIONS_S
const std::string MESSAGE_ANNOTATIONS_SYMBOL("amqp:message-annotations:map");
const std::string APPLICATION_PROPERTIES_SYMBOL("amqp:application-properties:map");
const std::string AMQP_SEQUENCE_SYMBOL("amqp:amqp-sequence:list");
-const std::string AMQP_VALUE_SYMBOL("amqp:amqp-sequence:*");
+const std::string AMQP_VALUE_SYMBOL("amqp:amqp-value:*");
const std::string DATA_SYMBOL("amqp:data:binary");
const std::string FOOTER_SYMBOL("amqp:footer:map");
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp Mon Oct 21 22:04:51 2013
@@ -575,6 +575,11 @@ void translate(const boost::shared_ptr<F
to = toVariant(from);
}
+boost::shared_ptr<framing::FieldValue> translate(const types::Variant& from)
+{
+ return toFieldValue(from);
+}
+
const std::string ListCodec::contentType("amqp/list");
const std::string MapCodec::contentType("amqp/map");
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/amqp_0_10/Codecs.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/amqp_0_10/Codecs.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/amqp_0_10/Codecs.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/amqp_0_10/Codecs.h Mon Oct 21 22:04:51 2013
@@ -80,6 +80,7 @@ QPID_COMMON_EXTERN void translate(const
qpid::types::Variant& to);
QPID_COMMON_EXTERN void translate(const types::Variant& from,
boost::shared_ptr<qpid::framing::FieldValue> to);
+QPID_COMMON_EXTERN boost::shared_ptr<qpid::framing::FieldValue> translate(const types::Variant& from);
}} // namespace qpid::amqp_0_10
Propchange: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1525057-1534385
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 21 22:04:51 2013
@@ -355,6 +355,11 @@ Broker::Broker(const Broker::Options& co
//recover any objects via object factories
objects.restore(*this);
+ // Assign to queues their users who created them (can be done after ACL is loaded in Plugin::initializeAll above
+ if ((getAcl()) && (store.get())) {
+ queues.eachQueue(boost::bind(&qpid::broker::Queue::updateAclUserQueueCount, _1));
+ }
+
if(conf.enableMgmt) {
if (getAcl()) {
mgmtObject->set_maxConns(getAcl()->getMaxConnectTotal());
@@ -1289,8 +1294,9 @@ std::pair<boost::shared_ptr<Queue>, bool
if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
- if (!acl->approveCreateQueue(userId,name) )
- throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
+ if (!queues.find(name))
+ if (!acl->approveCreateQueue(userId,name) )
+ throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId));
}
Exchange::shared_ptr alternate;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Oct 21 22:04:51 2013
@@ -135,7 +135,7 @@ void Exchange::doRoute(Deliverable& msg,
if (mgmtExchange != 0)
{
qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics();
- uint64_t contentSize = msg.getMessage().getContentSize();
+ uint64_t contentSize = msg.getMessage().getMessageSize();
eStats->msgReceives += 1;
eStats->byteReceives += contentSize;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.cpp Mon Oct 21 22:04:51 2013
@@ -71,9 +71,9 @@ bool Message::isPersistent() const
return getEncoding().isPersistent();
}
-uint64_t Message::getContentSize() const
+uint64_t Message::getMessageSize() const
{
- return getEncoding().getContentSize();
+ return getEncoding().getMessageSize();
}
boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Message.h Mon Oct 21 22:04:51 2013
@@ -68,7 +68,7 @@ public:
virtual std::string getRoutingKey() const = 0;
virtual bool isPersistent() const = 0;
virtual uint8_t getPriority() const = 0;
- virtual uint64_t getContentSize() const = 0;
+ virtual uint64_t getMessageSize() const = 0;
virtual qpid::amqp::MessageId getMessageId() const = 0;
virtual qpid::amqp::MessageId getCorrelationId() const = 0;
virtual std::string getPropertyAsString(const std::string& key) const = 0;
@@ -83,7 +83,7 @@ public:
QPID_BROKER_EXTERN Message();
QPID_BROKER_EXTERN ~Message();
- bool isRedelivered() const { return deliveryCount; }
+ bool isRedelivered() const { return deliveryCount > 0; }
void deliver() { ++deliveryCount; }
void undeliver() { --deliveryCount; }
int getDeliveryCount() const { return deliveryCount; }
@@ -119,7 +119,7 @@ public:
QPID_BROKER_EXTERN qpid::types::Variant getProperty(const std::string& key) const;
void processProperties(qpid::amqp::MapHandler&) const;
- QPID_BROKER_EXTERN uint64_t getContentSize() const;
+ QPID_BROKER_EXTERN uint64_t getMessageSize() const;
QPID_BROKER_EXTERN Encoding& getEncoding();
QPID_BROKER_EXTERN const Encoding& getEncoding() const;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 21 22:04:51 2013
@@ -88,7 +88,7 @@ inline void mgntEnqStats(const Message&
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg.getContentSize();
+ uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalEnqueues +=1;
bStats->msgTotalEnqueues += 1;
qStats->byteTotalEnqueues += contentSize;
@@ -111,7 +111,7 @@ inline void mgntDeqStats(const Message&
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg.getContentSize();
+ uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalDequeues += 1;
bStats->msgTotalDequeues += 1;
@@ -131,7 +131,15 @@ inline void mgntDeqStats(const Message&
QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOptions)
{
QueueSettings settings(inputs);
- if (!settings.maxDepth.hasSize() && globalOptions.queueLimit) {
+ settings.maxDepth = QueueDepth();
+ if (inputs.maxDepth.hasCount() && inputs.maxDepth.getCount()) {
+ settings.maxDepth.setCount(inputs.maxDepth.getCount());
+ }
+ if (inputs.maxDepth.hasSize()) {
+ if (inputs.maxDepth.getSize()) {
+ settings.maxDepth.setSize(inputs.maxDepth.getSize());
+ }
+ } else if (globalOptions.queueLimit) {
settings.maxDepth.setSize(globalOptions.queueLimit);
}
return settings;
@@ -194,7 +202,7 @@ Queue::Queue(const string& _name, const
redirectSource(false)
{
current.setCount(0);//always track depth in messages
- if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it
+ if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it
if (settings.traceExcludes.size()) {
split(traceExclude, settings.traceExcludes, ", ");
}
@@ -297,7 +305,7 @@ void Queue::deliverTo(Message msg, TxBuf
void Queue::recoverPrepared(const Message& msg)
{
Mutex::ScopedLock locker(messageLock);
- current += QueueDepth(1, msg.getContentSize());
+ current += QueueDepth(1, msg.getMessageSize());
}
void Queue::recover(Message& msg)
@@ -311,7 +319,7 @@ void Queue::process(Message& msg)
push(msg);
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- const uint64_t contentSize = msg.getContentSize();
+ const uint64_t contentSize = msg.getMessageSize();
qStats->msgTxnEnqueues += 1;
qStats->byteTxnEnqueues += contentSize;
mgmtObject->statisticsUpdated();
@@ -853,7 +861,7 @@ bool Queue::enqueue(TransactionContext*
{
Mutex::ScopedLock locker(messageLock);
- if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) {
+ if (!checkDepth(QueueDepth(1, msg.getMessageSize()), msg)) {
return false;
}
}
@@ -883,7 +891,7 @@ void Queue::enqueueAborted(const Message
//Called when any transactional enqueue is aborted (including but
//not limited to a recovered dtx transaction)
Mutex::ScopedLock locker(messageLock);
- current -= QueueDepth(1, msg.getContentSize());
+ current -= QueueDepth(1, msg.getMessageSize());
}
void Queue::enqueueCommited(Message& msg)
@@ -911,7 +919,7 @@ void Queue::dequeueCommited(const Messag
observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
- mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
+ mgmtObject->inc_byteTxnDequeues(msg.getMessageSize());
}
}
@@ -954,7 +962,7 @@ void Queue::dequeueCommitted(const Queue
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
- const uint64_t contentSize = msg->getContentSize();
+ const uint64_t contentSize = msg->getMessageSize();
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
@@ -978,7 +986,7 @@ void Queue::dequeueCommitted(const Queue
*/
void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete)
{
- current -= QueueDepth(1, msg.getContentSize());
+ current -= QueueDepth(1, msg.getMessageSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1182,18 +1190,27 @@ void Queue::encode(Buffer& buffer) const
buffer.putShortString(name);
buffer.put(encodableSettings);
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
+ buffer.putShortString(userId);
}
uint32_t Queue::encodedSize() const
{
return name.size() + 1/*short string size octet*/
+ (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
+ + userId.size() + 1 /* short string */
+ encodableSettings.encodedSize();
}
+void Queue::updateAclUserQueueCount()
+{
+ if (broker->getAcl())
+ broker->getAcl()->approveCreateQueue(userId, name);
+}
+
Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
{
string name;
+ string _userId;
buffer.getShortString(name);
FieldTable ft;
buffer.get(ft);
@@ -1207,6 +1224,12 @@ Queue::shared_ptr Queue::restore( QueueR
result.first->alternateExchangeName.assign(altExch);
}
+ //get userId of queue's creator; ACL counters for userId are done after ACL plugin is initialized
+ if (buffer.available()) {
+ buffer.getShortString(_userId);
+ result.first->setOwningUser(_userId);
+ }
+
return result.first;
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/Queue.h Mon Oct 21 22:04:51 2013
@@ -204,6 +204,7 @@ class Queue : public boost::enable_share
QueueDepth current;
QueueBindings bindings;
std::string alternateExchangeName;
+ std::string userId; // queue owner for ACL quota purposes
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue::shared_ptr mgmtObject;
@@ -384,6 +385,10 @@ class Queue : public boost::enable_share
/** Get the message at position pos, returns true if found and sets msg */
QPID_BROKER_EXTERN bool find(framing::SequenceNumber pos, Message& msg ) const;
+ // Remember the queue's owner so acl quotas can be restored after restart
+ void setOwningUser(std::string& _userId) { userId = _userId; }
+ void updateAclUserQueueCount();
+
QPID_BROKER_EXTERN void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
QPID_BROKER_EXTERN boost::shared_ptr<Exchange> getAlternateExchange();
QPID_BROKER_EXTERN bool isLocal(const Message& msg);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Mon Oct 21 22:04:51 2013
@@ -112,7 +112,7 @@ void QueueFlowLimit::enqueued(const Mess
sys::Mutex::ScopedLock l(indexLock);
++count;
- size += msg.getContentSize();
+ size += msg.getMessageSize();
if (!flowStopped) {
if (flowStopCount && count > flowStopCount) {
@@ -150,7 +150,7 @@ void QueueFlowLimit::dequeued(const Mess
throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName));
}
- uint64_t _size = msg.getContentSize();
+ uint64_t _size = msg.getMessageSize();
if (_size <= size) {
size -= _size;
} else {
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Oct 21 22:04:51 2013
@@ -64,6 +64,8 @@ QueueRegistry::declare(const string& nam
//Move this to factory also?
if (alternate)
queue->setAlternateExchange(alternate);//need to do this *before* create
+ queue->setOwningUser(userId);
+
if (!recovering) {
//create persistent record if required
queue->create();
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.cpp Mon Oct 21 22:04:51 2013
@@ -112,10 +112,10 @@ QueueSettings::QueueSettings(bool d, boo
bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& value)
{
- if (key == MAX_COUNT && value.asUint32() > 0) {
+ if (key == MAX_COUNT) {
maxDepth.setCount(value);
return true;
- } else if (key == MAX_SIZE && value.asUint64() > 0) {
+ } else if (key == MAX_SIZE) {
maxDepth.setSize(value);
return true;
} else if (key == POLICY_TYPE) {
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/QueueSettings.h Mon Oct 21 22:04:51 2013
@@ -110,7 +110,7 @@ struct QueueSettings
void validate() const;
QPID_BROKER_EXTERN void populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused);
QPID_BROKER_EXTERN void populate(const qpid::framing::FieldTable& inputs, qpid::framing::FieldTable& unused);
- std::map<std::string, qpid::types::Variant> asMap() const;
+ QPID_BROKER_EXTERN std::map<std::string, qpid::types::Variant> asMap() const;
struct Aliases : std::map<std::string, std::string>
{
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp Mon Oct 21 22:04:51 2013
@@ -44,7 +44,7 @@ ThresholdAlerts::ThresholdAlerts(const s
void ThresholdAlerts::enqueued(const Message& m)
{
- size += m.getContentSize();
+ size += m.getMessageSize();
++count;
if (sizeGoingUp && sizeThreshold && size >= sizeThreshold) {
@@ -64,7 +64,7 @@ void ThresholdAlerts::enqueued(const Mes
void ThresholdAlerts::dequeued(const Message& m)
{
- size -= m.getContentSize();
+ size -= m.getMessageSize();
--count;
if (!sizeGoingUp && sizeThreshold && size <= sizeThresholdDown) {
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.cpp Mon Oct 21 22:04:51 2013
@@ -31,6 +31,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/Buffer.h"
#include <string.h>
+#include <boost/lexical_cast.hpp>
namespace qpid {
namespace broker {
@@ -79,8 +80,51 @@ uint8_t Message::getPriority() const
else return priority.get();
}
-std::string Message::getPropertyAsString(const std::string& /*key*/) const { return empty; }
-std::string Message::getAnnotationAsString(const std::string& /*key*/) const { return empty; }
+namespace {
+class StringRetriever : public MapHandler
+{
+ public:
+ StringRetriever(const std::string& k) : key(k) {}
+ void handleBool(const qpid::amqp::CharSequence& actualKey, bool actualValue) { process(actualKey, actualValue); }
+ void handleUint8(const qpid::amqp::CharSequence& actualKey, uint8_t actualValue) { process(actualKey, actualValue); }
+ void handleUint16(const qpid::amqp::CharSequence& actualKey, uint16_t actualValue) { process(actualKey, actualValue); }
+ void handleUint32(const qpid::amqp::CharSequence& actualKey, uint32_t actualValue) { process(actualKey, actualValue); }
+ void handleUint64(const qpid::amqp::CharSequence& actualKey, uint64_t actualValue) { process(actualKey, actualValue); }
+ void handleInt8(const qpid::amqp::CharSequence& actualKey, int8_t actualValue) { process(actualKey, actualValue); }
+ void handleInt16(const qpid::amqp::CharSequence& actualKey, int16_t actualValue) { process(actualKey, actualValue); }
+ void handleInt32(const qpid::amqp::CharSequence& actualKey, int32_t actualValue) { process(actualKey, actualValue); }
+ void handleInt64(const qpid::amqp::CharSequence& actualKey, int64_t actualValue) { process(actualKey, actualValue); }
+ void handleFloat(const qpid::amqp::CharSequence& actualKey, float actualValue) { process(actualKey, actualValue); }
+ void handleDouble(const qpid::amqp::CharSequence& actualKey, double actualValue) { process(actualKey, actualValue); }
+ void handleVoid(const qpid::amqp::CharSequence&) { /*nothing to do*/ }
+ void handleString(const qpid::amqp::CharSequence& actualKey, const qpid::amqp::CharSequence& actualValue, const qpid::amqp::CharSequence& /*encoding*/)
+ {
+ if (isRequestedKey(actualKey)) value = std::string(actualValue.data, actualValue.size);
+ }
+ std::string getValue() const { return value; }
+ private:
+ const std::string key;
+ std::string value;
+
+ template <typename T> void process(const qpid::amqp::CharSequence& actualKey, T actualValue)
+ {
+ if (isRequestedKey(actualKey)) value = boost::lexical_cast<std::string>(actualValue);
+ }
+
+ bool isRequestedKey(const qpid::amqp::CharSequence& actualKey)
+ {
+ //TODO: avoid allocating new string by just iterating over chars
+ return key == std::string(actualKey.data, actualKey.size);
+ }
+};
+}
+
+std::string Message::getPropertyAsString(const std::string& key) const
+{
+ StringRetriever sr(key);
+ processProperties(sr);
+ return sr.getValue();
+}
namespace {
class PropertyAdapter : public Reader {
@@ -135,19 +179,30 @@ namespace {
state(KEY)
{}
};
+
+void processMapData(const CharSequence& source, MapHandler& handler)
+{
+ qpid::amqp::Decoder d(source.data, source.size);
+ PropertyAdapter adapter(handler);
+ d.read(adapter);
+
+}
}
void Message::processProperties(MapHandler& mh) const {
- qpid::amqp::Decoder d(applicationProperties.data, applicationProperties.size);
- PropertyAdapter mha(mh);
- d.read(mha);
+ processMapData(applicationProperties, mh);
+}
+
+std::string Message::getAnnotationAsString(const std::string& key) const
+{
+ StringRetriever sr(key);
+ processMapData(messageAnnotations, sr);
+ if (sr.getValue().empty()) processMapData(deliveryAnnotations, sr);
+ return sr.getValue();
+
}
-//getContentSize() is primarily used in stats about the number of
-//bytes enqueued/dequeued etc, not sure whether this is the right name
-//and whether it should indeed only be the content that is thus
-//measured
-uint64_t Message::getContentSize() const { return data.size(); }
+uint64_t Message::getMessageSize() const { return data.size(); }
//getContent() is used primarily for decoding qmf messages in
//management and ha, but also by the xml exchange
std::string Message::getContent() const
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Message.h Mon Oct 21 22:04:51 2013
@@ -44,7 +44,7 @@ class Message : public qpid::broker::Mes
std::string getRoutingKey() const;
bool isPersistent() const;
uint8_t getPriority() const;
- uint64_t getContentSize() const;
+ uint64_t getMessageSize() const;
std::string getPropertyAsString(const std::string& key) const;
std::string getAnnotationAsString(const std::string& key) const;
bool getTtl(uint64_t&) const;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp Mon Oct 21 22:04:51 2013
@@ -20,10 +20,13 @@
*/
#include "qpid/broker/amqp/NodeProperties.h"
#include "qpid/broker/amqp/DataReader.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/Descriptor.h"
#include "qpid/amqp/descriptors.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/types/Variant.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/log/Statement.h"
@@ -100,7 +103,7 @@ bool getLifetimeDescriptorSymbol(QueueSe
}
-NodeProperties::NodeProperties() : queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {}
+NodeProperties::NodeProperties() : received(false), queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {}
void NodeProperties::read(pn_data_t* data)
{
@@ -108,26 +111,92 @@ void NodeProperties::read(pn_data_t* dat
reader.read(data);
}
-void NodeProperties::write(pn_data_t* data)
+void NodeProperties::write(pn_data_t* data, boost::shared_ptr<Queue> node)
{
- pn_data_put_map(data);
- pn_data_enter(data);
- pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
- pn_data_put_string(data, convert(queue ? MOVE : COPY));
- pn_bytes_t symbol;
- if (autoDelete && getLifetimeDescriptorSymbol(lifetime, symbol)) {
- pn_data_put_symbol(data, convert(LIFETIME_POLICY));
- pn_data_put_described(data);
+ if (received) {
+ pn_data_put_map(data);
pn_data_enter(data);
- pn_data_put_symbol(data, symbol);
- pn_data_put_list(data);
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(MOVE));//TODO: should really add COPY as well, since queues can be browsed
+ pn_bytes_t symbol;
+ if (autoDelete && node->isAutoDelete() && getLifetimeDescriptorSymbol(node->getSettings().lifetime, symbol)) {
+ pn_data_put_symbol(data, convert(LIFETIME_POLICY));
+ pn_data_put_described(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, symbol);
+ pn_data_put_list(data);
+ pn_data_exit(data);
+ }
+ if (durable && node->isDurable()) {
+ pn_data_put_symbol(data, convert(DURABLE));
+ pn_data_put_bool(data, true);
+ }
+ if (exclusive && node->hasExclusiveOwner()) {
+ pn_data_put_symbol(data, convert(EXCLUSIVE));
+ pn_data_put_bool(data, true);
+ }
+ if (!alternateExchange.empty() && node->getAlternateExchange()) {
+ pn_data_put_symbol(data, convert(ALTERNATE_EXCHANGE));
+ pn_data_put_string(data, convert(node->getAlternateExchange()->getName()));
+ }
+
+ qpid::types::Variant::Map actual = node->getSettings().asMap();
+ qpid::types::Variant::Map unrecognised;
+ QueueSettings dummy;
+ dummy.populate(actual, unrecognised);
+ for (qpid::types::Variant::Map::const_iterator i = unrecognised.begin(); i != unrecognised.end(); ++i) {
+ actual.erase(i->first);
+ }
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ qpid::types::Variant::Map::const_iterator j = actual.find(i->first);
+ if (j != actual.end()) {
+ pn_data_put_symbol(data, convert(j->first));
+ pn_data_put_string(data, convert(j->second.asString()));
+ }
+ }
+
pn_data_exit(data);
}
- pn_data_exit(data);
}
+namespace {
+const std::string QPID_MSG_SEQUENCE("qpid.msg_sequence");
+const std::string QPID_IVE("qpid.ive");
+}
+void NodeProperties::write(pn_data_t* data, boost::shared_ptr<Exchange> node)
+{
+ if (received) {
+ pn_data_put_map(data);
+ pn_data_enter(data);
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(COPY));
+ if (durable && node->isDurable()) {
+ pn_data_put_symbol(data, convert(DURABLE));
+ pn_data_put_bool(data, true);
+ }
+ if (!exchangeType.empty()) {
+ pn_data_put_symbol(data, convert(EXCHANGE_TYPE));
+ pn_data_put_string(data, convert(node->getType()));
+ }
+ if (!alternateExchange.empty() && node->getAlternate()) {
+ pn_data_put_symbol(data, convert(ALTERNATE_EXCHANGE));
+ pn_data_put_string(data, convert(node->getAlternate()->getName()));
+ }
+
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ if ((i->first == QPID_MSG_SEQUENCE || i->first == QPID_IVE) && node->getArgs().isSet(i->first)) {
+ pn_data_put_symbol(data, convert(i->first));
+ pn_data_put_bool(data, true);
+ }
+ }
+
+ pn_data_exit(data);
+ }
+}
+
void NodeProperties::process(const std::string& key, const qpid::types::Variant& value, const Descriptor* d)
{
+ received = true;
QPID_LOG(debug, "Processing node property " << key << " = " << value);
if (key == SUPPORTED_DIST_MODES) {
if (value == MOVE) queue = true;
@@ -248,6 +317,7 @@ QueueSettings NodeProperties::getQueueSe
qpid::types::Variant::Map unused;
settings.populate(properties, unused);
settings.lifetime = lifetime;
+ qpid::amqp_0_10::translate(unused, settings.storeSettings);
return settings;
}
@@ -277,4 +347,9 @@ bool NodeProperties::trackControllingLin
return lifetime == QueueSettings::DELETE_ON_CLOSE || lifetime == QueueSettings::DELETE_IF_EMPTY;
}
+const qpid::types::Variant::Map& NodeProperties::getProperties() const
+{
+ return properties;
+}
+
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h Mon Oct 21 22:04:51 2013
@@ -24,10 +24,13 @@
#include "qpid/amqp/MapReader.h"
#include "qpid/types/Variant.h"
#include "qpid/broker/QueueSettings.h"
+#include <boost/shared_ptr.hpp>
struct pn_data_t;
namespace qpid {
namespace broker {
+class Exchange;
+class Queue;
struct QueueSettings;
namespace amqp {
@@ -36,7 +39,8 @@ class NodeProperties : public qpid::amqp
public:
NodeProperties();
void read(pn_data_t*);
- void write(pn_data_t*);
+ void write(pn_data_t*,boost::shared_ptr<Queue>);
+ void write(pn_data_t*,boost::shared_ptr<Exchange>);
void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*);
void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*);
@@ -61,7 +65,9 @@ class NodeProperties : public qpid::amqp
std::string getExchangeType() const;
std::string getAlternateExchange() const;
bool trackControllingLink() const;
+ const qpid::types::Variant::Map& getProperties() const;
private:
+ bool received;
bool queue;
bool durable;
bool autoDelete;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Mon Oct 21 22:04:51 2013
@@ -45,6 +45,17 @@ void Outgoing::wakeup()
session.wakeup();
}
+namespace {
+bool requested_reliable(pn_link_t* link)
+{
+ return pn_link_remote_snd_settle_mode(link) == PN_SND_UNSETTLED;
+}
+bool requested_unreliable(pn_link_t* link)
+{
+ return pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED;
+}
+}
+
OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p)
: Outgoing(broker, session, source, target, pn_link_name(l)),
@@ -54,7 +65,8 @@ OutgoingFromQueue::OutgoingFromQueue(Bro
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
buffer(1024)/*used only for header at present*/,
- unreliable(pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED)
+ //for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
+ unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link))
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -106,8 +118,8 @@ void OutgoingFromQueue::handle(pn_delive
write(&buffer[0], encoder.getPosition());
Translation t(r.msg);
t.write(*this);
- if (unreliable) pn_delivery_settle(delivery);
if (pn_link_advance(link)) {
+ if (unreliable) pn_delivery_settle(delivery);
--outstanding;
outgoingMessageSent();
QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
@@ -121,6 +133,10 @@ void OutgoingFromQueue::handle(pn_delive
} else if (pn_delivery_updated(delivery)) {
assert(r.delivery == delivery);
r.disposition = pn_delivery_remote_state(delivery);
+ if (!r.disposition && pn_delivery_settled(delivery)) {
+ //if peer has settled without setting state, assume accepted
+ r.disposition = PN_ACCEPTED;
+ }
if (r.disposition) {
switch (r.disposition) {
case PN_ACCEPTED:
@@ -132,17 +148,18 @@ void OutgoingFromQueue::handle(pn_delive
outgoingMessageRejected();
break;
case PN_RELEASED:
- if (preAcquires()) queue->release(r.cursor, false);//TODO: for PN_RELEASED, delivery count should not be incremented
+ if (preAcquires()) queue->release(r.cursor, false);//for PN_RELEASED, delivery count should not be incremented
outgoingMessageRejected();//TODO: not quite true...
break;
case PN_MODIFIED:
- if (preAcquires()) queue->release(r.cursor, true);//TODO: proper handling of modified
+ if (preAcquires()) queue->release(r.cursor, pn_disposition_is_failed(pn_delivery_remote(delivery)));
+ //TODO: handle undeliverable-here and message-annotations
outgoingMessageRejected();//TODO: not quite true...
break;
default:
QPID_LOG(warning, "Unhandled disposition: " << r.disposition);
}
- //TODO: ony settle once any dequeue on store has completed
+ //TODO: only settle once any dequeue on store has completed
pn_delivery_settle(delivery);
r.reset();
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.cpp Mon Oct 21 22:04:51 2013
@@ -44,6 +44,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/format.hpp>
#include <map>
@@ -100,11 +101,13 @@ void readCapabilities(pn_data_t* data, F
if (type == PN_ARRAY) {
pn_data_enter(data);
while (pn_data_next(data)) {
- f(convert(pn_data_get_symbol(data)));
+ std::string s = convert(pn_data_get_symbol(data));
+ f(s);
}
pn_data_exit(data);
} else if (type == PN_SYMBOL) {
- f(convert(pn_data_get_symbol(data)));
+ std::string s = convert(pn_data_get_symbol(data));
+ f(s);
} else {
QPID_LOG(error, "Skipping capabilities field of type " << pn_type_name(type));
}
@@ -200,25 +203,32 @@ Session::ResolvedNode Session::resolve(c
node.exchange = connection.getBroker().getExchanges().find(name);
node.queue = connection.getBroker().getQueues().find(name);
node.topic = connection.getTopics().get(name);
+ bool createOnDemand = is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus));
+ //Strictly speaking, properties should only be specified when the
+ //terminus is dynamic. However we will not enforce that here. If
+ //properties are set on the attach request, we will set them on
+ //our reply. This allows the 'create' and 'assert' options in the
+ //qpid messaging API to be implemented over 1.0.
+ node.properties.read(pn_terminus_properties(terminus));
+
if (node.topic) node.exchange = node.topic->getExchange();
- if (node.exchange && !node.queue && is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
- node.properties.read(pn_terminus_properties(terminus));
+ if (node.exchange && !node.queue && createOnDemand) {
if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
+ //emulate 0-10 exchange-declare behaviour
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
}
}
if (!node.queue && !node.exchange) {
- if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) {
+ if (pn_terminus_is_dynamic(terminus) || createOnDemand) {
//is it a queue or an exchange?
- node.properties.read(pn_terminus_properties(terminus));
if (node.properties.isQueue()) {
node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
} else {
qpid::framing::FieldTable args;
+ qpid::amqp_0_10::translate(node.properties.getProperties(), args);
node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
args, connection.getUserId(), connection.getId()).first;
}
- node.created = true;
} else {
size_t i = name.find('@');
if (i != std::string::npos && (i+1) < name.length()) {
@@ -320,12 +330,11 @@ void Session::setupIncoming(pn_link_t* l
if (node.queue) {
setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
authorise.incoming(node.queue);
+ node.properties.write(pn_terminus_properties(pn_link_target(link)), node.queue);
} else if (node.exchange) {
setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
authorise.incoming(node.exchange);
- }
- if (node.created) {
- node.properties.write(pn_terminus_properties(pn_link_target(link)));
+ node.properties.write(pn_terminus_properties(pn_link_target(link)), node.exchange);
}
const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
@@ -357,10 +366,12 @@ void Session::setupIncoming(pn_link_t* l
void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name)
{
ResolvedNode node = resolve(name, source, false);
- if (node.queue) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue);
- else if (node.exchange) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange);
- if (node.created) {
- node.properties.write(pn_terminus_properties(pn_link_source(link)));
+ if (node.queue) {
+ setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue);
+ node.properties.write(pn_terminus_properties(pn_link_source(link)), node.queue);
+ } else if (node.exchange) {
+ setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange);
+ node.properties.write(pn_terminus_properties(pn_link_source(link)), node.exchange);
}
Filter filter;
@@ -385,11 +396,14 @@ void Session::setupOutgoing(pn_link_t* l
authorise.access(node.exchange);//do separate access check before trying to create the queue
bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
bool durable = pn_terminus_get_durability(source);
- QueueSettings settings(durable, !durable);
+ bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED;
+ QueueSettings settings(durable, autodelete);
+ std::string altExchange;
if (node.topic) {
settings = node.topic->getPolicy();
settings.durable = durable;
- settings.autodelete = !durable;
+ settings.autodelete = autodelete;
+ altExchange = node.topic->getAlternateExchange();
}
settings.autoDeleteDelay = pn_terminus_get_timeout(source);
if (settings.autoDeleteDelay) {
@@ -408,7 +422,7 @@ void Session::setupOutgoing(pn_link_t* l
queueName << connection.getContainerId() << "_" << pn_link_name(link);
}
boost::shared_ptr<qpid::broker::Queue> queue
- = connection.getBroker().createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first;
+ = connection.getBroker().createQueue(queueName.str(), settings, this, altExchange, connection.getUserId(), connection.getId()).first;
if (!shared) queue->setExclusiveOwner(this);
authorise.outgoing(node.exchange, queue, filter);
filter.bind(node.exchange, queue);
@@ -607,6 +621,11 @@ void IncomingToExchange::handle(qpid::br
authorise.route(exchange, message);
DeliverableMessage deliverable(message, 0);
exchange->route(deliverable);
+ if (!deliverable.delivered) {
+ if (exchange->getAlternate()) {
+ exchange->getAlternate()->route(deliverable);
+ }
+ }
}
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Session.h Mon Oct 21 22:04:51 2013
@@ -100,8 +100,6 @@ class Session : public ManagedSession, p
boost::shared_ptr<qpid::broker::amqp::Topic> topic;
boost::shared_ptr<Relay> relay;
NodeProperties properties;
- bool created;
- ResolvedNode() : created(false) {}
};
ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.cpp Mon Oct 21 22:04:51 2013
@@ -31,6 +31,7 @@ namespace {
const std::string TOPIC("topic");
const std::string EXCHANGE("exchange");
const std::string DURABLE("durable");
+const std::string ALTERNATE_EXCHANGE("alternate-exchange");
const std::string EMPTY;
std::string getProperty(const std::string& k, const qpid::types::Variant::Map& m)
@@ -52,22 +53,25 @@ qpid::types::Variant::Map filter(const q
qpid::types::Variant::Map filtered = properties;
filtered.erase(DURABLE);
filtered.erase(EXCHANGE);
+ filtered.erase(ALTERNATE_EXCHANGE);
return filtered;
}
}
Topic::Topic(Broker& broker, const std::string& n, const qpid::types::Variant::Map& properties)
- : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(broker.getExchanges().get(getProperty(EXCHANGE, properties)))
+ : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(broker.getExchanges().get(getProperty(EXCHANGE, properties))),
+ alternateExchange(getProperty(ALTERNATE_EXCHANGE, properties))
{
if (exchange->getName().empty()) throw qpid::Exception("Exchange must be specified.");
qpid::types::Variant::Map unused;
- policy.populate(properties, unused);
+ qpid::types::Variant::Map filtered = filter(properties);
+ policy.populate(filtered, unused);
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent != 0) {
topic = _qmf::Topic::shared_ptr(new _qmf::Topic(agent, this, name, exchange->GetManagementObject()->getObjectId(), durable));
- topic->set_properties(filter(properties));
+ topic->set_properties(filtered);
agent->addObject(topic);
}
}
@@ -99,7 +103,10 @@ const std::string& Topic::getName() cons
{
return name;
}
-
+const std::string& Topic::getAlternateExchange() const
+{
+ return alternateExchange;
+}
boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties)
{
boost::shared_ptr<Topic> topic(new Topic(broker, name, properties));
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp/Topic.h Mon Oct 21 22:04:51 2013
@@ -51,6 +51,7 @@ class Topic : public PersistableObject,
~Topic();
const std::string& getName() const;
const QueueSettings& getPolicy() const;
+ const std::string& getAlternateExchange() const;
boost::shared_ptr<Exchange> getExchange();
bool isDurable() const;
boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const;
@@ -59,6 +60,7 @@ class Topic : public PersistableObject,
bool durable;
boost::shared_ptr<Exchange> exchange;
QueueSettings policy;
+ std::string alternateExchange;
qmf::org::apache::qpid::broker::Topic::shared_ptr topic;
};
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Mon Oct 21 22:04:51 2013
@@ -52,6 +52,11 @@ uint64_t MessageTransfer::getContentSize
return frames.getContentSize();
}
+uint64_t MessageTransfer::getMessageSize() const
+{
+ return getRequiredCredit();
+}
+
std::string MessageTransfer::getAnnotationAsString(const std::string& key) const
{
const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
@@ -180,7 +185,7 @@ class SendHeader
copy.castBody<AMQHeaderBody>()->get<MessageProperties>(true);
for (qpid::types::Variant::Map::const_iterator i = annotations.begin();
i != annotations.end(); ++i) {
- props->getApplicationHeaders().setString(i->first, i->second.asString());
+ props->getApplicationHeaders().set(i->first, qpid::amqp_0_10::translate(i->second));
}
}
if (redelivered || ttl || timestamp) {
@@ -397,7 +402,7 @@ boost::intrusive_ptr<PersistableMessage>
boost::intrusive_ptr<MessageTransfer> clone(new MessageTransfer(this->frames));
qpid::framing::MessageProperties* mp = clone->frames.getHeaders()->get<qpid::framing::MessageProperties>(true);
for (qpid::types::Variant::Map::const_iterator i = annotations.begin(); i != annotations.end(); ++i) {
- mp->getApplicationHeaders().setString(i->first, i->second);
+ mp->getApplicationHeaders().set(i->first, qpid::amqp_0_10::translate(i->second));
}
return clone;
}
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Mon Oct 21 22:04:51 2013
@@ -45,6 +45,7 @@ class MessageTransfer : public qpid::bro
bool isPersistent() const;
uint8_t getPriority() const;
uint64_t getContentSize() const;
+ uint64_t getMessageSize() const;
qpid::amqp::MessageId getMessageId() const;
qpid::amqp::MessageId getCorrelationId() const;
std::string getPropertyAsString(const std::string& key) const;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/QueueOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/QueueOptions.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/QueueOptions.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/QueueOptions.cpp Mon Oct 21 22:04:51 2013
@@ -38,7 +38,6 @@ const std::string QueueOptions::strFLOW_
const std::string QueueOptions::strRING("ring");
const std::string QueueOptions::strRING_STRICT("ring_strict");
const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue");
-const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node");
const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key");
const std::string QueueOptions::strLastValueQueueNoBrowse("qpid.last_value_queue_no_browse");
const std::string QueueOptions::strQueueEventMode("qpid.queue_event_generation");
@@ -74,11 +73,6 @@ void QueueOptions::setSizePolicy(QueueSi
}
-void QueueOptions::setPersistLastNode()
-{
- setInt(strPersistLastNode, 1);
-}
-
void QueueOptions::setOrdering(QueueOrderingPolicy op)
{
if (op == LVQ){
@@ -102,11 +96,6 @@ void QueueOptions::clearSizePolicy()
erase(strTypeKey);
}
-void QueueOptions::clearPersistLastNode()
-{
- erase(strPersistLastNode);
-}
-
void QueueOptions::clearOrdering()
{
erase(strLastValueQueue);
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/QueueOptions.h
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/QueueOptions.h?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/QueueOptions.h (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/QueueOptions.h Mon Oct 21 22:04:51 2013
@@ -56,12 +56,6 @@ class QPID_CLIENT_CLASS_EXTERN QueueOpti
QPID_CLIENT_EXTERN void setSizePolicy(QueueSizePolicy sp, uint64_t maxSize, uint32_t maxCount );
/**
- * Enables the persisting of a queue to the store module when a cluster fails down to it's last
- * node. Does so optimistically. Will start persisting when cluster count >1 again.
- */
- QPID_CLIENT_EXTERN void setPersistLastNode();
-
- /**
* Sets the odering policy on the Queue, default ordering is FIFO.
*/
QPID_CLIENT_EXTERN void setOrdering(QueueOrderingPolicy op);
@@ -72,11 +66,6 @@ class QPID_CLIENT_CLASS_EXTERN QueueOpti
QPID_CLIENT_EXTERN void clearSizePolicy();
/**
- * Clear Persist Last Node Policy
- */
- QPID_CLIENT_EXTERN void clearPersistLastNode();
-
- /**
* get the key used match LVQ in args for message transfer
*/
QPID_CLIENT_EXTERN void getLVQKey(std::string& key);
@@ -116,7 +105,6 @@ class QPID_CLIENT_CLASS_EXTERN QueueOpti
static QPID_CLIENT_EXTERN const std::string strRING;
static QPID_CLIENT_EXTERN const std::string strRING_STRICT;
static QPID_CLIENT_EXTERN const std::string strLastValueQueue;
- static QPID_CLIENT_EXTERN const std::string strPersistLastNode;
static QPID_CLIENT_EXTERN const std::string strLVQMatchProperty;
static QPID_CLIENT_EXTERN const std::string strLastValueQueueNoBrowse;
static QPID_CLIENT_EXTERN const std::string strQueueEventMode;
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Mon Oct 21 22:04:51 2013
@@ -490,7 +490,7 @@ void SessionImpl::releaseImpl(qpid::mess
{
SequenceSet set;
set.add(MessageImplAccess::get(m).getInternalId());
- session.messageRelease(set);
+ session.messageRelease(set, true);
}
void SessionImpl::receiverCancelled(const std::string& name)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/deq_rec.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/deq_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/deq_rec.cpp Mon Oct 21 22:04:51 2013
@@ -270,12 +270,14 @@ deq_rec::decode(rec_hdr& h, void* rptr,
// Get and check header
_deq_hdr.hdr_copy(h);
rd_cnt = sizeof(rec_hdr);
- _deq_hdr._deq_rid = *(u_int64_t*)((char*)rptr + rd_cnt);
+ //_deq_hdr._deq_rid = *(u_int64_t*)((char*)rptr + rd_cnt);
+ std::memcpy((void*)&_deq_hdr._deq_rid, (char*)rptr + rd_cnt, sizeof(u_int64_t));
rd_cnt += sizeof(u_int64_t);
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
rd_cnt += sizeof(u_int32_t); // Filler 0
#endif
- _deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ //_deq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ std::memcpy((void*)&_deq_hdr._xidsize, (char*)rptr + rd_cnt, sizeof(std::size_t));
rd_cnt = _deq_hdr.size();
chk_hdr();
if (_deq_hdr._xidsize)
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/enq_rec.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/enq_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/enq_rec.cpp Mon Oct 21 22:04:51 2013
@@ -360,7 +360,8 @@ enq_rec::decode(rec_hdr& h, void* rptr,
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
rd_cnt += sizeof(u_int32_t); // Filler 0
#endif
- _enq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ //_enq_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ std::memcpy((void*)&_enq_hdr._xidsize, (char*)rptr + rd_cnt, sizeof(std::size_t));
rd_cnt += sizeof(std::size_t);
#if defined(JRNL_LITTLE_ENDIAN) && defined(JRNL_32_BIT)
rd_cnt += sizeof(u_int32_t); // Filler 0
@@ -368,7 +369,8 @@ enq_rec::decode(rec_hdr& h, void* rptr,
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
rd_cnt += sizeof(u_int32_t); // Filler 1
#endif
- _enq_hdr._dsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ //_enq_hdr._dsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ std::memcpy((void*)&_enq_hdr._dsize, (char*)rptr + rd_cnt, sizeof(std::size_t));
rd_cnt = _enq_hdr.size();
chk_hdr();
if (_enq_hdr._xidsize + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize))
Modified: qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/txn_rec.cpp?rev=1534394&r1=1534393&r2=1534394&view=diff
==============================================================================
--- qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/txn_rec.cpp (original)
+++ qpid/branches/linearstore/qpid/cpp/src/qpid/legacystore/jrnl/txn_rec.cpp Mon Oct 21 22:04:51 2013
@@ -270,7 +270,8 @@ txn_rec::decode(rec_hdr& h, void* rptr,
#if defined(JRNL_BIG_ENDIAN) && defined(JRNL_32_BIT)
rd_cnt += sizeof(u_int32_t); // Filler 0
#endif
- _txn_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ //_txn_hdr._xidsize = *(std::size_t*)((char*)rptr + rd_cnt);
+ std::memcpy((void*)&_txn_hdr._xidsize, (char*)rptr + rd_cnt, sizeof(std::size_t));
rd_cnt = _txn_hdr.size();
chk_hdr();
_buff = std::malloc(_txn_hdr._xidsize);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org