You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/23 11:15:49 UTC
svn commit: r1560618 [2/5] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/bin/ qpid/cpp/
qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.receiver/
qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.sender/
qpid/cpp/bindings/...
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/man/qpidd.1
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/man/qpidd.1?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/man/qpidd.1 (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/man/qpidd.1 Thu Jan 23 10:15:46 2014
@@ -1,230 +1,369 @@
-.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.38.2.
-.TH QPIDD "1" "March 2011" "qpidd (qpidc) version 0.11" "User Commands"
+.\"
+.\" Licensed to the Apache Software Foundation (ASF) under one
+.\" or more contributor license agreements. See the NOTICE file
+.\" distributed with this work for additional information
+.\" regarding copyright ownership. The ASF licenses this file
+.\" to you under the Apache License, Version 2.0 (the
+.\" "License"); you may not use this file except in compliance
+.\" with the License. You may obtain a copy of the License at
+.\"
+.\" http://www.apache.org/licenses/LICENSE-2.0
+.\"
+.\" Unless required by applicable law or agreed to in writing,
+.\" software distributed under the License is distributed on an
+.\" "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.\" KIND, either express or implied. See the License for the
+.\" specific language governing permissions and limitations
+.\" under the License.
+.\"
+
+.TH QPIDD "1" "December 2013" "qpidd (qpid-cpp) version 0.27" "User Commands"
.SH NAME
qpidd \- the Qpid AMQP Message Broker Daemon
+
.SH SYNOPSIS
qpidd [-p port] [--config config_file] [--data-dir directory]
+
.SH DESCRIPTION
An AMQP message broker daemon that stores, routes and forwards
messages using the Advanced Message Queuing Protocol (AMQP).
+
.SH OPTIONS
The options below are built in to qpidd. Installing add-on modules provides additional options. To see the full set of options available, type "qpidd --help".
Options may be specified via command line, environment variable or configuration file. See FILES and ENVIRONMENT below for details.
+
.PP
+.SS Options
+
.TP
-\fB\-h\fR [ \fB\-\-help\fR ]
+\-h [ \-\-help ]
Displays the help message
.TP
-\fB\-v\fR [ \fB\-\-version\fR ]
+\-v [ \-\-version ]
Displays version information
.TP
-\fB\-\-config\fR FILE
+\-\-config FILE (/etc/qpid/qpidd.conf)
Reads configuration from FILE
-.SS "Module options:"
-.TP
-\fB\-\-module\-dir\fR DIR
-Load all shareable modules in this
+\-\-client\-config FILE (/etc/qpid/qpidc.conf)
+Reads client configuration from FILE
+(for cluster interconnect)
+
+.SS Module options
+\-\-module\-dir DIR (/usr/lib64/qpid/daemon)
+Load all shareable modules in this
directory
.TP
-\fB\-\-load\-module\fR FILE
-Specifies additional module(s) to be
+\-\-load\-module FILE
+Specifies additional module(s) to be
loaded
.TP
-\fB\-\-no\-module\-dir\fR
-Don't load modules from module
+\-\-no\-module\-dir
+Don't load modules from module
directory
-.SS "Broker Options:"
-.TP
-\fB\-\-data\-dir\fR DIR
-Directory to contain persistent data
+
+.SS Broker Options
+\-\-data\-dir DIR (/home/aconway/.qpidd)
+Directory to contain persistent data
generated by the broker
.TP
-\fB\-\-no\-data\-dir\fR
-Don't use a data directory. No
+\-\-no\-data\-dir
+Don't use a data directory. No
persistent configuration will be loaded
or stored
.TP
-\fB\-p\fR [ \fB\-\-port\fR ] PORT (5672)
+\-\-paging\-dir DIR
+Directory in which paging files will be
+created for paged queues
+.TP
+\-p [ \-\-port ] PORT (5672)
Tells the broker to listen on PORT
+\-\-interface <interface name>|<interface address>
+Which network interfaces to use to
+listen for incoming connections
+\-\-listen\-disable <transport name> Transports to disable listening
.TP
-\fB\-\-worker\-threads\fR N (3)
+\-\-worker\-threads N (5)
Sets the broker thread pool size
.TP
-\fB\-\-max\-connections\fR N (500)
-Sets the maximum allowed connections
-.TP
-\fB\-\-connection\-backlog\fR N (10)
-Sets the connection backlog limit for
+\-\-connection\-backlog N (10)
+Sets the connection backlog limit for
the server socket
.TP
-\fB\-m\fR [ \fB\-\-mgmt\-enable\fR ] yes|no (1)
+\-m [ \-\-mgmt\-enable ] yes|no (1)
Enable Management
.TP
-\fB\-\-mgmt\-qmf2\fR yes|no (1)
-Enable broadcast of management
+\-\-mgmt\-publish yes|no (1)
+Enable Publish of Management Data (\&'no'
+implies query\-only)
+.TP
+\-\-mgmt\-qmf2 yes|no (1)
+Enable broadcast of management
information over QMF v2
.TP
-\fB\-\-mgmt\-qmf1\fR yes|no (1)
-Enable broadcast of management
+\-\-mgmt\-qmf1 yes|no (0)
+Enable broadcast of management
information over QMF v1
.TP
-\fB\-\-mgmt\-pub\-interval\fR SECONDS (10)
+\-\-mgmt\-pub\-interval SECONDS (10s)
Management Publish Interval
-.TP
-\fB\-\-queue\-purge\-interval\fR SECONDS (600)
-Interval between attempts to purge any
+\-\-queue\-purge\-interval SECONDS (600s)
+Interval between attempts to purge any
expired messages from queues
.TP
-\fB\-\-auth\fR yes|no (1)
-Enable authentication, if disabled all
+\-\-auth yes|no (1)
+Enable authentication, if disabled all
incoming connections will be trusted
.TP
-\fB\-\-realm\fR REALM (QPID)
-Use the given realm when performing
+\-\-realm REALM (QPID)
+Use the given realm when performing
authentication
-.TP
-\fB\-\-default\-queue\-limit\fR BYTES (104857600)
-Default maximum size for queues (in
+\-\-default\-queue\-limit BYTES (104857600)
+Default maximum size for queues (in
bytes)
.TP
-\fB\-\-tcp\-nodelay\fR
+\-\-tcp\-nodelay
Set TCP_NODELAY on TCP connections
.TP
-\fB\-\-require\-encryption\fR
-Only accept connections that are
+\-\-require\-encryption
+Only accept connections that are
encrypted
-.TP
-\fB\-\-known\-hosts\-url\fR URL or 'none'
-URL to send as 'known\-hosts' to clients
-('none' implies empty list)
-.TP
-\fB\-\-sasl\-config\fR DIR
-gets sasl config info from nonstandard
-location
-.TP
-\fB\-\-max\-session\-rate\fR MESSAGES/S (0)
-Sets the maximum message rate per
-session (0=unlimited)
-.TP
-\fB\-\-async\-queue\-events\fR yes|no (0)
-Set Queue Events async, used for
-services like replication
-.TP
-\fB\-\-default\-flow\-stop\-threshold\fR PERCENT (80)
-Percent of queue's maximum capacity at
+\-\-known\-hosts\-url URL or \&'none' (none)
+URL to send as \&'known\-hosts' to clients
+(\&'none' implies empty list)
+.TP
+\-\-sasl\-config DIR
+Allows SASL config path, if supported
+by platform, to be overridden. For
+default location on Linux, see Cyrus
+SASL documentation. There is no SASL
+config dir on Windows.
+\-\-default\-flow\-stop\-threshold PERCENT (80)
+Percent of queue's maximum capacity at
which flow control is activated.
-.TP
-\fB\-\-default\-flow\-resume\-threshold\fR PERCENT (70)
-Percent of queue's maximum capacity at
+\-\-default\-flow\-resume\-threshold PERCENT (70)
+Percent of queue's maximum capacity at
which flow control is de\-activated.
-.TP
-\fB\-\-default\-event\-threshold\-ratio\fR %age of limit (80)
-The ratio of any specified queue limit
+\-\-default\-event\-threshold\-ratio %age of limit (80)
+The ratio of any specified queue limit
at which an event will be raised
-.SS "Logging options:"
+\-\-default\-message\-group GROUP\-IDENTIFIER (qpid.no\-group)
+Group identifier to assign to messages
+delivered to a message group queue that
+do not contain an identifier.
+.TP
+\-\-enable\-timestamp yes|no (0)
+Add current time to each received
+message.
+.TP
+\-\-link\-maintenance\-interval SECONDS (2s)
+Interval to check link health and
+re\-connect
+if need be
+\-\-link\-heartbeat\-interval SECONDS (120s)
+Heartbeat interval for a federation
+link
+\-\-max\-negotiate\-time MILLISECONDS (10000)
+Maximum time a connection can take to
+send the initial protocol negotiation
+.TP
+\-\-federation\-tag NAME
+Override the federation tag
+
+.SS Logging options
.TP
-\fB\-t\fR [ \fB\-\-trace\fR ]
+\-t [ \-\-trace ]
Enables all logging
.TP
-\fB\-\-log\-enable\fR RULE (notice+)
+\-\-log\-enable RULE (notice+)
Enables logging for selected levels and
-components. RULE is in the form
-\&'LEVEL[+][:PATTERN]' Levels are one of:
-.IP
-trace debug info notice warning error
-.IP
+components. RULE is in the form
+\&'LEVEL[+\-][:PATTERN]'
+LEVEL is one of:
+trace debug info notice warning error
critical
+PATTERN is a logging category name, or
+a namespace\-qualified function name or
+name fragment. Logging category names
+are:
+Security Broker Management Protocol
+System HA Messaging Store Network Test
+Client Model Unspecified
For example:
-\&'\-\-log\-enable warning+' logs all
-warning, error and critical messages.
-\&'\-\-log\-enable debug:framing' logs debug
-messages from the framing namespace.
+\&'\-\-log\-enable warning+'
+logs all warning, error and critical
+messages.
+\&'\-\-log\-enable trace+:Broker'
+logs all category \&'Broker' messages.
+\&'\-\-log\-enable debug:framing'
+logs debug messages from all functions
+with \&'framing' in the namespace or
+function name.
This option can be used multiple times
.TP
-\fB\-\-log\-time\fR yes|no (1)
+\-\-log\-disable RULE
+Disables logging for selected levels
+and components. RULE is in the form
+\&'LEVEL[+\-][:PATTERN]'
+LEVEL is one of:
+trace debug info notice warning error
+critical
+PATTERN is a logging category name, or
+a namespace\-qualified function name or
+name fragment. Logging category names
+are:
+Security Broker Management Protocol
+System HA Messaging Store Network Test
+Client Model Unspecified
+For example:
+\&'\-\-log\-disable warning\-'
+disables logging all warning, notice,
+info, debug, and trace messages.
+\&'\-\-log\-disable trace:Broker'
+disables all category \&'Broker' trace
+messages.
+\&'\-\-log\-disable debug\-:qmf::'
+disables logging debug and trace
+messages from all functions with
+\&'qmf::' in the namespace.
+This option can be used multiple times
+.TP
+\-\-log\-time yes|no (1)
Include time in log messages
.TP
-\fB\-\-log\-level\fR yes|no (1)
+\-\-log\-level yes|no (1)
Include severity level in log messages
.TP
-\fB\-\-log\-source\fR yes|no (0)
-Include source file:line in log
+\-\-log\-source yes|no (0)
+Include source file:line in log
messages
.TP
-\fB\-\-log\-thread\fR yes|no (0)
+\-\-log\-thread yes|no (0)
Include thread ID in log messages
.TP
-\fB\-\-log\-function\fR yes|no (0)
-Include function signature in log
+\-\-log\-function yes|no (0)
+Include function signature in log
+messages
+.TP
+\-\-log\-hires\-timestamp yes|no (0)
+Use hi\-resolution timestamps in log
messages
.TP
-\fB\-\-log\-prefix\fR STRING
-Prefix to append to all log messages
-.SS "Logging sink options:"
+\-\-log\-category yes|no (1)
+Include category in log messages
.TP
-\fB\-\-log\-to\-stderr\fR yes|no (1)
+\-\-log\-prefix STRING
+Prefix to prepend to all log messages
+
+.SS Logging sink options
+.TP
+\-\-log\-to\-stderr yes|no (1)
Send logging output to stderr
.TP
-\fB\-\-log\-to\-stdout\fR yes|no (0)
+\-\-log\-to\-stdout yes|no (0)
Send logging output to stdout
.TP
-\fB\-\-log\-to\-file\fR FILE
+\-\-log\-to\-file FILE
Send log output to FILE.
.TP
-\fB\-\-log\-to\-syslog\fR yes|no (0)
+\-\-log\-to\-syslog yes|no (0)
Send logging output to syslog;
-customize using \fB\-\-syslog\-name\fR and
-\fB\-\-syslog\-facility\fR
+customize using \-\-syslog\-name and
+\-\-syslog\-facility
.TP
-\fB\-\-syslog\-name\fR NAME (lt\-qpidd)
+\-\-syslog\-name NAME (qpidd)
Name to use in syslog messages
-.TP
-\fB\-\-syslog\-facility\fR LOG_XXX (LOG_DAEMON)
+\-\-syslog\-facility LOG_XXX (LOG_DAEMON)
Facility to use in syslog messages
-.SS "Daemon options:"
+
+.SS Daemon options
.TP
-\fB\-d\fR [ \fB\-\-daemon\fR ]
-Run as a daemon. Logs to syslog by
+\-d [ \-\-daemon ]
+Run as a daemon. Logs to syslog by
default in this mode.
.TP
-\fB\-\-transport\fR TRANSPORT (tcp)
-The transport for which to return the
+\-\-transport TRANSPORT (tcp)
+The transport for which to return the
port
.TP
-\fB\-\-pid\-dir\fR DIR
-Directory where port\-specific PID file
+\-\-pid\-dir DIR (/home/aconway/.qpidd)
+Directory where port\-specific PID file
is stored
.TP
-\fB\-w\fR [ \fB\-\-wait\fR ] SECONDS (600)
-Sets the maximum wait time to
-initialize the daemon. If the daemon
-fails to initialize, prints an error
-and returns 1
-.TP
-\fB\-c\fR [ \fB\-\-check\fR ]
-Prints the daemon's process ID to
-stdout and returns 0 if the daemon is
+\-w [ \-\-wait ] SECONDS (600)
+Sets the maximum wait time to
+initialize or shut down the daemon. If
+the daemon fails to initialize or shut
+down, prints an error and returns 1
+.TP
+\-c [ \-\-check ]
+Prints the daemon's process ID to
+stdout and returns 0 if the daemon is
running, otherwise returns 1
.TP
-\fB\-q\fR [ \fB\-\-quit\fR ]
+\-q [ \-\-quit ]
Tells the daemon to shut down
-.SH ENVIRONMENT
-.I QPID_<option>
-.RS
-There is an environment variable for each option.
-.RE
-The environment variable is the option name in uppercase, prefixed with QPID_ and '.' or '-' are replaced with '_'. Environment settings are over-ridden by command line settings. For example:
+.TP
+\-\-socket\-fd FD
+File descriptor for tcp listening socket
+
+.SS ACL Options
+.TP
+\-\-acl\-file FILE
+The policy file to load from, loaded from
+data dir
+.TP
+\-\-connection\-limit\-per\-user N (0)
+The maximum number of connections allowed
+per user. 0 implies no limit.
+.TP
+\-\-max\-connections N (500)
+The maximum combined number of connections
+allowed. 0 implies no limit.
+.TP
+\-\-connection\-limit\-per\-ip N (0)
+The maximum number of connections allowed
+per host IP address. 0 implies no limit.
+.TP
+\-\-max\-queues\-per\-user N (0)
+The maximum number of queues allowed per
+user. 0 implies no limit.
+
+.SS SSL Settings
+.TP
+\-\-ssl\-use\-export\-policy
+Use NSS export policy
+.TP
+\-\-ssl\-cert\-password\-file PATH
+File containing password to use for
+accessing certificate database
+.TP
+\-\-ssl\-cert\-db PATH
+Path to directory containing certificate
+database
+.TP
+\-\-ssl\-cert\-name NAME (gonzo)
+Name of the certificate to use
+.TP
+\-\-ssl\-port PORT (5671)
+Port on which to listen for SSL
+connections
+.TP
+\-\-ssl\-require\-client\-authentication
+Forces clients to authenticate in order
+to establish an SSL connection
+.TP
+\-\-ssl\-sasl\-no\-dict
+Disables SASL mechanisms that are
+vulnerable to passive dictionary\-based
+password attacks
- export QPID_PORT=6000
- export QPID_MAX_CONNECTIONS=10
- export QPID_LOG_TO_FILE=/tmp/qpidd.log
.SH FILES
.I /etc/qpidd.conf
.RS
@@ -239,9 +378,23 @@ Each line is a name=value pair. Blank li
port=6000
max-connections=10
log-to-file=/tmp/qpidd.log
+
+.SH ENVIRONMENT
+.I QPID_<option>
+.RS
+There is an environment variable for each option.
+.RE
+
+The environment variable is the option name in uppercase, prefixed with QPID_, with any '.' or '-' replaced by '_'. Environment settings are overridden by command line settings. For example:
+
+ export QPID_PORT=6000
+ export QPID_MAX_CONNECTIONS=10
+ export QPID_LOG_TO_FILE=/tmp/qpidd.log
+
.SH AUTHOR
The Apache Qpid Project, dev@qpid.apache.org
-.SH "REPORTING BUGS"
+
+.SH REPORTING BUGS
Please report bugs to users@qpid.apache.org
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/man/qpidd.x
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/man/qpidd.x?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/man/qpidd.x (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/man/qpidd.x Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+.\"
+.\" Licensed to the Apache Software Foundation (ASF) under one
+.\" or more contributor license agreements. See the NOTICE file
+.\" distributed with this work for additional information
+.\" regarding copyright ownership. The ASF licenses this file
+.\" to you under the Apache License, Version 2.0 (the
+.\" "License"); you may not use this file except in compliance
+.\" with the License. You may obtain a copy of the License at
+.\"
+.\" http://www.apache.org/licenses/LICENSE-2.0
+.\"
+.\" Unless required by applicable law or agreed to in writing,
+.\" software distributed under the License is distributed on an
+.\" "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.\" KIND, either express or implied. See the License for the
+.\" specific language governing permissions and limitations
+.\" under the License.
+.\"
+
[NAME]
qpidd \- the Qpid AMQP Message Broker Daemon
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/src/CONTENTS
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/src/CONTENTS?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/src/CONTENTS (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/docs/src/CONTENTS Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
This directory contains documentation about the C++ source
that is expressed in formats that do not fit comfortably
within C++ source files.
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/etc/cluster.conf-example.xml.in
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/etc/cluster.conf-example.xml.in?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/etc/cluster.conf-example.xml.in (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/etc/cluster.conf-example.xml.in Thu Jan 23 10:15:46 2014
@@ -1,4 +1,24 @@
<?xml version="1.0"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
<!--
This is an example of a cluster.conf file to run qpidd HA under rgmanager.
This example assumes a 3 node cluster, with nodes named node1, node2 and node3.
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/examples/messaging/drain.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/examples/messaging/drain.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/examples/messaging/drain.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/examples/messaging/drain.cpp Thu Jan 23 10:15:46 2014
@@ -21,6 +21,7 @@
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Message_io.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
@@ -81,8 +82,9 @@ int main(int argc, char** argv)
{
Options options;
if (options.parse(argc, argv) && options.checkAddress()) {
- Connection connection(options.url, options.connectionOptions);
+ Connection connection;
try {
+ connection = Connection(options.url, options.connectionOptions);
connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver(options.address);
@@ -92,17 +94,7 @@ int main(int argc, char** argv)
int i = 0;
while (receiver.fetch(message, timeout)) {
- std::cout << "Message(properties=" << message.getProperties();
- if (!message.getSubject().empty()) {
- std::cout << ", subject='" << message.getSubject() << "'";
- }
- std::cout << ", content='";
- if (message.getContentType() == "amqp/map") {
- std::cout << message.getContentObject().asMap();
- } else {
- std::cout << message.getContentObject();
- }
- std::cout << "')" << std::endl;
+ std::cout << message << std::endl;
session.acknowledge();
if (count && (++i == count))
break;
@@ -112,7 +104,7 @@ int main(int argc, char** argv)
connection.close();
return 0;
} catch(const std::exception& error) {
- std::cout << error.what() << std::endl;
+ std::cout << "Error: " << error.what() << std::endl;
connection.close();
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/examples/messaging/spout.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/examples/messaging/spout.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/examples/messaging/spout.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/examples/messaging/spout.cpp Thu Jan 23 10:15:46 2014
@@ -22,6 +22,7 @@
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
+#include <qpid/messaging/Message_io.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
@@ -51,6 +52,7 @@ struct Options : OptionParser
string_vector entries;
std::string content;
std::string connectionOptions;
+ bool print;
Options()
: OptionParser("Usage: spout [OPTIONS] ADDRESS", "Send messages to the specified address"),
@@ -69,6 +71,7 @@ struct Options : OptionParser
add("map,M", entries, "specify entry for map content");
add("content", content, "specify textual content");
add("connection-options", connectionOptions, "connection options string in the form {name1:value1, name2:value2}");
+ add("print", print, "print each message sent");
}
static bool nameval(const std::string& in, std::string& name, std::string& value)
@@ -137,7 +140,6 @@ struct Options : OptionParser
}
};
-
int main(int argc, char** argv)
{
Options options;
@@ -170,6 +172,7 @@ int main(int argc, char** argv)
std::stringstream spoutid;
spoutid << id << ":" << count;
message.getProperties()["spout-id"] = spoutid.str();
+ if (options.print) std::cout << message << std::endl;
sender.send(message);
}
session.sync();
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/managementgen/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/managementgen/CMakeLists.txt?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/managementgen/CMakeLists.txt (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/managementgen/CMakeLists.txt Thu Jan 23 10:15:46 2014
@@ -22,7 +22,7 @@ cmake_minimum_required(VERSION 2.4.0 FAT
find_package(PythonInterp REQUIRED)
execute_process(COMMAND ${PYTHON_EXECUTABLE}
- -c "from distutils.sysconfig import get_python_lib; print get_python_lib(False)"
+ -c "from distutils.sysconfig import get_python_lib; print get_python_lib(False, prefix='${CMAKE_INSTALL_PREFIX}')"
OUTPUT_VARIABLE PYTHON_SITEARCH_PACKAGES
OUTPUT_STRIP_TRAILING_WHITESPACE)
Propchange: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src:r1549895-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/CMakeLists.txt?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/CMakeLists.txt Thu Jan 23 10:15:46 2014
@@ -183,14 +183,11 @@ if (CMAKE_COMPILER_IS_GNUCXX)
set (HIDE_SYMBOL_FLAGS "-fno-visibility-inlines-hidden")
else (GCC_VERSION VERSION_EQUAL 4.1.2)
set (HIDE_SYMBOL_FLAGS "-fno-visibility-inlines-hidden -fvisibility=hidden")
+ set (QPID_LINKMAP ${CMAKE_CURRENT_SOURCE_DIR}/qpid.linkmap)
+ set (LINK_VERSION_SCRIPT_FLAG "-Wl,--version-script=${QPID_LINKMAP}")
endif (GCC_VERSION VERSION_EQUAL 4.1.2)
endif (CMAKE_COMPILER_IS_GNUCXX)
-if (CMAKE_SYSTEM_NAME STREQUAL Linux)
- set (QPID_LINKMAP ${CMAKE_CURRENT_SOURCE_DIR}/qpid.linkmap)
- set (LINK_VERSION_SCRIPT_FLAG "-Wl,--version-script=${QPID_LINKMAP}")
-endif (CMAKE_SYSTEM_NAME STREQUAL Linux)
-
if (CMAKE_CXX_COMPILER_ID STREQUAL SunPro)
set (COMPILER_FLAGS "-library=stlport4 -mt")
set (WARNING_FLAGS "+w")
@@ -584,7 +581,6 @@ option(BUILD_HA "Build Active-Passive HA
if (BUILD_HA)
set (ha_SOURCES
qpid/ha/QueueSnapshot.h
- qpid/ha/QueueSnapshots.h
qpid/ha/AlternateExchangeSetter.h
qpid/ha/Backup.cpp
qpid/ha/Backup.h
@@ -1056,6 +1052,7 @@ set (qpidmessaging_SOURCES
qpid/messaging/Duration.cpp
qpid/messaging/exceptions.cpp
qpid/messaging/FailoverUpdates.cpp
+ qpid/messaging/Logger.cpp
qpid/messaging/Message.cpp
qpid/messaging/Receiver.cpp
qpid/messaging/Session.cpp
@@ -1065,6 +1062,7 @@ set (qpidmessaging_SOURCES
qpid/messaging/ConnectionOptions.cpp
qpid/messaging/MessageImpl.h
qpid/messaging/MessageImpl.cpp
+ qpid/messaging/Message_io.cpp
qpid/messaging/ProtocolRegistry.cpp
qpid/messaging/amqp/EncodedMessage.h
qpid/messaging/amqp/EncodedMessage.cpp
@@ -1073,7 +1071,7 @@ set (qpidmessaging_SOURCES
add_msvc_version (qpidmessaging library dll)
add_library (qpidmessaging SHARED ${qpidmessaging_SOURCES})
-target_link_libraries (qpidmessaging qpidtypes qpidclient qpidcommon ${PROTON_LIBRARIES})
+target_link_libraries (qpidmessaging qpidtypes qpidclient qpidcommon "${Boost_PROGRAM_OPTIONS_LIBRARY}" ${PROTON_LIBRARIES})
set_target_properties (qpidmessaging PROPERTIES
LINK_FLAGS "${HIDE_SYMBOL_FLAGS} ${LINK_VERSION_SCRIPT_FLAG}"
COMPILE_FLAGS "${HIDE_SYMBOL_FLAGS}"
Propchange: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/CMakeLists.txt:r1549895-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/check-abi
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/check-abi?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/check-abi (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/check-abi Thu Jan 23 10:15:46 2014
@@ -1,5 +1,24 @@
#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Ask the compiler the implementation specific type for a standard typedeffed type
# (int64_t, size_t etc.). Operates by test compiling and using the demangling ABI call.
#
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/libqpidmessaging-api-symbols.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/libqpidmessaging-api-symbols.txt?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/libqpidmessaging-api-symbols.txt (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/libqpidmessaging-api-symbols.txt Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Address
qpid::messaging::Address::Address()
qpid::messaging::Address::Address(std::string const&)
@@ -96,6 +115,14 @@ qpid::messaging::Message::getContentPtr(
qpid::messaging::Message::getContentSize() const
qpid::messaging::Message::setProperty(std::string const&, qpid::types::Variant const&)
+# Logger
+qpid::messaging::Logger::configure(int, char const**, std::string const&)
+qpid::messaging::Logger::log(qpid::messaging::Level, char const*, int, char const*, std::string const&)
+qpid::messaging::Logger::setOutput(qpid::messaging::LoggerOutput&)
+qpid::messaging::Logger::usage()
+
+qpid::messaging::LoggerOutput::~LoggerOutput()
+
# Receiver
qpid::messaging::Receiver::Receiver(qpid::messaging::ReceiverImpl*)
qpid::messaging::Receiver::Receiver(qpid::messaging::Receiver const&)
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/libqpidtypes-api-symbols.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/libqpidtypes-api-symbols.txt?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/libqpidtypes-api-symbols.txt (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/libqpidtypes-api-symbols.txt Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# Uuid
qpid::types::Uuid::SIZE
qpid::types::Uuid::Uuid(bool)
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qmf2.pc.in
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qmf2.pc.in?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qmf2.pc.in (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qmf2.pc.in Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
prefix=@prefix@
exec_prefix=@exec_prefix@
libdir=@libdir@
@@ -7,5 +26,5 @@ Name: qmf2
Version: @VERSION@
Description: Qpid Management Framework
Requires: qpid
-Libs: -L${libdir} -lqmf2 @LIBS@
-Cflags: -I${includedir}
+Libs: -L@libdir@ -lqmf2 @LIBS@
+Cflags: -I@includedir@
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid.linkmap
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid.linkmap?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid.linkmap (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid.linkmap Thu Jan 23 10:15:46 2014
@@ -1,3 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
{
global:
extern "C++" {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid.pc.in
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid.pc.in?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid.pc.in (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid.pc.in Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
prefix=@prefix@
exec_prefix=@exec_prefix@
libdir=@libdir@
@@ -7,5 +26,5 @@ Name: qpid
Version: @VERSION@
Description: Qpid C++ client library
Requires:
-Libs: -L${libdir} -lqpidmessaging -lqpidtypes @LIBS@
-Cflags: -I${includedir}
+Libs: -L@libdir@ -lqpidmessaging -lqpidtypes @LIBS@
+Cflags: -I@includedir@
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Decoder.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Decoder.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Decoder.cpp Thu Jan 23 10:15:46 2014
@@ -301,7 +301,7 @@ Descriptor Decoder::readDescriptor()
void Decoder::advance(size_t n)
{
- if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds"));
+ if (n > available()) throw qpid::Exception(QPID_MSG("Out of Bounds: requested advance of " << n << " at " << position << " but only " << available() << " available"));
position += n;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Encoder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Encoder.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Encoder.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Encoder.cpp Thu Jan 23 10:15:46 2014
@@ -246,6 +246,12 @@ void Encoder::writeLong(int64_t i, const
write((uint64_t) i, typecodes::LONG, d);
}
+void Encoder::writeTimestamp(int64_t t, const Descriptor* d)
+{
+ write((uint64_t) t, typecodes::TIMESTAMP, d);
+}
+
+
void Encoder::writeFloat(float f, const Descriptor* d)
{
write(f, typecodes::FLOAT, d);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Encoder.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Encoder.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Encoder.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/Encoder.h Thu Jan 23 10:15:46 2014
@@ -71,6 +71,7 @@ class Encoder
void writeFloat(float, const Descriptor* d=0);
void writeDouble(double, const Descriptor* d=0);
void writeUuid(const qpid::types::Uuid&, const Descriptor* d=0);
+ void writeTimestamp(int64_t, const Descriptor* d=0);
void writeSymbol(const CharSequence&, const Descriptor* d=0);
void writeSymbol(const std::string&, const Descriptor* d=0);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/amqp/MessageEncoder.cpp Thu Jan 23 10:15:46 2014
@@ -116,10 +116,10 @@ void MessageEncoder::writeProperties(con
if (msg.hasContentEncoding()) writeSymbol(msg.getContentEncoding());
else if (fields > 7) writeNull();
- if (msg.hasAbsoluteExpiryTime()) writeLong(msg.getAbsoluteExpiryTime());
+ if (msg.hasAbsoluteExpiryTime()) writeTimestamp(msg.getAbsoluteExpiryTime());
else if (fields > 8) writeNull();
- if (msg.hasCreationTime()) writeLong(msg.getCreationTime());
+ if (msg.hasCreationTime()) writeTimestamp(msg.getCreationTime());
else if (fields > 9) writeNull();
if (msg.hasGroupId()) writeString(msg.getGroupId());
Propchange: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/broker:r1549895-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Jan 23 10:15:46 2014
@@ -200,9 +200,10 @@ void Bridge::create(amqp_0_10::Connectio
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
}
-void Bridge::cancel(amqp_0_10::Connection&)
+void Bridge::cancel(amqp_0_10::Connection& c)
{
- if (resetProxy()) {
+ // If &c != conn then we have failed over so the old connection is closed.
+ if (&c == conn && resetProxy()) {
peer->getMessage().cancel(args.i_dest);
peer->getSession().detach(sessionName);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Broker.cpp Thu Jan 23 10:15:46 2014
@@ -417,6 +417,12 @@ boost::intrusive_ptr<Broker> Broker::cre
void Broker::setStore (const boost::shared_ptr<MessageStore>& _store)
{
+ // Exit now if multiple store plugins are attempting to load
+ if (!NullMessageStore::isNullStore(store.get())) {
+ QPID_LOG(error, "Multiple store plugins are not supported");
+ throw Exception(QPID_MSG("Failed to start broker: Multiple store plugins were loaded"));
+ }
+
store.reset(new MessageStoreModule (_store));
setStore();
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/BrokerObservers.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/BrokerObservers.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/BrokerObservers.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/BrokerObservers.h Thu Jan 23 10:15:46 2014
@@ -24,19 +24,14 @@
#include "BrokerObserver.h"
#include "Observers.h"
-#include "qpid/sys/Mutex.h"
namespace qpid {
namespace broker {
/**
- * A broker observer that delegates to a collection of broker observers.
- *
- * THREAD SAFE
+ * Collection of BrokerObserver.
*/
-class BrokerObservers : public BrokerObserver,
- public Observers<BrokerObserver>
-{
+class BrokerObservers : public Observers<BrokerObserver> {
public:
void queueCreate(const boost::shared_ptr<Queue>& q) {
each(boost::bind(&BrokerObserver::queueCreate, _1, q));
@@ -54,15 +49,13 @@ class BrokerObservers : public BrokerObs
const boost::shared_ptr<Queue>& queue,
const std::string& key,
const framing::FieldTable& args) {
- each(boost::bind(
- &BrokerObserver::bind, _1, exchange, queue, key, args));
+ each(boost::bind(&BrokerObserver::bind, _1, exchange, queue, key, args));
}
void unbind(const boost::shared_ptr<Exchange>& exchange,
const boost::shared_ptr<Queue>& queue,
const std::string& key,
const framing::FieldTable& args) {
- each(boost::bind(
- &BrokerObserver::unbind, _1, exchange, queue, key, args));
+ each(boost::bind(&BrokerObserver::unbind, _1, exchange, queue, key, args));
}
void startTx(const boost::intrusive_ptr<TxBuffer>& tx) {
each(boost::bind(&BrokerObserver::startTx, _1, tx));
@@ -70,6 +63,9 @@ class BrokerObservers : public BrokerObs
void startDtx(const boost::intrusive_ptr<DtxBuffer>& dtx) {
each(boost::bind(&BrokerObserver::startDtx, _1, dtx));
}
+
+ private:
+ template <class F> void each(F f) { Observers<BrokerObserver>::each(f); }
};
}} // namespace qpid::broker
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/ConnectionObservers.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/ConnectionObservers.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/ConnectionObservers.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/ConnectionObservers.h Thu Jan 23 10:15:46 2014
@@ -30,11 +30,8 @@ namespace broker {
/**
* A collection of connection observers.
- * Calling a ConnectionObserver function will call that function on each observer.
- * THREAD SAFE.
*/
-class ConnectionObservers : public ConnectionObserver,
- public Observers<ConnectionObserver>
+class ConnectionObservers : public Observers<ConnectionObserver>
{
public:
void connection(Connection& c) {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Message.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Message.h Thu Jan 23 10:15:46 2014
@@ -90,7 +90,7 @@ public:
int getDeliveryCount() const { return deliveryCount; }
void resetDeliveryCount() { deliveryCount = -1; }
- void setPublisher(const Connection& p);
+ QPID_BROKER_EXTERN void setPublisher(const Connection& p);
const Connection* getPublisher() const;
bool isLocalTo(const OwnershipToken*) const;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Observers.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Observers.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Observers.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Observers.h Thu Jan 23 10:15:46 2014
@@ -24,7 +24,7 @@
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
-#include <vector>
+#include <set>
#include <algorithm>
namespace qpid {
@@ -37,19 +37,21 @@ template <class Observer>
class Observers
{
public:
- void add(boost::shared_ptr<Observer> observer) {
+ typedef boost::shared_ptr<Observer> ObserverPtr;
+
+ void add(const ObserverPtr& observer) {
sys::Mutex::ScopedLock l(lock);
- observers.push_back(observer);
+ observers.insert(observer);
}
- void remove(boost::shared_ptr<Observer> observer) {
+ void remove(const ObserverPtr& observer) {
sys::Mutex::ScopedLock l(lock);
- typename List::iterator i = std::find(observers.begin(), observers.end(), observer);
- observers.erase(i);
+ observers.erase(observer) ;
}
+ /** Iterate over the observers. */
template <class F> void each(F f) {
- List copy;
+ Set copy; // Make a copy and iterate outside the lock.
{
sys::Mutex::ScopedLock l(lock);
copy = observers;
@@ -57,11 +59,33 @@ class Observers
std::for_each(copy.begin(), copy.end(), f);
}
+ template <class T> boost::shared_ptr<T> findType() const {
+ sys::Mutex::ScopedLock l(lock);
+ typename Set::const_iterator i =
+ std::find_if(observers.begin(), observers.end(), &isA<T>);
+ return i == observers.end() ?
+ boost::shared_ptr<T>() : boost::dynamic_pointer_cast<T>(*i);
+ }
+
protected:
- typedef std::vector<boost::shared_ptr<Observer> > List;
+ typedef std::set<ObserverPtr> Set;
+ Observers() : lock(myLock) {}
+
+ /** Specify a lock for the Observers to use */
+ Observers(sys::Mutex& l) : lock(l) {}
+
+ /** Iterate over the observers without taking the lock, caller must hold the lock */
+ template <class F> void each(F f, const sys::Mutex::ScopedLock&) {
+ std::for_each(observers.begin(), observers.end(), f);
+ }
+
+ template <class T> static bool isA(const ObserverPtr&o) {
+ return boost::dynamic_pointer_cast<T>(o);
+ }
- sys::Mutex lock;
- List observers;
+ mutable sys::Mutex myLock;
+ sys::Mutex& lock;
+ Set observers;
};
}} // namespace qpid::broker
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Queue.cpp Thu Jan 23 10:15:46 2014
@@ -195,6 +195,7 @@ Queue::Queue(const string& _name, const
persistenceId(0),
settings(b ? merge(_settings, b->getOptions()) : _settings),
eventMode(0),
+ observers(name, messageLock),
broker(b),
deleted(false),
barrier(*this),
@@ -520,16 +521,17 @@ void Queue::markInUse(bool controlling)
void Queue::releaseFromUse(bool controlling)
{
+ bool trydelete;
if (controlling) {
- {
- Mutex::ScopedLock locker(messageLock);
- users.removeLifecycleController();
- }
- scheduleAutoDelete();
+ Mutex::ScopedLock locker(messageLock);
+ users.removeLifecycleController();
+ trydelete = true;
} else {
Mutex::ScopedLock locker(messageLock);
users.removeOther();
+ trydelete = isUnused(locker);
}
+ if (trydelete) scheduleAutoDelete();
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
@@ -988,68 +990,38 @@ void Queue::observeDequeue(const Message
{
current -= QueueDepth(1, msg.getMessageSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->dequeued(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
- }
- }
+ observers.dequeued(msg, lock);
if (autodelete && isEmpty(lock)) autodelete->check(lock);
}
/** updates queue observers when a message has become unavailable for transfer.
* Requires messageLock be held by caller.
*/
-void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&)
+void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->acquired(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
- }
- }
+ observers.acquired(msg, l);
}
/** updates queue observers when a message has become re-available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&)
+void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->requeued(msg);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
- }
- }
+ observers.requeued(msg, l);
}
/** updates queue observers when a new consumer has subscribed to this queue.
*/
-void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerAdded(c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
- }
- }
+ observers.consumerAdded(c, l);
}
/** updates queue observers when a consumer has unsubscribed from this queue.
*/
-void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
+void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l)
{
- for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
- try{
- (*i)->consumerRemoved(c);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
- }
- }
+ observers.consumerRemoved(c, l);
}
@@ -1133,12 +1105,9 @@ void Queue::destroyed()
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
{
- Mutex::ScopedLock lock(messageLock);
- for_each(observers.begin(), observers.end(),
- boost::bind(&QueueObserver::destroy, _1));
- observers.clear();
+ Mutex::ScopedLock l(messageLock);
+ observers.destroy(l);
}
-
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
if (brokerMgmtObject)
@@ -1513,15 +1482,9 @@ void Queue::recoveryComplete(ExchangeReg
/** updates queue observers and state when a message has become available for transfer
* Requires messageLock be held by caller.
*/
-void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&)
+void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock& l)
{
- for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) {
- try {
- (*i)->enqueued(m);
- } catch (const std::exception& e) {
- QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
- }
- }
+ observers.enqueued(m, l);
mgntEnqStats(m, mgmtObject, brokerMgmtObject);
}
@@ -1538,18 +1501,6 @@ bool Queue::isDeleted() const
return deleted;
}
-void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
-{
- Mutex::ScopedLock lock(messageLock);
- observers.insert(observer);
-}
-
-void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
-{
- Mutex::ScopedLock lock(messageLock);
- observers.erase(observer);
-}
-
void Queue::flush()
{
ScopedUse u(barrier);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Queue.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/Queue.h Thu Jan 23 10:15:46 2014
@@ -31,7 +31,7 @@
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
-#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/QueueObservers.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/TxOp.h"
@@ -169,7 +169,6 @@ class Queue : public boost::enable_share
bool eligible;
};
- typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
typedef boost::function1<void, Message&> MessageFunctor;
@@ -211,7 +210,7 @@ class Queue : public boost::enable_share
qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject;
sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
- Observers observers;
+ QueueObservers observers;
MessageInterceptors interceptors;
std::string seqNoKey;
Broker* broker;
@@ -446,12 +445,6 @@ class Queue : public boost::enable_share
bindings.eachBinding(f);
}
- /** Apply f to each Observer on the queue */
- template <class F> void eachObserver(F f) {
- sys::Mutex::ScopedLock l(messageLock);
- std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f);
- }
-
/**
* Set the sequence number for the back of the queue, the
* next message enqueued will be pos+1.
@@ -484,11 +477,12 @@ class Queue : public boost::enable_share
SubscriptionType type=CONSUMER
);
- QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>);
- QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>);
+
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
QPID_BROKER_EXTERN MessageInterceptors& getMessageInterceptors() { return interceptors; }
+ QPID_BROKER_EXTERN QueueObservers& getObservers() { return observers; }
+
/**
* Notify queue that recovery has completed.
*/
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFactory.cpp Thu Jan 23 10:15:46 2014
@@ -90,7 +90,7 @@ boost::shared_ptr<Queue> QueueFactory::c
if (settings.groupKey.size()) {
boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( name, *(queue->messages), settings));
queue->allocator = mgm;
- queue->addObserver(mgm);
+ queue->getObservers().add(mgm);
} else {
queue->allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *(queue->messages) ));
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Thu Jan 23 10:15:46 2014
@@ -244,7 +244,7 @@ void QueueFlowLimit::observe(Queue& queu
}
/* set up the observer */
- queue.addObserver(shared_from_this());
+ queue.getObservers().add(shared_from_this());
}
/** returns ptr to a QueueFlowLimit, else 0 if no limit */
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp Thu Jan 23 10:15:46 2014
@@ -93,7 +93,7 @@ void ThresholdAlerts::observe(Queue& que
boost::shared_ptr<QueueObserver> observer(
new ThresholdAlerts(queue.getName(), agent, ctu, ctd, stu, std, (_ctd == 0 && _std == 0))
);
- queue.addObserver(observer);
+ queue.getObservers().add(observer);
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Thu Jan 23 10:15:46 2014
@@ -20,12 +20,14 @@
*/
#include "Incoming.h"
#include "Exception.h"
+#include "ManagedConnection.h"
#include "Message.h"
#include "Session.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace broker {
@@ -110,18 +112,38 @@ DecodingIncoming::~DecodingIncoming() {}
void DecodingIncoming::readable(pn_delivery_t* delivery)
{
- boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery)));
- /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize());
- received->scan();
- pn_link_advance(link);
-
- qpid::broker::Message message(received, received);
- userid.verify(message.getUserId());
- message.computeExpiration(expiryPolicy);
- handle(message);
- --window;
- received->begin();
- Transfer t(delivery, session);
- received->end(t);
+ size_t pending = pn_delivery_pending(delivery);
+ size_t offset = partial ? partial->getSize() : 0;
+ boost::intrusive_ptr<Message> received(new Message(offset + pending));
+ if (partial) {
+ ::memcpy(received->getData(), partial->getData(), offset);
+ partial = boost::intrusive_ptr<Message>();
+ }
+ assert(received->getSize() == pending + offset);
+ pn_link_recv(link, received->getData() + offset, pending);
+
+ if (pn_delivery_partial(delivery)) {
+ QPID_LOG(debug, "Message incomplete: received " << pending << " bytes, now have " << received->getSize());
+ partial = received;
+ } else {
+ if (offset) {
+ QPID_LOG(debug, "Message complete: received " << pending << " bytes, " << received->getSize() << " in total");
+ } else {
+ QPID_LOG(debug, "Message received: " << received->getSize() << " bytes");
+ }
+
+ received->scan();
+ pn_link_advance(link);
+
+ qpid::broker::Message message(received, received);
+ message.setPublisher(session->getParent());
+ userid.verify(message.getUserId());
+ message.computeExpiration(expiryPolicy);
+ handle(message);
+ --window;
+ received->begin();
+ Transfer t(delivery, session);
+ received->end(t);
+ }
}
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Incoming.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Incoming.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Incoming.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Incoming.h Thu Jan 23 10:15:46 2014
@@ -78,6 +78,7 @@ class DecodingIncoming : public Incoming
private:
boost::shared_ptr<Session> session;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ boost::intrusive_ptr<Message> partial;
};
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Message.cpp Thu Jan 23 10:15:46 2014
@@ -449,12 +449,6 @@ boost::intrusive_ptr<PersistableMessage>
}
copy->data.resize(position);//annotationsSize may be slightly bigger than needed if optimisations are used (e.g. smallint)
copy->scan();
- {
- qpid::amqp::MapBuilder builder;
- qpid::amqp::Decoder decoder(copy->messageAnnotations.data, copy->messageAnnotations.size);
- decoder.read(builder);
- QPID_LOG(notice, "Merged annotations are now: " << builder.getMap() << " raw=" << std::hex << std::string(copy->messageAnnotations.data, copy->messageAnnotations.size) << " " << copy->messageAnnotations.size << " bytes");
- }
assert(copy->messageAnnotations);
assert(copy->bareMessage.size == bareMessage.size);
assert(copy->footer.size == footer.size);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/NodeProperties.cpp Thu Jan 23 10:15:46 2014
@@ -54,6 +54,7 @@ const std::string EXCLUSIVE("exclusive")
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
const std::string EXCHANGE_TYPE("exchange-type");
+const std::string EMPTY;
pn_bytes_t convert(const std::string& s)
{
@@ -104,7 +105,8 @@ bool getLifetimeDescriptorSymbol(QueueSe
}
-NodeProperties::NodeProperties() : received(false), queue(true), durable(false), autoDelete(false), exclusive(false), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {}
+NodeProperties::NodeProperties(bool isDynamic) : received(false), queue(true), durable(false), autoDelete(false), exclusive(false),
+ dynamic(isDynamic), exchangeType("topic"), lifetime(QueueSettings::DELETE_IF_UNUSED) {}
void NodeProperties::read(pn_data_t* data)
{
@@ -113,7 +115,7 @@ void NodeProperties::read(pn_data_t* dat
}
-bool NodeProperties::wasSpecified(const std::string& key)
+bool NodeProperties::wasSpecified(const std::string& key) const
{
return specified.find(key) != specified.end();
}
@@ -334,7 +336,9 @@ void NodeProperties::onSymbolValue(const
QueueSettings NodeProperties::getQueueSettings()
{
- QueueSettings settings(durable, autoDelete);
+ //assume autodelete for dynamic nodes unless explicitly requested
+ //otherwise or unless durability is requested
+ QueueSettings settings(durable, autoDelete || (dynamic && !wasSpecified(AUTO_DELETE) && !durable));
qpid::types::Variant::Map unused;
settings.populate(properties, unused);
settings.lifetime = lifetime;
@@ -362,6 +366,10 @@ std::string NodeProperties::getExchangeT
{
return exchangeType;
}
+std::string NodeProperties::getSpecifiedExchangeType() const
+{
+ return wasSpecified(EXCHANGE_TYPE) ? exchangeType : EMPTY;
+}
std::string NodeProperties::getAlternateExchange() const
{
return alternateExchange;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/NodeProperties.h Thu Jan 23 10:15:46 2014
@@ -38,7 +38,7 @@ namespace amqp {
class NodeProperties : public qpid::amqp::MapReader
{
public:
- NodeProperties();
+ NodeProperties(bool isDynamic);
void read(pn_data_t*);
void write(pn_data_t*,boost::shared_ptr<Queue>);
void write(pn_data_t*,boost::shared_ptr<Exchange>);
@@ -65,6 +65,7 @@ class NodeProperties : public qpid::amqp
bool isExclusive() const;
bool isAutodelete() const;
std::string getExchangeType() const;
+ std::string getSpecifiedExchangeType() const;
std::string getAlternateExchange() const;
bool trackControllingLink() const;
const qpid::types::Variant::Map& getProperties() const;
@@ -74,6 +75,7 @@ class NodeProperties : public qpid::amqp
bool durable;
bool autoDelete;
bool exclusive;
+ bool dynamic;
std::string exchangeType;
std::string alternateExchange;
qpid::types::Variant::Map properties;
@@ -81,7 +83,7 @@ class NodeProperties : public qpid::amqp
std::set<std::string> specified;
void process(const std::string&, const qpid::types::Variant&, const qpid::amqp::Descriptor*);
- bool wasSpecified(const std::string& key);
+ bool wasSpecified(const std::string& key) const;
};
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Thu Jan 23 10:15:46 2014
@@ -66,7 +66,8 @@ OutgoingFromQueue::OutgoingFromQueue(Bro
current(0), outstanding(0),
buffer(1024)/*used only for header at present*/,
//for exclusive queues, assume unreliable unless reliable is explicitly requested; otherwise assume reliable unless unreliable requested
- unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link))
+ unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link)),
+ cancelled(false)
{
for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
deliveries[i].init(i);
@@ -181,6 +182,13 @@ void OutgoingFromQueue::detached()
}
if (exclusive) queue->releaseExclusiveOwnership();
else if (isControllingUser) queue->releaseFromUse(true);
+ cancelled = true;
+}
+
+
+OutgoingFromQueue::~OutgoingFromQueue()
+{
+ if (!cancelled && isControllingUser) queue->releaseFromUse(true);
}
//Consumer interface:
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Thu Jan 23 10:15:46 2014
@@ -90,6 +90,7 @@ class OutgoingFromQueue : public Outgoin
public:
OutgoingFromQueue(Broker&, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session&,
qpid::sys::OutputControl& o, SubscriptionType type, bool exclusive, bool isControllingUser);
+ ~OutgoingFromQueue();
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
@@ -144,6 +145,7 @@ class OutgoingFromQueue : public Outgoin
std::string subjectFilter;
boost::scoped_ptr<Selector> selector;
bool unreliable;
+ bool cancelled;
};
}}} // namespace qpid::broker::amqp
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.cpp Thu Jan 23 10:15:46 2014
@@ -216,7 +216,7 @@ Session::ResolvedNode Session::resolve(c
}
//check whether user is even allowed access to queues/topics before resolving
authorise.access(name, isQueueRequested, isTopicRequested);
- ResolvedNode node;
+ ResolvedNode node(pn_terminus_is_dynamic(terminus));
if (isTopicRequested || !isQueueRequested) {
node.topic = connection.getTopics().get(name);
if (node.topic) node.exchange = node.topic->getExchange();
@@ -234,7 +234,7 @@ Session::ResolvedNode Session::resolve(c
node.properties.read(pn_terminus_properties(terminus));
if (node.exchange && createOnDemand && isTopicRequested) {
- if (!node.properties.getExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
+ if (!node.properties.getSpecifiedExchangeType().empty() && node.properties.getExchangeType() != node.exchange->getType()) {
//emulate 0-10 exchange-declare behaviour
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "Exchange of different type already exists");
}
@@ -251,14 +251,20 @@ Session::ResolvedNode Session::resolve(c
}
qpid::framing::FieldTable args;
qpid::amqp_0_10::translate(node.properties.getProperties(), args);
- node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(),
- node.properties.getAlternateExchange(),
- args, connection.getUserId(), connection.getId()).first;
+ std::pair<boost::shared_ptr<Exchange>, bool> result
+ = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.isAutodelete(),
+ node.properties.getAlternateExchange(),
+ args, connection.getUserId(), connection.getId());
+ node.exchange = result.first;
+ node.created = result.second;
} else {
if (node.exchange) {
QPID_LOG_CAT(warning, model, "Node name will be ambiguous, creation of queue named " << name << " requested when exchange of the same name already exists");
}
- node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
+ std::pair<boost::shared_ptr<Queue>, bool> result
+ = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId());
+ node.queue = result.first;
+ node.created = result.second;
}
} else {
boost::shared_ptr<NodePolicy> nodePolicy = connection.getNodePolicies().match(name);
@@ -415,7 +421,7 @@ void Session::setupIncoming(pn_link_t* l
source = sourceAddress;
}
if (node.queue) {
- boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.properties.trackControllingLink()));
+ boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source, node.created && node.properties.trackControllingLink()));
incoming[link] = q;
} else if (node.exchange) {
boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source));
@@ -460,7 +466,7 @@ void Session::setupOutgoing(pn_link_t* l
if (type == CONSUMER && node.queue->hasExclusiveOwner() && !node.queue->isExclusiveOwner(this)) {
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, std::string("Cannot consume from exclusive queue ") + node.queue->getName());
}
- boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.properties.trackControllingLink()));
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, type, false, node.created && node.properties.trackControllingLink()));
q->init();
filter.apply(q);
outgoing[link] = q;
@@ -474,7 +480,8 @@ void Session::setupOutgoing(pn_link_t* l
if (node.topic) {
settings = node.topic->getPolicy();
settings.durable = durable;
- settings.autodelete = autodelete;
+ //only determine autodelete from link details if the policy did not imply autodeletion
+ if (!settings.autodelete) settings.autodelete = autodelete;
altExchange = node.topic->getAlternateExchange();
}
settings.autoDeleteDelay = pn_terminus_get_timeout(source);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Session.h Thu Jan 23 10:15:46 2014
@@ -100,6 +100,8 @@ class Session : public ManagedSession, p
boost::shared_ptr<qpid::broker::amqp::Topic> topic;
boost::shared_ptr<Relay> relay;
NodeProperties properties;
+ bool created;
+ ResolvedNode(bool isDynamic) : properties(isDynamic), created(false) {}
};
ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/broker/amqp/Topic.cpp Thu Jan 23 10:15:46 2014
@@ -48,12 +48,12 @@ bool testProperty(const std::string& k,
else return i->second;
}
-qpid::types::Variant::Map filter(const qpid::types::Variant::Map& properties)
+qpid::types::Variant::Map filter(const qpid::types::Variant::Map& properties, bool queue)
{
qpid::types::Variant::Map filtered = properties;
filtered.erase(DURABLE);
filtered.erase(EXCHANGE);
- filtered.erase(ALTERNATE_EXCHANGE);
+ if (queue) filtered.erase(ALTERNATE_EXCHANGE);
return filtered;
}
}
@@ -65,13 +65,13 @@ Topic::Topic(Broker& broker, const std::
if (exchange->getName().empty()) throw qpid::Exception("Exchange must be specified.");
qpid::types::Variant::Map unused;
- qpid::types::Variant::Map filtered = filter(properties);
+ qpid::types::Variant::Map filtered = filter(properties, true);
policy.populate(filtered, unused);
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent != 0) {
topic = _qmf::Topic::shared_ptr(new _qmf::Topic(agent, this, name, exchange->GetManagementObject()->getObjectId(), durable));
- topic->set_properties(filtered);
+ topic->set_properties(filter(properties, false));
agent->addObject(topic);
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Jan 23 10:15:46 2014
@@ -438,7 +438,7 @@ void BrokerReplicator::route(Deliverable
if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(debug, "Broker replicator event: " << map);
+ QPID_LOG(trace, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
std::string key = (schema[PACKAGE_NAME].asString() +
@@ -450,7 +450,7 @@ void BrokerReplicator::route(Deliverable
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(debug, "Broker replicator response: " << map);
+ QPID_LOG(trace, "Broker replicator response: " << map);
string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Jan 23 10:15:46 2014
@@ -22,17 +22,18 @@
#include "BackupConnectionExcluder.h"
#include "ConnectionObserver.h"
#include "HaBroker.h"
+#include "IdSetter.h"
#include "Primary.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "StandAlone.h"
#include "QueueSnapshot.h"
-#include "QueueSnapshots.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/assert.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SignalHandler.h"
@@ -60,6 +61,20 @@ using sys::Mutex;
using boost::shared_ptr;
using boost::dynamic_pointer_cast;
+// In a HaBroker we always need to add QueueSnapshot and IdSetter to each queue
+// because we don't know in advance which queues might be used for stand-alone
+// replication.
+//
+// TODO aconway 2013-12-13: Can we restrict this to queues identified as replicated?
+//
+class HaBroker::BrokerObserver : public broker::BrokerObserver {
+ public:
+ void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
+ q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
+ q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter));
+ }
+};
+
// Called in Plugin::earlyInitialize
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: systemId(b.getSystem()->getSystemId().data()),
@@ -69,8 +84,7 @@ HaBroker::HaBroker(broker::Broker& b, co
observer(new ConnectionObserver(*this, systemId)),
role(new StandAlone),
membership(BrokerInfo(systemId, STANDALONE), *this),
- failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)),
- queueSnapshots(shared_ptr<QueueSnapshots>(new QueueSnapshots))
+ failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
@@ -82,8 +96,7 @@ HaBroker::HaBroker(broker::Broker& b, co
broker.getConnectionObservers().add(observer);
broker.getExchanges().registerExchange(failoverExchange);
}
- // QueueSnapshots are needed for standalone replication as well as cluster.
- broker.getBrokerObservers().add(queueSnapshots);
+ broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver()));
}
namespace {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/HaBroker.h Thu Jan 23 10:15:46 2014
@@ -54,8 +54,6 @@ class Backup;
class ConnectionObserver;
class Primary;
class Role;
-class QueueSnapshot;
-class QueueSnapshots;
class QueueReplicator;
/**
@@ -98,14 +96,14 @@ class HaBroker : public management::Mana
void setAddress(const Address&); // set self address from a self-connection
- boost::shared_ptr<QueueSnapshots> getQueueSnapshots() { return queueSnapshots; }
-
boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
/** Authenticated user ID for queue create/delete */
std::string getUserId() const { return userId; }
private:
+ class BrokerObserver;
+
void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
@@ -129,7 +127,6 @@ class HaBroker : public management::Mana
boost::shared_ptr<Role> role;
Membership membership;
boost::shared_ptr<FailoverExchange> failoverExchange;
- boost::shared_ptr<QueueSnapshots> queueSnapshots;
};
}} // namespace qpid::ha
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/IdSetter.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/IdSetter.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/IdSetter.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/IdSetter.h Thu Jan 23 10:15:46 2014
@@ -43,15 +43,11 @@ namespace ha {
class IdSetter : public broker::MessageInterceptor
{
public:
- IdSetter(const std::string& q, ReplicationId firstId) : nextId(firstId), name(q) {
- QPID_LOG(trace, "Initial replication ID for " << name << " =" << nextId.get());
- }
-
+ IdSetter(ReplicationId firstId=1) : nextId(firstId) {}
void record(broker::Message& m) { m.setReplicationId(nextId++); }
private:
sys::AtomicValue<uint32_t> nextId;
- std::string name;
};
}} // namespace qpid::ha
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org