You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/10/28 17:52:18 UTC
svn commit: r1190406 [3/5] - in /qpid/branches/qpid-3346/qpid: ./ bin/
cpp/design_docs/ cpp/docs/api/ cpp/docs/man/
cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/src/
cpp/src/posix/ cpp/src/qmf/engine/ cpp/src/qpid/ cpp/src/qpid/acl/ cpp/...
Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/sasl.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/sasl.mk?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/sasl.mk (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/sasl.mk Fri Oct 28 15:52:13 2011
@@ -30,7 +30,7 @@ check_PROGRAMS+=sasl_version
sasl_version_SOURCES=sasl_version.cpp
sasl_version_LDADD=$(lib_client)
-TESTS += run_cluster_authentication_test sasl_fed sasl_fed_ex_dynamic sasl_fed_ex_link sasl_fed_ex_queue sasl_fed_ex_route sasl_fed_ex_route_cluster sasl_fed_ex_link_cluster sasl_fed_ex_queue_cluster sasl_fed_ex_dynamic_cluster
+TESTS += run_cluster_authentication_test sasl_fed sasl_fed_ex_dynamic sasl_fed_ex_link sasl_fed_ex_queue sasl_fed_ex_route sasl_fed_ex_route_cluster sasl_fed_ex_link_cluster sasl_fed_ex_queue_cluster sasl_fed_ex_dynamic_cluster sasl_no_dir
LONG_TESTS += run_cluster_authentication_soak
EXTRA_DIST += run_cluster_authentication_test \
sasl_fed \
@@ -43,7 +43,8 @@ EXTRA_DIST += run_cluster_authentication
sasl_fed_ex_dynamic_cluster \
sasl_fed_ex_link_cluster \
sasl_fed_ex_queue_cluster \
- sasl_fed_ex_route_cluster
+ sasl_fed_ex_route_cluster \
+ sasl_no_dir
endif # HAVE_SASL
Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/ssl_test
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/ssl_test?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/ssl_test (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/ssl_test Fri Oct 28 15:52:13 2011
@@ -47,9 +47,13 @@ delete_certs() {
fi
}
-COMMON_OPTS="--daemon --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption"
+COMMON_OPTS="--daemon --no-data-dir --no-module-dir --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME"
start_broker() { # $1 = extra opts
- ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS $1;
+ ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS --require-encryption --auth no $1;
+}
+
+start_authenticating_broker() { # no args; same transport/port options as start_broker, but with --auth yes and SSL client authentication required
+ ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS --require-encryption --ssl-sasl-no-dict --ssl-require-client-authentication --auth yes; # stdout is captured by the caller as the broker port (see PORT2)
}
stop_brokers() {
@@ -64,6 +68,13 @@ cleanup() {
delete_certs
}
+pick_port() {
+ # We need a fixed port (e.g. to set --cluster-url, or to share one port between SSL and TCP). Use qpidd to pick a free port.
+ PICK=`../qpidd --no-module-dir -dp0` # start a throwaway broker on port 0 and capture what it prints (presumably the chosen port)
+ ../qpidd --no-module-dir -qp $PICK # presumably -q asks the broker on that port to quit, freeing the port again
+ echo $PICK # result is returned on stdout; callers capture it with backticks
+}
+
CERTUTIL=$(type -p certutil)
if [[ !(-x $CERTUTIL) ]] ; then
echo "No certutil, skipping ssl test";
@@ -93,7 +104,7 @@ test "$MSG" = "hello" || { echo "receive
#### Client Authentication tests
-PORT2=`start_broker --ssl-require-client-authentication` || error "Could not start broker"
+PORT2=`start_authenticating_broker` || error "Could not start broker"
echo "Running SSL client authentication test on port $PORT2"
URL=amqp:ssl:$TEST_HOSTNAME:$PORT2
@@ -109,19 +120,22 @@ test "$MSG3" = "" || { echo "receive suc
stop_brokers
+#Test multiplexed connection where SSL and plain TCP are served by the same port
+PORT=`pick_port`; ../qpidd --port $PORT --ssl-port $PORT $COMMON_OPTS --transport ssl --auth no
+echo "Running multiplexed SSL/TCP test on $PORT"
+
+./qpid-perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary || { echo "SSL on multiplexed connection failed!"; exit 1; }
+./qpid-perftest --count ${COUNT} --port ${PORT} -P tcp -b $TEST_HOSTNAME --summary || { echo "Plain TCP on multiplexed connection failed!"; exit 1; }
+
+stop_brokers
+
test -z $CLUSTER_LIB && exit 0 # Exit if cluster not supported.
## Test failover in a cluster using SSL only
. $srcdir/ais_check # Will exit if clustering not enabled.
-pick_port() {
- # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
- PICK=`../qpidd --no-module-dir -dp0`
- ../qpidd --no-module-dir -qp $PICK
- echo $PICK
-}
ssl_cluster_broker() { # $1 = port
- ../qpidd $COMMON_OPTS --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null
+ ../qpidd $COMMON_OPTS --require-encryption --auth no --load-module $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null
# Wait for broker to be ready
qpid-ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; }
echo "Running SSL cluster broker on port $1"
Modified: qpid/branches/qpid-3346/qpid/cpp/src/windows/QpiddBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/windows/QpiddBroker.cpp?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/windows/QpiddBroker.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/windows/QpiddBroker.cpp Fri Oct 28 15:52:13 2011
@@ -19,17 +19,9 @@
*
*/
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#else
-// These need to be made something sensible, like reading a value from
-// the registry. But for now, get things going with a local definition.
-namespace {
-const char *QPIDD_CONF_FILE = "qpid_broker.conf";
-const char *QPIDD_MODULE_DIR = ".";
-}
-#endif
+#include "config.h"
#include "qpidd.h"
+#include "SCM.h"
#include "qpid/Exception.h"
#include "qpid/Options.h"
#include "qpid/Plugin.h"
@@ -205,8 +197,56 @@ struct BrokerInfo {
DWORD pid;
};
+// Service-related items. Only involved when running the broker as a Windows
+// service.
+
+const std::string svcName = "qpidd";
+SERVICE_STATUS svcStatus;
+SERVICE_STATUS_HANDLE svcStatusHandle = 0;
+
+// Service control handler. This function is only called when the broker is
+// run as a Windows service. It receives control requests ('control') from Windows.
+VOID WINAPI SvcCtrlHandler(DWORD control)
+{
+ switch(control) {
+ case SERVICE_CONTROL_STOP: // Report STOP_PENDING to the SCM, then hand off to CtrlHandler.
+ svcStatus.dwCurrentState = SERVICE_STOP_PENDING;
+ svcStatus.dwControlsAccepted = 0; // Accept no further controls while stopping.
+ svcStatus.dwCheckPoint = 1;
+ svcStatus.dwWaitHint = 5000; // 5 secs.
+ ::SetServiceStatus(svcStatusHandle, &svcStatus);
+ CtrlHandler(CTRL_C_EVENT); // Reuses the console Ctrl-C handler (defined outside this hunk); presumably shuts the broker down.
+ break;
+
+ case SERVICE_CONTROL_INTERROGATE: // No action taken here.
+ break;
+
+ default: // Every other control code is ignored.
+ break;
+ }
+}
+
+VOID WINAPI ServiceMain(DWORD argc, LPTSTR *argv) // Service entry point named in main()'s dispatch table; runs the broker and reports status to the SCM.
+{
+ ::memset(&svcStatus, 0, sizeof(svcStatus));
+ svcStatusHandle = ::RegisterServiceCtrlHandler(svcName.c_str(), // NOTE(review): result is not checked; the Win32 docs say 0 is returned on failure.
+ SvcCtrlHandler);
+ svcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS;
+ svcStatus.dwCheckPoint = 1;
+ svcStatus.dwWaitHint = 10000; // 10 secs.
+ svcStatus.dwCurrentState = SERVICE_START_PENDING;
+ ::SetServiceStatus(svcStatusHandle, &svcStatus);
+ // QpiddBroker::execute() switches the state to SERVICE_RUNNING once the broker is accepting.
+ svcStatus.dwWin32ExitCode = run_broker(argc, argv, true); // Returns only when the broker has finished; its result becomes the service exit code.
+ svcStatus.dwCurrentState = SERVICE_STOPPED;
+ svcStatus.dwCheckPoint = 0;
+ svcStatus.dwWaitHint = 0;
+ ::SetServiceStatus(svcStatusHandle, &svcStatus); // Final report: service stopped.
}
+} // namespace
+
+
struct ProcessControlOptions : public qpid::Options {
bool quit;
bool check;
@@ -225,9 +265,49 @@ struct ProcessControlOptions : public qp
}
};
+struct ServiceOptions : public qpid::Options { // Command-line options for managing the broker as a Windows service.
+ bool install; // --install
+ bool start; // --start
+ bool stop; // --stop
+ bool uninstall; // --uninstall
+ bool daemon; // Initialized to false but not registered with addOptions() below.
+ std::string startType; // --start-type: "auto", "demand" or "disabled"
+ std::string startArgs; // --arguments
+ std::string account; // --account
+ std::string password; // --password
+ std::string depends; // --depends: comma delimited list of service names
+
+ ServiceOptions()
+ : qpid::Options("Service options"),
+ install(false),
+ start(false),
+ stop(false),
+ uninstall(false),
+ daemon(false),
+ startType("demand"), // Default start type when --start-type is not given.
+ startArgs(""),
+ account("NT AUTHORITY\\LocalService"), // Default account when --account is not given.
+ password(""),
+ depends("")
+ {
+ addOptions() // Bind each option name to the member that receives its value.
+ ("install", qpid::optValue(install), "Install as service")
+ ("start-type", qpid::optValue(startType, "auto|demand|disabled"), "Service start type\nApplied at install time only.")
+ ("arguments", qpid::optValue(startArgs, "COMMAND LINE ARGS"), "Arguments to pass when service auto-starts")
+ ("account", qpid::optValue(account, "(LocalService)"), "Account to run as, default is LocalService\nApplied at install time only.")
+ ("password", qpid::optValue(password, "PASSWORD"), "Account password, if needed\nApplied at install time only.")
+ ("depends", qpid::optValue(depends, "(comma delimited list)"), "Names of services that must start before this service\nApplied at install time only.")
+ ("start", qpid::optValue(start), "Start the service.")
+ ("stop", qpid::optValue(stop), "Stop the service.")
+ ("uninstall", qpid::optValue(uninstall), "Uninstall the service.");
+ }
+};
+
struct QpiddWindowsOptions : public QpiddOptionsPrivate { // Windows-specific option groups attached to the parent QpiddOptions.
ProcessControlOptions control;
+ ServiceOptions service;
QpiddWindowsOptions(QpiddOptions *parent) : QpiddOptionsPrivate(parent) {
+ parent->add(service); // Service options are added before the process control options.
parent->add(control);
}
};
@@ -253,12 +333,63 @@ void QpiddOptions::usage() const {
}
int QpiddBroker::execute (QpiddOptions *options) {
+
+ // If running as a service, bump the status checkpoint to let SCM know
+ // we're still making progress.
+ if (svcStatusHandle != 0) {
+ svcStatus.dwCheckPoint++;
+ ::SetServiceStatus(svcStatusHandle, &svcStatus);
+ }
+
// Options that affect a running daemon.
QpiddWindowsOptions *myOptions =
- reinterpret_cast<QpiddWindowsOptions *>(options->platform.get());
+ reinterpret_cast<QpiddWindowsOptions *>(options->platform.get());
if (myOptions == 0)
throw qpid::Exception("Internal error obtaining platform options");
+ if (myOptions->service.install) {
+ // Map --start-type to the SCM constant. An empty value falls back to demand start (the documented default) instead of leaving startType uninitialized.
+ DWORD startType = SERVICE_DEMAND_START;
+ if (myOptions->service.startType.compare("demand") == 0)
+ startType = SERVICE_DEMAND_START;
+ else if (myOptions->service.startType.compare("auto") == 0)
+ startType = SERVICE_AUTO_START;
+ else if (myOptions->service.startType.compare("disabled") == 0)
+ startType = SERVICE_DISABLED;
+ else if (!myOptions->service.startType.empty()) // Any other non-empty value is rejected.
+ throw qpid::Exception("Invalid service start type: " +
+ myOptions->service.startType);
+
+ // Install service and exit
+ qpid::windows::SCM manager;
+ manager.install(svcName,
+ "Apache Qpid Message Broker",
+ myOptions->service.startArgs,
+ startType,
+ myOptions->service.account,
+ myOptions->service.password,
+ myOptions->service.depends);
+ return 0; // Installation only; the broker itself is not started.
+ }
+
+ if (myOptions->service.start) {
+ qpid::windows::SCM manager;
+ manager.start(svcName);
+ return 0;
+ }
+
+ if (myOptions->service.stop) {
+ qpid::windows::SCM manager;
+ manager.stop(svcName);
+ return 0;
+ }
+
+ if (myOptions->service.uninstall) {
+ qpid::windows::SCM manager;
+ manager.uninstall(svcName);
+ return 0;
+ }
+
if (myOptions->control.check || myOptions->control.quit) {
// Relies on port number being set via --port or QPID_PORT env variable.
NamedSharedMemory<BrokerInfo> info(brokerInfoName(options->broker.port));
@@ -301,10 +432,41 @@ int QpiddBroker::execute (QpiddOptions *
::SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE);
brokerPtr->accept();
std::cout << options->broker.port << std::endl;
+
+ // If running as a service, tell SCM we're up. There's still a chance
+ // that store recovery will drag out the time before the broker actually
+ // responds to requests, but integrating that mechanism with the SCM
+ // updating is probably more work than it's worth.
+ if (svcStatusHandle != 0) {
+ svcStatus.dwCheckPoint = 0;
+ svcStatus.dwWaitHint = 0;
+ svcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP;
+ svcStatus.dwCurrentState = SERVICE_RUNNING;
+ ::SetServiceStatus(svcStatusHandle, &svcStatus);
+ }
+
brokerPtr->run();
waitShut.signal(); // In case we shut down some other way
waitThr.join();
+ return 0;
+}
- // CloseHandle(h);
+
+int main(int argc, char* argv[])
+{
+ // If started as a service, connect to the SCM, which then calls ServiceMain. Else just run.
+ // If as a service, StartServiceCtrlDispatcher doesn't return until
+ // the service is stopped.
+ SERVICE_TABLE_ENTRY dispatchTable[] =
+ {
+ { "", (LPSERVICE_MAIN_FUNCTION)ServiceMain }, // Service name is given as an empty string here.
+ { NULL, NULL } // Terminating entry.
+ };
+ if (!StartServiceCtrlDispatcher(dispatchTable)) {
+ DWORD err = ::GetLastError();
+ if (err == ERROR_FAILED_SERVICE_CONTROLLER_CONNECT) // Run as console
+ return run_broker(argc, argv);
+ throw QPID_WINDOWS_ERROR(err); // NOTE(review): no handler for this exception is visible in main.
+ }
return 0;
}
Modified: qpid/branches/qpid-3346/qpid/doc/book/src/Configure-Java-Qpid-to-use-a-SSL-connection.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/doc/book/src/Configure-Java-Qpid-to-use-a-SSL-connection.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/doc/book/src/Configure-Java-Qpid-to-use-a-SSL-connection.xml (original)
+++ qpid/branches/qpid-3346/qpid/doc/book/src/Configure-Java-Qpid-to-use-a-SSL-connection.xml Fri Oct 28 15:52:13 2011
@@ -51,8 +51,8 @@
<ssl>
<enabled>true</enabled>
<sslOnly>true</sslOnly>
- <keystorePath>/path/to/keystore.ks</keystorePath>
- <keystorePassword>keystorepass</keystorePassword>
+ <keyStorePath>/path/to/keystore.ks</keyStorePath>
+ <keyStorePassword>keystorepass</keyStorePassword>
</ssl>
</programlisting>
Modified: qpid/branches/qpid-3346/qpid/doc/book/src/Programming-In-Apache-Qpid.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/doc/book/src/Programming-In-Apache-Qpid.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/doc/book/src/Programming-In-Apache-Qpid.xml (original)
+++ qpid/branches/qpid-3346/qpid/doc/book/src/Programming-In-Apache-Qpid.xml Fri Oct 28 15:52:13 2011
@@ -3443,9 +3443,8 @@ log4j.appender.console.layout.Conversion
<entry>qpid.amqp.version</entry>
<entry>string</entry>
<entry>0-10</entry>
- <entry>Sets the AMQP version to be used - currently supports one of {0-8,0-9,0-91,0-10}</entry>
- </row>
-
+ <entry><para>Sets the AMQP version to be used - currently supports one of {0-8,0-9,0-91,0-10}.</para><para>The client will begin negotiation at the specified version and only negotiate downwards if the Broker does not support the specified version.</para></entry>
+ </row>
<row>
<entry>qpid.heartbeat</entry>
<entry>int</entry>
@@ -3610,15 +3609,20 @@ log4j.appender.console.layout.Conversion
<entry>qpid.transport</entry>
<entry>string</entry>
<entry>org.apache.qpid.transport.network.io.IoNetworkTransport</entry>
- <entry><para>The transport implementation to be used.</para><para>A user could specify an alternative transport mechanism that implements the <varname>org.apache.qpid.transport.network.NetworkTransport</varname> interface.</para></entry>
- </row>
-
+ <entry><para>The transport implementation to be used.</para><para>A user could specify an alternative transport mechanism that implements the <varname>org.apache.qpid.transport.network.NetworkTransport</varname> interface.</para></entry>
+ </row>
+ <row>
+ <entry>qpid.sync_op_timeout</entry>
+ <entry>long</entry>
+ <entry>60000</entry>
+ <entry><para>The length of time (in milliseconds) to wait for a synchronous operation to complete.</para><para>For compatibility with older clients, the synonym <varname>amqj.default_syncwrite_timeout</varname> is supported.</para></entry>
+ </row>
<row>
<entry>amqj.tcp_nodelay</entry>
<entry>boolean</entry>
<entry>false</entry>
- <entry><para>Sets the TCP_NODELAY property of the underlying socket.</para><para>This could also be set per connection as well (see connection paramters).</para></entry>
- </row>
+ <entry><para>Sets the TCP_NODELAY property of the underlying socket.</para><para>This can also be set per connection (see connection parameters).</para></entry>
+ </row>
</tbody>
</tgroup>
</table>
Modified: qpid/branches/qpid-3346/qpid/doc/book/src/Qpid-Java-FAQ.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/doc/book/src/Qpid-Java-FAQ.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/doc/book/src/Qpid-Java-FAQ.xml (original)
+++ qpid/branches/qpid-3346/qpid/doc/book/src/Qpid-Java-FAQ.xml Fri Oct 28 15:52:13 2011
@@ -736,35 +736,6 @@ amqj.logging.level
</para>
<!--h3--></section>
- <section role="h3" id="QpidJavaFAQ-HowdoIuseanInVMBrokerformyowntests-3F"><title>
- How do I
- use an InVM Broker for my own tests?
- </title>
-
- <para>
- I would take a look at the testPassiveTTL in
- <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java">TimeToLiveTest.java</ulink>
- </para><para>
- The setUp and tearDown methods show how to correctly start up a
- broker for InVM testing. If you write your tests using a file for
- the JNDI you can then very easily swap between running your tests
- InVM and against a real broker.
- </para><para>
- See our <xref linkend="How-to-Use-JNDI"/> on how to confgure it
- </para><para>
- Basically though you just need to set two System Properites:
- </para><para>
- java.naming.factory.initial =
- org.apache.qpid.jndi.PropertiesFileInitialContextFactory
- java.naming.provider.url = <your JNDI file>
- </para><para>
- and call getInitialContext() in your code.
- </para><para>
- You will of course need to have the broker libraries on your
- class path for this to run.
- </para>
-<!--h3--></section>
-
<section role="h3" id="QpidJavaFAQ-HowcanIinspectthecontentsofmyMessageStore-3F"><title>
How
can I inspect the contents of my MessageStore?
@@ -907,31 +878,6 @@ java.lang.NullPointerException
</para>
<!--h3--></section>
- <section role="h3" id="QpidJavaFAQ-Clientkeepsthrowing-27Serverdidnotrespondinatimelyfashion-27-5Cerrorcode408-3ARequestTimeout-5C."><title>
- Client keeps throwing 'Server did not respond in a timely
- fashion' [error code 408: Request Timeout].
- </title>
-
- <para>
- Certain operations wait for a response from the Server. One such
- operations is commit. If the server does not respond to the
- commit request within a set time a Request Timeout [error code:
- 408] exception is thrown (Server did not respond in a timely
- fashion). This is to ensure that a server that has hung does not
- cause the client process to be come unresponsive.
- </para><para>
- However, it is possible that the server just needs a long time to
- process a give request. For example, sending a large persistent
- message when using a persistent store will take some time to a)
- Transfer accross the network and b) to be fully written to disk.
- </para><para>
- These situations require that the default timeout value be
- increased. A cilent <xref linkend="qpid_System-Properties"/> 'amqj.default_syncwrite_timeout' can be set
- on the client to increase the wait time. The default in 0.5 is
- 30000 (30s).
- </para>
-<!--h3--></section>
-
<section role="h3" id="QpidJavaFAQ-CanauseTCPKEEPALIVEorAMQPheartbeatingtokeepmyconnectionopen-3F"><title>
Can a use TCP_KEEPALIVE or AMQP heartbeating to keep my
connection open?
Modified: qpid/branches/qpid-3346/qpid/doc/book/src/System-Properties.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/doc/book/src/System-Properties.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/doc/book/src/System-Properties.xml (original)
+++ qpid/branches/qpid-3346/qpid/doc/book/src/System-Properties.xml Fri Oct 28 15:52:13 2011
@@ -135,7 +135,7 @@
<varlistentry>
- <term>amqj.default_syncwrite_timeout</term>
+ <term>qpid.sync_op_timeout</term>
<listitem>
<variablelist>
<varlistentry>
@@ -144,12 +144,11 @@
</varlistentry>
<varlistentry>
<term>Default</term>
- <listitem><para>30000</para></listitem>
+ <listitem><para>60000</para></listitem>
</varlistentry>
</variablelist>
- <para> The number length of time in millisecond to wait
- for a synchronous write to complete.
- </para>
+ <para>The length of time (in milliseconds) to wait for a synchronous operation to complete.
+ For compatibility with older clients, the synonym amqj.default_syncwrite_timeout is supported.</para>
</listitem>
</varlistentry>
@@ -193,7 +192,7 @@
<varlistentry>
- <term>amqj.tcpNoDelay</term>
+ <term>amqj.tcp_nodelay</term>
<listitem>
<variablelist>
<varlistentry>
@@ -211,65 +210,6 @@
</varlistentry>
<varlistentry>
- <term>amqj.sendBufferSize</term>
- <listitem>
- <variablelist>
- <varlistentry>
- <term>integer</term>
- <listitem><para>Boolean</para></listitem>
- </varlistentry>
- <varlistentry>
- <term>Default</term>
- <listitem><para>32768</para></listitem>
- </varlistentry>
- </variablelist>
- <para>This is the default buffer sized created by Mina.
- </para>
- </listitem>
- </varlistentry>
-
- <varlistentry>
- <term>amqj.receiveBufferSize</term>
- <listitem>
- <variablelist>
- <varlistentry>
- <term>Type</term>
- <listitem><para>integer</para></listitem>
- </varlistentry>
- <varlistentry>
- <term>Default</term>
- <listitem><para>32768</para></listitem>
- </varlistentry>
- </variablelist>
- <para>This is the default buffer sized created by Mina.
- </para>
- </listitem>
- </varlistentry>
-
-
- <varlistentry>
- <term>amqj.protocolprovider.class</term>
- <listitem>
- <variablelist>
- <varlistentry>
- <term>Type</term>
- <listitem><para>String</para></listitem>
- </varlistentry>
- <varlistentry>
- <term>Default</term>
- <listitem><para>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</para></listitem>
- </varlistentry>
- </variablelist>
- <para> This specifies the default IoHandlerAdapter that
- represents the InVM broker. The IoHandlerAdapter must have
- a constructor that takes a single Integer that represents
- the InVM port number.
- </para>
- </listitem>
- </varlistentry>
-
-
- <varlistentry>
<term>amqj.protocol.logging.level</term>
<listitem>
<variablelist>
Modified: qpid/branches/qpid-3346/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/extras/qmf/src/py/qmf/console.py?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/branches/qpid-3346/qpid/extras/qmf/src/py/qmf/console.py Fri Oct 28 15:52:13 2011
@@ -3295,6 +3295,7 @@ class Agent:
self.lock.acquire()
try:
self.contextMap.pop(sequence)
+ self.seqMgr._release(sequence)
except KeyError:
pass # @todo - shouldn't happen, log a warning.
finally:
Propchange: qpid/branches/qpid-3346/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -4,4 +4,4 @@
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
/qpid/trunk/qpid:796646-796653
-/qpid/trunk/qpid/java:1144319-1179750
+/qpid/trunk/qpid/java:1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
/qpid/branches/java-network-refactor/qpid/java/broker:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker:787599
/qpid/branches/qpid-2935/qpid/java/broker:1061302-1072333
-/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1179750
+/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1190375
Modified: qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java Fri Oct 28 15:52:13 2011
@@ -74,9 +74,9 @@ public class AppInfo
appInfoMap.put("port", sc.getPorts().toString());
appInfoMap.put("version", QpidProperties.getReleaseVersion());
appInfoMap.put("vhosts", "standalone");
- appInfoMap.put("KeystorePath", sc.getKeystorePath());
+ appInfoMap.put("KeystorePath", sc.getConnectorKeyStorePath());
appInfoMap.put("PluginDirectory", sc.getPluginDirectory());
- appInfoMap.put("CertType", sc.getCertType());
+ appInfoMap.put("CertType", sc.getConnectorCertType());
appInfoMap.put("QpidWork", sc.getQpidWork());
appInfoMap.put("Bind", sc.getBind());
}
Modified: qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java Fri Oct 28 15:52:13 2011
@@ -210,12 +210,13 @@ public class InfoPluginTest extends Qpid
}
br.close();
System.out.println("*** Received buffer: " + buf);
- System.out.println("*** Latch countdown");
- _latch.countDown();
synchronized (_recv)
{
_recv.add(buf);
}
+
+ System.out.println("*** Latch countdown");
+ _latch.countDown();
}
catch (Exception ex)
{
Propchange: qpid/branches/qpid-3346/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
/qpid/branches/qpid-2935/qpid/java/broker/bin:1061302-1072333
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1179750
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1190375
Modified: qpid/branches/qpid-3346/qpid/java/broker/bin/qpid-server.bat
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/bin/qpid-server.bat?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/bin/qpid-server.bat (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/bin/qpid-server.bat Fri Oct 28 15:52:13 2011
@@ -65,7 +65,7 @@ if "%AMQJ_LOGGING_LEVEL%" == "" set AMQJ
REM Set the default system properties that we'll use now that they have
REM all been initialised
-set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% "-DQPID_HOME=%QPID_HOME%" "-DQPID_WORK=%QPID_WORK%"
+set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% -DQPID_HOME="%QPID_HOME%" -DQPID_WORK="%QPID_WORK%"
if "%EXTERNAL_CLASSPATH%" == "" set EXTERNAL_CLASSPATH=%CLASSPATH%
@@ -77,7 +77,7 @@ goto afterQpidClasspath
:noQpidClasspath
echo Warning: Qpid classpath not set. CLASSPATH set to %QPID_HOME%\lib\qpid-all.jar
-set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar
+set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar;%QPID_HOME%\lib\opt\*
:afterQpidClasspath
REM start parsing -run arguments
Modified: qpid/branches/qpid-3346/qpid/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/etc/config.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/etc/config.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/etc/config.xml Fri Oct 28 15:52:13 2011
@@ -35,8 +35,8 @@
<enabled>false</enabled>
<port>5671</port>
<sslOnly>false</sslOnly>
- <keystorePath>/path/to/keystore.ks</keystorePath>
- <keystorePassword>keystorepass</keystorePassword>
+ <keyStorePath>/path/to/keystore.ks</keyStorePath>
+ <keyStorePassword>keystorepass</keyStorePassword>
</ssl>
<port>5672</port>
<socketReceiveBuffer>262144</socketReceiveBuffer>
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Fri Oct 28 15:52:13 2011
@@ -713,6 +713,19 @@ public class QMFService implements Confi
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory)
+ {
+ // TODO: timestamp support. Until then this always answers NOT_IMPLEMENTED.
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory,
+ final java.lang.Boolean receive) // 'receive' is currently ignored
+ {
+ // TODO: timestamp support. Until then this always answers NOT_IMPLEMENTED.
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
final String type,
final String name,
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java Fri Oct 28 15:52:13 2011
@@ -210,9 +210,9 @@ public class Broker
if (serverConfig.getEnableSSL())
{
- final String keystorePath = serverConfig.getKeystorePath();
- final String keystorePassword = serverConfig.getKeystorePassword();
- final String certType = serverConfig.getCertType();
+ final String keystorePath = serverConfig.getConnectorKeyStorePath();
+ final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
+ final String certType = serverConfig.getConnectorCertType();
final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType);
for(int sslPort : sslPorts)
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Fri Oct 28 15:52:13 2011
@@ -123,7 +123,7 @@ public class ServerConfiguration extends
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explictly call {@link #initialise()} as this is done via the
+ * need to explicitly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param configurationURL
@@ -169,7 +169,7 @@ public class ServerConfiguration extends
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explictly call {@link #initialise()} as this is done via the
+ * need to explicitly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param conf
@@ -239,6 +239,22 @@ public class ServerConfiguration extends
+ (_configFile == null ? "" : " Configuration file : " + _configFile);
throw new ConfigurationException(message);
}
+
+ // QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration
+ // sections. For the moment, continue to understand both but generate a deprecated warning if the less preferred keystore is used.
+ // Each deprecated key must be a separate array element: a comma inside a string literal
+ // joins two keys into one bogus key that contains() can never match.
+ for (String key : new String[] {"management.ssl.keystorePath", "management.ssl.keystorePassword",
+ "connector.ssl.keystorePath", "connector.ssl.keystorePassword"})
+ {
+ if (contains(key))
+ {
+ final String deprecatedXpath = key.replace('.', '/');
+ final String preferredXpath = deprecatedXpath.replace("keystore", "keyStore");
+ _logger.warn("Validation warning: " + deprecatedXpath + " is deprecated and must be replaced by " + preferredXpath
+ + (_configFile == null ? "" : " Configuration file : " + _configFile));
+ }
+ }
}
/*
@@ -404,7 +420,7 @@ public class ServerConfiguration extends
public final static Configuration flatConfig(File file) throws ConfigurationException
{
// We have to override the interpolate methods so that
- // interpolation takes place accross the entirety of the
+ // interpolation takes place across the entirety of the
// composite configuration. Without doing this each
// configuration object only interpolates variables defined
// inside itself.
@@ -551,7 +567,8 @@ public class ServerConfiguration extends
public String getManagementKeyStorePath()
{
- return getStringValue("management.ssl.keyStorePath");
+ final String fallback = getStringValue("management.ssl.keystorePath");
+ return getStringValue("management.ssl.keyStorePath", fallback);
}
public boolean getManagementSSLEnabled()
@@ -561,7 +578,8 @@ public class ServerConfiguration extends
public String getManagementKeyStorePassword()
{
- return getStringValue("management.ssl.keyStorePassword");
+ final String fallback = getStringValue("management.ssl.keystorePassword");
+ return getStringValue("management.ssl.keyStorePassword", fallback);
}
public boolean getQueueAutoRegister()
@@ -699,17 +717,19 @@ public class ServerConfiguration extends
return getListValue("connector.ssl.port", Collections.<Integer>singletonList(DEFAULT_SSL_PORT));
}
- public String getKeystorePath()
+ public String getConnectorKeyStorePath()
{
- return getStringValue("connector.ssl.keystorePath");
+ final String fallback = getStringValue("connector.ssl.keystorePath"); // pre-0.13 broker supported this name.
+ return getStringValue("connector.ssl.keyStorePath", fallback);
}
- public String getKeystorePassword()
+ public String getConnectorKeyStorePassword()
{
- return getStringValue("connector.ssl.keystorePassword");
+ final String fallback = getStringValue("connector.ssl.keystorePassword"); // pre-0.13 brokers supported this name.
+ return getStringValue("connector.ssl.keyStorePassword", fallback);
}
- public String getCertType()
+ public String getConnectorCertType()
{
return getStringValue("connector.ssl.certType", "SunX509");
}
Propchange: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -4,4 +4,4 @@
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1179750
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -4,4 +4,4 @@
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1179750
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1190375
Propchange: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -1,3 +1,3 @@
/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1144319-1179750
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1144319-1190375
Modified: qpid/branches/qpid-3346/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Fri Oct 28 15:52:13 2011
@@ -256,7 +256,7 @@ public class ServerConfigurationTest ext
assertEquals(false, _serverConfig.getManagementSSLEnabled());
}
- public void testGetManagementKeyStorePassword() throws ConfigurationException
+ public void testGetManagementKeystorePassword() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
@@ -534,43 +534,57 @@ public class ServerConfigurationTest ext
assertEquals("10", _serverConfig.getSSLPorts().get(0));
}
- public void testGetKeystorePath() throws ConfigurationException
+ public void testGetConnectorKeystorePath() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertNull(_serverConfig.getKeystorePath());
+ assertNull(_serverConfig.getConnectorKeyStorePath());
// Check value we set
- _config.setProperty("connector.ssl.keystorePath", "a");
+ _config.setProperty("connector.ssl.keyStorePath", "a");
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals("a", _serverConfig.getKeystorePath());
+ assertEquals("a", _serverConfig.getConnectorKeyStorePath());
+
+ // Ensure we continue to support the old name keystorePath
+ _config.clearProperty("connector.ssl.keyStorePath");
+ _config.setProperty("connector.ssl.keystorePath", "b");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals("b", _serverConfig.getConnectorKeyStorePath());
}
- public void testGetKeystorePassword() throws ConfigurationException
+ public void testGetConnectorKeystorePassword() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertNull(_serverConfig.getKeystorePassword());
+ assertNull(_serverConfig.getConnectorKeyStorePassword());
// Check value we set
- _config.setProperty("connector.ssl.keystorePassword", "a");
+ _config.setProperty("connector.ssl.keyStorePassword", "a");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals("a", _serverConfig.getConnectorKeyStorePassword());
+
+ // Ensure we continue to support the old name keystorePassword
+ _config.clearProperty("connector.ssl.keyStorePassword");
+ _config.setProperty("connector.ssl.keystorePassword", "b");
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals("a", _serverConfig.getKeystorePassword());
+ assertEquals("b", _serverConfig.getConnectorKeyStorePassword());
}
- public void testGetCertType() throws ConfigurationException
+ public void testGetConnectorCertType() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertEquals("SunX509", _serverConfig.getCertType());
+ assertEquals("SunX509", _serverConfig.getConnectorCertType());
// Check value we set
_config.setProperty("connector.ssl.certType", "a");
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals("a", _serverConfig.getCertType());
+ assertEquals("a", _serverConfig.getConnectorCertType());
}
public void testGetUseBiasedWrites() throws ConfigurationException
Modified: qpid/branches/qpid-3346/qpid/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/build.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/build.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/build.xml Fri Oct 28 15:52:13 2011
@@ -22,8 +22,6 @@
<import file="common.xml"/>
- <property file="${project.root}/build.overrides"/>
-
<findSubProjects name="broker-plugins" dir="broker-plugins"/>
<findSubProjects name="client-plugins" dir="client-plugins"/>
<findSubProjects name="management" dir="management" excludes="common,example"/>
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Oct 28 15:52:13 2011
@@ -175,6 +175,10 @@ public class AMQConnection extends Close
// new amqp-0-10 encoded format.
private boolean _useLegacyMapMessageFormat;
+ //used to track the last failover time for
+ //Address resolution purposes
+ private volatile long _lastFailoverTime = 0;
+
/**
* @param broker brokerdetails
* @param username username
@@ -1076,6 +1080,7 @@ public class AMQConnection extends Close
*/
public boolean firePreFailover(boolean redirect)
{
+ _lastFailoverTime = System.currentTimeMillis();
boolean proceed = true;
if (_connectionListener != null)
{
@@ -1462,4 +1467,9 @@ public class AMQConnection extends Close
}
}
}
+
+ public long getLastFailoverTime()
+ {
+ return _lastFailoverTime;
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri Oct 28 15:52:13 2011
@@ -281,24 +281,29 @@ public class AMQConnectionDelegate_0_10
{
_conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
- try
+ _qpidConnection.notifyFailoverRequired();
+
+ synchronized (_conn.getFailoverMutex())
{
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ try
{
- _conn.failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- return;
+ if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ {
+ _conn.failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
}
}
@@ -324,6 +329,18 @@ public class AMQConnectionDelegate_0_10
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
+ if (_conn.isFailingOver())
+ {
+ try
+ {
+ _conn.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+
try
{
return operation.execute();
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Fri Oct 28 15:52:13 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.naming.NamingException;
@@ -59,7 +60,7 @@ public abstract class AMQDestination imp
private boolean _browseOnly;
- private boolean _isAddressResolved;
+ private AtomicLong _addressResolved = new AtomicLong(0);
private AMQShortString _queueName;
@@ -77,7 +78,7 @@ public abstract class AMQDestination imp
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
-
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -740,12 +741,12 @@ public abstract class AMQDestination imp
public boolean isAddressResolved()
{
- return _isAddressResolved;
+ return _addressResolved.get() > 0;
}
- public void setAddressResolved(boolean addressResolved)
+ public void setAddressResolved(long addressResolved)
{
- _isAddressResolved = addressResolved;
+ _addressResolved.set(addressResolved);
}
private static Address createAddressFromString(String str)
@@ -823,7 +824,7 @@ public abstract class AMQDestination imp
dest.setTargetNode(_targetNode);
dest.setSourceNode(_sourceNode);
dest.setLink(_link);
- dest.setAddressResolved(_isAddressResolved);
+ dest.setAddressResolved(_addressResolved.get());
return dest;
}
@@ -836,4 +837,9 @@ public abstract class AMQDestination imp
{
_isDurable = b;
}
+
+ public boolean isResolvedAfter(long time)
+ {
+ return _addressResolved.get() > time;
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Oct 28 15:52:13 2011
@@ -308,7 +308,7 @@ public abstract class AMQSession<C exten
protected final FlowControllingBlockingQueue _queue;
/** Holds the highest received delivery tag. */
- private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
@@ -630,6 +630,7 @@ public abstract class AMQSession<C exten
try
{
acknowledgeImpl();
+ markClean();
}
catch (TransportException e)
{
@@ -855,6 +856,10 @@ public abstract class AMQSession<C exten
//Check that we are clean to commit.
if (_failedOverDirty)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
+ }
rollback();
throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
@@ -1813,9 +1818,7 @@ public abstract class AMQSession<C exten
suspendChannel(true);
}
- // Let the dispatcher know that all the incomming messages
- // should be rolled back(reject/release)
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
syncDispatchQueue();
@@ -2772,6 +2775,21 @@ public abstract class AMQSession<C exten
}
}
+ /**
+ * Deletes the queue that backs the specified temporary queue/topic.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param amqQueue The temporary destination whose queue (looked up via getAMQQueueName()) is deleted.
+ *
+ * @throws JMSException If the queue could not be deleted for any reason.
+ * @todo Be aware of possible changes to parameter order as versions change.
+ */
+ protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
+ {
+ deleteQueue(amqQueue.getAMQQueueName());
+ }
+
public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
private long getNextProducerId()
@@ -3186,7 +3204,7 @@ public abstract class AMQSession<C exten
setConnectionStopped(true);
}
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
@@ -3335,6 +3353,11 @@ public abstract class AMQSession<C exten
if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message because delivery tag " + deliveryTag
+ + " <= rollback mark " + _rollbackMark.get());
+ }
rejectMessage(message, true);
}
else if (_usingDispatcherForCleanup)
@@ -3396,6 +3419,11 @@ public abstract class AMQSession<C exten
// Don't reject if we're already closing
if (!_closed.get())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
+ + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+ }
rejectMessage(message, true);
}
}
@@ -3526,4 +3554,15 @@ public abstract class AMQSession<C exten
{
return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
}
+
+ private void setRollbackMark()
+ {
+ // Let the dispatcher know that all the incoming messages
+ // should be rolled back (reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+ }
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Oct 28 15:52:13 2011
@@ -294,23 +294,34 @@ public class AMQSession_0_10 extends AMQ
}
}
- void messageAcknowledge(RangeSet ranges, boolean accept)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept)
{
messageAcknowledge(ranges,accept,false);
}
- void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
{
- Session ssn = getQpidSession();
- for (Range range : ranges)
+ final Session ssn = getQpidSession();
+ flushProcessed(ranges,accept);
+ if (accept)
{
- ssn.processed(range);
+ ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
}
- ssn.flushProcessed(accept ? BATCH : NONE);
- if (accept)
+ }
+
+ /**
+ * Flush any outstanding commands. This causes session complete to be sent.
+ * @param ranges the range of command ids.
+ * @param batch true if batched.
+ */
+ void flushProcessed(final RangeSet ranges, final boolean batch)
+ {
+ final Session ssn = getQpidSession();
+ for (final Range range : ranges)
{
- ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+ ssn.processed(range);
}
+ ssn.flushProcessed(batch ? BATCH : NONE);
}
/**
@@ -1168,8 +1179,8 @@ public class AMQSession_0_10 extends AMQ
boolean isConsumer,
boolean noWait) throws AMQException
{
- if (dest.isAddressResolved())
- {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+ {
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
createSubscriptionQueue(dest);
@@ -1258,7 +1269,7 @@ public class AMQSession_0_10 extends AMQ
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(true);
+ dest.setAddressResolved(System.currentTimeMillis());
}
}
@@ -1378,4 +1389,15 @@ public class AMQSession_0_10 extends AMQ
getQpidSession().sync();
}
}
+
+ @Override
+ void resubscribe() throws AMQException
+ {
+ // Also reset the delivery tag tracker, to ensure we don't
+ // return the first <total number of msgs received on session>
+ // messages sent by the brokers following the first rollback
+ // after failover
+ _highestDeliveryTag.set(-1);
+ super.resubscribe();
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Oct 28 15:52:13 2011
@@ -82,7 +82,6 @@ import org.slf4j.LoggerFactory;
public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
-
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -92,7 +91,7 @@ public final class AMQSession_0_8 extend
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param acknowledgeMode The acknowledgement mode for the session.
* @param messageFactoryRegistry The message factory factory for the session.
* @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLowMark The number of prefetched messages at which to resume the session.
@@ -110,7 +109,7 @@ public final class AMQSession_0_8 extend
* @param con The connection on which to create the session.
* @param channelId The unique identifier for the session.
* @param transacted Indicates whether or not the session is transactional.
- * @param acknowledgeMode The acknoledgement mode for the session.
+ * @param acknowledgeMode The acknowledgement mode for the session.
* @param defaultPrefetchHigh The maximum number of messages to prefetched before suspending the session.
* @param defaultPrefetchLow The number of prefetched messages at which to resume the session.
*/
@@ -169,7 +168,7 @@ public final class AMQSession_0_8 extend
// we also need to check the state manager for 08/09 as the
// _connection variable may not be updated in time by the error receiving
// thread.
- // We can't close the session if we are alreadying in the process of
+ // We can't close the session if we are already in the process of
// closing/closed the connection.
if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -605,6 +604,18 @@ public final class AMQSession_0_8 extend
}
+ @Override
+ protected void deleteTemporaryDestination(final TemporaryDestination amqQueue)
+ throws JMSException
+ {
+ // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted
+ // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects).
+ // This is not quite right for JMSCompliance as the queue/topic should remain until the connection closes, or the
+ // client explicitly deletes it.
+
+ /* intentional no-op */
+ }
+
public boolean isQueueBound(String exchangeName, String queueName,
String bindingKey, Map<String, Object> args) throws JMSException
{
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Fri Oct 28 15:52:13 2011
@@ -20,14 +20,13 @@
*/
package org.apache.qpid.client;
+import java.util.UUID;
+
import javax.jms.JMSException;
import javax.jms.TemporaryQueue;
import org.apache.qpid.framing.AMQShortString;
-import java.util.Random;
-import java.util.UUID;
-
/** AMQ implementation of a TemporaryQueue. */
final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
{
@@ -50,11 +49,15 @@ final class AMQTemporaryQueue extends AM
{
throw new JMSException("Temporary Queue has consumers so cannot be deleted");
}
- _deleted = true;
- // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
- // by the server when there are no more subscriptions to that queue. This is probably not
- // quite right for JMSCompliance.
+ try
+ {
+ _session.deleteTemporaryDestination(this);
+ }
+ finally
+ {
+ _deleted = true;
+ }
}
public AMQSession getSession()
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Fri Oct 28 15:52:13 2011
@@ -53,10 +53,14 @@ class AMQTemporaryTopic extends AMQTopic
throw new JMSException("Temporary Topic has consumers so cannot be deleted");
}
- _deleted = true;
- // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
- // by the server when there are no more subscriptions to that queue. This is probably not
- // quite right for JMSCompliance.
+ try
+ {
+ _session.deleteTemporaryDestination(this);
+ }
+ finally
+ {
+ _deleted = true;
+ }
}
public AMQSession getSession()
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Oct 28 15:52:13 2011
@@ -150,13 +150,20 @@ public class BasicMessageConsumer_0_10 e
{
if (isMessageListenerSet() && capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
+ else
+ {
+ // if we are synchronously waiting for a message
+ // and messages are not pre-fetched we then need to request another one
+ if(capacity == 0)
+ {
+ messageFlow();
+ }
+ }
}
catch (AMQException e)
{
@@ -245,6 +252,7 @@ public class BasicMessageConsumer_0_10 e
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
+
if (!messageOk)
{
if (_preAcquire)
@@ -261,19 +269,12 @@ public class BasicMessageConsumer_0_10 e
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Message not OK, releasing");
+ _logger.debug("filterMessage - not ack'ing message as not acquired");
}
- releaseMessage(message);
- }
- // if we are syncrhonously waiting for a message
- // and messages are not prefetched we then need to request another one
- if(capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ flushUnwantedMessage(message);
}
}
+
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (!_preAcquire && messageOk && !isNoConsume())
@@ -285,6 +286,7 @@ public class BasicMessageConsumer_0_10 e
messageOk = acquireMessage(message);
_logger.debug("filterMessage - message acquire status : " + messageOk);
}
+
return messageOk;
}
@@ -295,38 +297,38 @@ public class BasicMessageConsumer_0_10 e
* @param message The message to be acknowledged
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
+ private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
{
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.messageAcknowledge
- (ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- AMQException amqe = _0_10session.getCurrentException();
- if (amqe != null)
- {
- throw amqe;
- }
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
}
}
/**
- * Release a message
+ * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
+ * processed to ensure their AMQP command-id is marked completed.
*
- * @param message The message to be released
- * @throws AMQException If the message cannot be released due to some internal error.
+ * @param message The unwanted message to be flushed
+ * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
*/
- private void releaseMessage(AbstractJMSMessage message) throws AMQException
+ private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
{
- if (_preAcquire)
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.flushProcessed(ranges,false);
+
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
{
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageRelease(ranges);
- _0_10session.sync();
+ throw amqe;
}
}
@@ -337,25 +339,28 @@ public class BasicMessageConsumer_0_10 e
* @return true if the message has been acquired, false otherwise.
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
+ private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
{
boolean result = false;
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
- Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+ final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
- RangeSet acquired = acq.getTransfers();
- if (acquired != null && acquired.size() > 0)
- {
- result = true;
- }
+ final RangeSet acquired = acq.getTransfers();
+ if (acquired != null && acquired.size() > 0)
+ {
+ result = true;
}
return result;
}
+ private void messageFlow()
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
@@ -364,9 +369,7 @@ public class BasicMessageConsumer_0_10 e
{
if (messageListener != null && capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -389,9 +392,7 @@ public class BasicMessageConsumer_0_10 e
{
if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow
- (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
}
@@ -412,9 +413,7 @@ public class BasicMessageConsumer_0_10 e
}
if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Fri Oct 28 15:52:13 2011
@@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 e
}
catch (Exception e)
{
- JMSException jmse = new JMSException("Exception when sending message");
+ JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage());
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java Fri Oct 28 15:52:13 2011
@@ -24,13 +24,16 @@ package org.apache.qpid.client;
import javax.jms.Destination;
import javax.jms.JMSException;
+import org.apache.qpid.framing.AMQShortString;
+
/**
- * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * Provides support for convenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
* so that operations related to their "temporary-ness" can be abstracted out.
*/
interface TemporaryDestination extends Destination
{
+ public AMQShortString getAMQQueueName();
public void delete() throws JMSException;
public AMQSession getSession();
public boolean isDeleted();
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Oct 28 15:52:13 2011
@@ -47,6 +47,7 @@ import org.apache.qpid.framing.ProtocolV
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -362,7 +363,15 @@ public class AMQProtocolSession implemen
public void closeProtocolSession() throws AMQException
{
- _protocolHandler.closeConnection(0);
+ try
+ {
+ _protocolHandler.getNetworkConnection().close();
+ }
+ catch(TransportException e)
+ {
+ //ignore such exceptions, they were already logged
+ //and this is a forcible close.
+ }
}
public void failover(String host, int port)
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java Fri Oct 28 15:52:13 2011
@@ -37,9 +37,9 @@ public class JMSSelectorFilter implement
public JMSSelectorFilter(String selector) throws AMQInternalException
{
_selector = selector;
- if (JMSSelectorFilter._logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+ _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
}
_matcher = new SelectorParser().parse(selector);
}
@@ -49,16 +49,16 @@ public class JMSSelectorFilter implement
try
{
boolean match = _matcher.matches(message);
- if (JMSSelectorFilter._logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System
+ _logger.debug(message + " match(" + match + ") selector(" + System
.identityHashCode(_selector) + "):" + _selector);
}
return match;
}
catch (AMQInternalException e)
{
- JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message " + message, e);
+ _logger.warn("Caught exception when evaluating message selector for message " + message, e);
}
return false;
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Oct 28 15:52:13 2011
@@ -526,10 +526,6 @@ public class Connection extends Connecti
{
synchronized (lock)
{
- for (Session ssn : channels.values())
- {
- ssn.closeCode(close);
- }
ConnectionCloseCode code = close.getReplyCode();
if (code != ConnectionCloseCode.NORMAL)
{
@@ -705,4 +701,13 @@ public class Connection extends Connecti
{
return sessions.containsKey(new Binary(name.getBytes()));
}
+
+ public void notifyFailoverRequired()
+ {
+ List<Session> values = new ArrayList<Session>(channels.values());
+ for (Session ssn : values)
+ {
+ ssn.notifyFailoverRequired();
+ }
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org