You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/10/28 17:52:18 UTC

svn commit: r1190406 [3/5] - in /qpid/branches/qpid-3346/qpid: ./ bin/ cpp/design_docs/ cpp/docs/api/ cpp/docs/man/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/src/ cpp/src/posix/ cpp/src/qmf/engine/ cpp/src/qpid/ cpp/src/qpid/acl/ cpp/...

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/sasl.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/sasl.mk?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/sasl.mk (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/sasl.mk Fri Oct 28 15:52:13 2011
@@ -30,7 +30,7 @@ check_PROGRAMS+=sasl_version
 sasl_version_SOURCES=sasl_version.cpp
 sasl_version_LDADD=$(lib_client)
 
-TESTS += run_cluster_authentication_test sasl_fed sasl_fed_ex_dynamic sasl_fed_ex_link sasl_fed_ex_queue sasl_fed_ex_route sasl_fed_ex_route_cluster sasl_fed_ex_link_cluster sasl_fed_ex_queue_cluster sasl_fed_ex_dynamic_cluster
+TESTS += run_cluster_authentication_test sasl_fed sasl_fed_ex_dynamic sasl_fed_ex_link sasl_fed_ex_queue sasl_fed_ex_route sasl_fed_ex_route_cluster sasl_fed_ex_link_cluster sasl_fed_ex_queue_cluster sasl_fed_ex_dynamic_cluster sasl_no_dir
 LONG_TESTS += run_cluster_authentication_soak
 EXTRA_DIST += run_cluster_authentication_test \
               sasl_fed                        \
@@ -43,7 +43,8 @@ EXTRA_DIST += run_cluster_authentication
               sasl_fed_ex_dynamic_cluster     \
               sasl_fed_ex_link_cluster        \
               sasl_fed_ex_queue_cluster       \
-              sasl_fed_ex_route_cluster
+              sasl_fed_ex_route_cluster       \
+              sasl_no_dir
 
 
 endif # HAVE_SASL

Modified: qpid/branches/qpid-3346/qpid/cpp/src/tests/ssl_test
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/tests/ssl_test?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/tests/ssl_test (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/tests/ssl_test Fri Oct 28 15:52:13 2011
@@ -47,9 +47,13 @@ delete_certs() {
     fi
 }
 
-COMMON_OPTS="--daemon --no-data-dir --no-module-dir --auth no --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME --require-encryption"
+COMMON_OPTS="--daemon --no-data-dir --no-module-dir --config $CONFIG --load-module $SSL_LIB --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE --ssl-cert-name $TEST_HOSTNAME"
 start_broker() { # $1 = extra opts
-    ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS $1;
+    ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS --require-encryption --auth no $1;
+}
+
+start_authenticating_broker() {
+    ../qpidd --transport ssl --port 0 --ssl-port 0 $COMMON_OPTS --require-encryption --ssl-sasl-no-dict --ssl-require-client-authentication --auth yes;
 }
 
 stop_brokers() {
@@ -64,6 +68,13 @@ cleanup() {
     delete_certs
 }
 
+pick_port() {
+    # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
+    PICK=`../qpidd --no-module-dir -dp0`
+    ../qpidd --no-module-dir -qp $PICK
+    echo $PICK
+}
+
 CERTUTIL=$(type -p certutil)
 if [[ !(-x $CERTUTIL) ]] ; then
     echo "No certutil, skipping ssl test";
@@ -93,7 +104,7 @@ test "$MSG" = "hello" || { echo "receive
 
 #### Client Authentication tests
 
-PORT2=`start_broker --ssl-require-client-authentication`  || error "Could not start broker"
+PORT2=`start_authenticating_broker`  || error "Could not start broker"
 echo "Running SSL client authentication test on port $PORT2"
 URL=amqp:ssl:$TEST_HOSTNAME:$PORT2
 
@@ -109,19 +120,22 @@ test "$MSG3" = "" || { echo "receive suc
 
 stop_brokers
 
+#Test multiplexed connection where SSL and plain TCP are served by the same port
+PORT=`pick_port`; ../qpidd --port $PORT --ssl-port $PORT $COMMON_OPTS --transport ssl --auth no
+echo "Running multiplexed SSL/TCP test on $PORT"
+
+./qpid-perftest --count ${COUNT} --port ${PORT} -P ssl -b $TEST_HOSTNAME --summary || { echo "SSL on multiplexed connection failed!"; exit 1; }
+./qpid-perftest --count ${COUNT} --port ${PORT} -P tcp -b $TEST_HOSTNAME --summary || { echo "Plain TCP on multiplexed connection failed!"; exit 1; }
+
+stop_brokers
+
 test -z $CLUSTER_LIB && exit 0	# Exit if cluster not supported.
 
 ## Test failover in a cluster using SSL only
 . $srcdir/ais_check		# Will exit if clustering not enabled.
 
-pick_port() {
-    # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
-    PICK=`../qpidd --no-module-dir -dp0`
-    ../qpidd --no-module-dir -qp $PICK
-    echo $PICK
-}
 ssl_cluster_broker() {		# $1 = port
-    ../qpidd $COMMON_OPTS --load-module  $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null
+    ../qpidd $COMMON_OPTS --require-encryption --auth no --load-module  $CLUSTER_LIB --cluster-name ssl_test.$HOSTNAME.$$ --cluster-url amqp:ssl:$TEST_HOSTNAME:$1 --port 0 --ssl-port $1 --transport ssl > /dev/null
     # Wait for broker to be ready
     qpid-ping -Pssl -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; }
     echo "Running SSL cluster broker on port $1"

Modified: qpid/branches/qpid-3346/qpid/cpp/src/windows/QpiddBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/windows/QpiddBroker.cpp?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/windows/QpiddBroker.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/windows/QpiddBroker.cpp Fri Oct 28 15:52:13 2011
@@ -19,17 +19,9 @@
  *
  */
 
-#ifdef HAVE_CONFIG_H
-#  include "config.h"
-#else
-// These need to be made something sensible, like reading a value from
-// the registry. But for now, get things going with a local definition.
-namespace {
-const char *QPIDD_CONF_FILE = "qpid_broker.conf";
-const char *QPIDD_MODULE_DIR = ".";
-}
-#endif
+#include "config.h"
 #include "qpidd.h"
+#include "SCM.h"
 #include "qpid/Exception.h"
 #include "qpid/Options.h"
 #include "qpid/Plugin.h"
@@ -205,8 +197,56 @@ struct BrokerInfo {
     DWORD pid;
 };
 
+// Service-related items. Only involved when running the broker as a Windows
+// service.
+
+const std::string svcName = "qpidd";
+SERVICE_STATUS svcStatus;
+SERVICE_STATUS_HANDLE svcStatusHandle = 0;
+
+// This function is only called when the broker is run as a Windows
+// service. It receives control requests from Windows.
+VOID WINAPI SvcCtrlHandler(DWORD control)
+{
+    switch(control) {
+    case SERVICE_CONTROL_STOP:
+        svcStatus.dwCurrentState = SERVICE_STOP_PENDING;
+        svcStatus.dwControlsAccepted = 0;
+        svcStatus.dwCheckPoint = 1;
+        svcStatus.dwWaitHint = 5000;  // 5 secs.
+        ::SetServiceStatus(svcStatusHandle, &svcStatus);
+        CtrlHandler(CTRL_C_EVENT);
+        break;
+ 
+    case SERVICE_CONTROL_INTERROGATE:
+        break;
+ 
+    default:
+        break;
+    }
+}
+
+VOID WINAPI ServiceMain(DWORD argc, LPTSTR *argv)
+{
+    ::memset(&svcStatus, 0, sizeof(svcStatus));
+    svcStatusHandle = ::RegisterServiceCtrlHandler(svcName.c_str(),
+                                                   SvcCtrlHandler);
+    svcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS;
+    svcStatus.dwCheckPoint = 1;
+    svcStatus.dwWaitHint = 10000;  // 10 secs.
+    svcStatus.dwCurrentState = SERVICE_START_PENDING;
+    ::SetServiceStatus(svcStatusHandle, &svcStatus);
+    // QpiddBroker class resets state to running.
+    svcStatus.dwWin32ExitCode = run_broker(argc, argv, true);
+    svcStatus.dwCurrentState = SERVICE_STOPPED;
+    svcStatus.dwCheckPoint = 0;
+    svcStatus.dwWaitHint = 0;
+    ::SetServiceStatus(svcStatusHandle, &svcStatus);
 }
 
+}  // namespace
+
+
 struct ProcessControlOptions : public qpid::Options {
     bool quit;
     bool check;
@@ -225,9 +265,49 @@ struct ProcessControlOptions : public qp
     }
 };
 
+struct ServiceOptions : public qpid::Options {
+    bool install;
+    bool start;
+    bool stop;
+    bool uninstall;
+    bool daemon;
+    std::string startType;
+    std::string startArgs;
+    std::string account;
+    std::string password;
+    std::string depends;
+
+    ServiceOptions() 
+        : qpid::Options("Service options"),
+          install(false),
+          start(false),
+          stop(false),
+          uninstall(false),
+          daemon(false),
+          startType("demand"),
+          startArgs(""),
+          account("NT AUTHORITY\\LocalService"),
+          password(""),
+          depends("")
+    {
+        addOptions()
+            ("install", qpid::optValue(install), "Install as service")
+            ("start-type", qpid::optValue(startType, "auto|demand|disabled"), "Service start type\nApplied at install time only.")
+            ("arguments", qpid::optValue(startArgs, "COMMAND LINE ARGS"), "Arguments to pass when service auto-starts")
+            ("account", qpid::optValue(account, "(LocalService)"), "Account to run as, default is LocalService\nApplied at install time only.")
+            ("password", qpid::optValue(password, "PASSWORD"), "Account password, if needed\nApplied at install time only.")
+            ("depends", qpid::optValue(depends, "(comma delimited list)"), "Names of services that must start before this service\nApplied at install time only.")
+            ("start", qpid::optValue(start), "Start the service.")
+            ("stop", qpid::optValue(stop), "Stop the service.")
+            ("uninstall", qpid::optValue(uninstall), "Uninstall the service.");
+    }
+};
+
 struct QpiddWindowsOptions : public QpiddOptionsPrivate {
     ProcessControlOptions control;
+    ServiceOptions service;
     QpiddWindowsOptions(QpiddOptions *parent) : QpiddOptionsPrivate(parent) {
+        parent->add(service);
         parent->add(control);
     }
 };
@@ -253,12 +333,63 @@ void QpiddOptions::usage() const {
 }
 
 int QpiddBroker::execute (QpiddOptions *options) {
+
+    // If running as a service, bump the status checkpoint to let SCM know
+    // we're still making progress.
+    if (svcStatusHandle != 0) {
+        svcStatus.dwCheckPoint++;
+        ::SetServiceStatus(svcStatusHandle, &svcStatus);
+    }
+
     // Options that affect a running daemon.
     QpiddWindowsOptions *myOptions =
-      reinterpret_cast<QpiddWindowsOptions *>(options->platform.get());
+        reinterpret_cast<QpiddWindowsOptions *>(options->platform.get());
     if (myOptions == 0)
         throw qpid::Exception("Internal error obtaining platform options");
 
+    if (myOptions->service.install) {
+        // Handle start type
+        DWORD startType;
+        if (myOptions->service.startType.compare("demand") == 0)
+            startType = SERVICE_DEMAND_START;
+        else if (myOptions->service.startType.compare("auto") == 0)
+            startType = SERVICE_AUTO_START;
+        else if (myOptions->service.startType.compare("disabled") == 0)
+            startType = SERVICE_DISABLED;
+        else if (!myOptions->service.startType.empty())
+            throw qpid::Exception("Invalid service start type: " +
+                                  myOptions->service.startType);
+
+        // Install service and exit
+        qpid::windows::SCM manager;
+        manager.install(svcName,
+                        "Apache Qpid Message Broker",
+                        myOptions->service.startArgs,
+                        startType,
+                        myOptions->service.account,
+                        myOptions->service.password,
+                        myOptions->service.depends);
+        return 0;
+    }
+
+    if (myOptions->service.start) {
+        qpid::windows::SCM manager;
+        manager.start(svcName);
+        return 0;
+    }
+
+    if (myOptions->service.stop) {
+        qpid::windows::SCM manager;
+        manager.stop(svcName);
+        return 0;
+    }
+
+    if (myOptions->service.uninstall) {
+        qpid::windows::SCM manager;
+        manager.uninstall(svcName);
+        return 0;
+    }
+
     if (myOptions->control.check || myOptions->control.quit) {
         // Relies on port number being set via --port or QPID_PORT env variable.
         NamedSharedMemory<BrokerInfo> info(brokerInfoName(options->broker.port));
@@ -301,10 +432,41 @@ int QpiddBroker::execute (QpiddOptions *
     ::SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE);
     brokerPtr->accept();
     std::cout << options->broker.port << std::endl;
+
+    // If running as a service, tell SCM we're up. There's still a chance
+    // that store recovery will drag out the time before the broker actually
+    // responds to requests, but integrating that mechanism with the SCM
+    // updating is probably more work than it's worth.
+    if (svcStatusHandle != 0) {
+        svcStatus.dwCheckPoint = 0;
+        svcStatus.dwWaitHint = 0;
+        svcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP;
+        svcStatus.dwCurrentState = SERVICE_RUNNING;
+        ::SetServiceStatus(svcStatusHandle, &svcStatus);
+    }
+
     brokerPtr->run();
     waitShut.signal();   // In case we shut down some other way
     waitThr.join();
+    return 0;
+}
 
-    // CloseHandle(h);
+
+int main(int argc, char* argv[])
+{
+    // If started as a service, notify the SCM we're up. Else just run.
+    // If as a service, StartServiceCtrlDispatcher doesn't return until
+    // the service is stopped.
+    SERVICE_TABLE_ENTRY dispatchTable[] =
+    {
+        { "", (LPSERVICE_MAIN_FUNCTION)ServiceMain },
+        { NULL, NULL }
+    };
+    if (!StartServiceCtrlDispatcher(dispatchTable)) {
+        DWORD err = ::GetLastError();
+        if (err == ERROR_FAILED_SERVICE_CONTROLLER_CONNECT) // Run as console
+            return run_broker(argc, argv);
+        throw QPID_WINDOWS_ERROR(err);
+    }
     return 0;
 }

Modified: qpid/branches/qpid-3346/qpid/doc/book/src/Configure-Java-Qpid-to-use-a-SSL-connection.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/doc/book/src/Configure-Java-Qpid-to-use-a-SSL-connection.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/doc/book/src/Configure-Java-Qpid-to-use-a-SSL-connection.xml (original)
+++ qpid/branches/qpid-3346/qpid/doc/book/src/Configure-Java-Qpid-to-use-a-SSL-connection.xml Fri Oct 28 15:52:13 2011
@@ -51,8 +51,8 @@
 &lt;ssl&gt;
     &lt;enabled&gt;true&lt;/enabled&gt;
     &lt;sslOnly&gt;true&lt;/sslOnly&gt;
-    &lt;keystorePath&gt;/path/to/keystore.ks&lt;/keystorePath&gt;
-    &lt;keystorePassword&gt;keystorepass&lt;/keystorePassword&gt;
+    &lt;keyStorePath&gt;/path/to/keystore.ks&lt;/keyStorePath&gt;
+    &lt;keyStorePassword&gt;keystorepass&lt;/keyStorePassword&gt;
 &lt;/ssl&gt;
 </programlisting>
             

Modified: qpid/branches/qpid-3346/qpid/doc/book/src/Programming-In-Apache-Qpid.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/doc/book/src/Programming-In-Apache-Qpid.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/doc/book/src/Programming-In-Apache-Qpid.xml (original)
+++ qpid/branches/qpid-3346/qpid/doc/book/src/Programming-In-Apache-Qpid.xml Fri Oct 28 15:52:13 2011
@@ -3443,9 +3443,8 @@ log4j.appender.console.layout.Conversion
 	              <entry>qpid.amqp.version</entry>
 	              <entry>string</entry>
 	              <entry>0-10</entry>
-                  <entry>Sets the AMQP version to be used - currently supports one of {0-8,0-9,0-91,0-10}</entry>
-                </row>
-
+                      <entry><para>Sets the AMQP version to be used - currently supports one of {0-8,0-9,0-91,0-10}.</para><para>The client will begin negotiation at the specified version and only negotiate downwards if the Broker does not support the specified version.</para></entry>
+                    </row>
 	            <row>
 	              <entry>qpid.heartbeat</entry>
 	              <entry>int</entry>
@@ -3610,15 +3609,20 @@ log4j.appender.console.layout.Conversion
 	              <entry>qpid.transport</entry>
 	              <entry>string</entry>
 	              <entry>org.apache.qpid.transport.network.io.IoNetworkTransport</entry>
-                  <entry><para>The transport implementation to be used.</para><para>A user could specify an alternative transport mechanism that implements the <varname>org.apache.qpid.transport.network.NetworkTransport</varname> interface.</para></entry>
-                </row>
-
+                      <entry><para>The transport implementation to be used.</para><para>A user could specify an alternative transport mechanism that implements the <varname>org.apache.qpid.transport.network.NetworkTransport</varname> interface.</para></entry>
+                    </row>
+	            <row>
+	              <entry>qpid.sync_op_timeout</entry>
+	              <entry>long</entry>
+	              <entry>60000</entry>
+                      <entry><para>The length of time (in milliseconds) to wait for a synchronous operation to complete.</para><para>For compatibility with older clients, the synonym <varname>amqj.default_syncwrite_timeout</varname> is supported.</para></entry>
+                    </row>
 	            <row>
 	              <entry>amqj.tcp_nodelay</entry>
 	              <entry>boolean</entry>
 	              <entry>false</entry>
-                  <entry><para>Sets the TCP_NODELAY property of the underlying socket.</para><para>This could also be set per connection as well (see connection paramters).</para></entry>
-                </row>
+                      <entry><para>Sets the TCP_NODELAY property of the underlying socket.</para><para>This could also be set per connection as well (see connection parameters).</para></entry>
+                    </row>
               </tbody>
             </tgroup>
           </table>

Modified: qpid/branches/qpid-3346/qpid/doc/book/src/Qpid-Java-FAQ.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/doc/book/src/Qpid-Java-FAQ.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/doc/book/src/Qpid-Java-FAQ.xml (original)
+++ qpid/branches/qpid-3346/qpid/doc/book/src/Qpid-Java-FAQ.xml Fri Oct 28 15:52:13 2011
@@ -736,35 +736,6 @@ amqj.logging.level
           </para>
 <!--h3--></section>
 
-	  <section role="h3" id="QpidJavaFAQ-HowdoIuseanInVMBrokerformyowntests-3F"><title>
-            How do I
-            use an InVM Broker for my own tests?
-          </title>
-
-	  <para>
-            I would take a look at the testPassiveTTL in
-	    <ulink url="https://svn.apache.org/repos/asf/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java">TimeToLiveTest.java</ulink>
-          </para><para>
-            The setUp and tearDown methods show how to correctly start up a
-            broker for InVM testing. If you write your tests using a file for
-            the JNDI you can then very easily swap between running your tests
-            InVM and against a real broker.
-          </para><para>
-            See our <xref linkend="How-to-Use-JNDI"/> on how to confgure it
-          </para><para>
-            Basically though you just need to set two System Properites:
-          </para><para>
-            java.naming.factory.initial =
-            org.apache.qpid.jndi.PropertiesFileInitialContextFactory
-            java.naming.provider.url = &lt;your JNDI file&gt;
-          </para><para>
-            and call getInitialContext() in your code.
-          </para><para>
-            You will of course need to have the broker libraries on your
-            class path for this to run.
-          </para>
-<!--h3--></section>
-
 	  <section role="h3" id="QpidJavaFAQ-HowcanIinspectthecontentsofmyMessageStore-3F"><title>
             How
             can I inspect the contents of my MessageStore?
@@ -907,31 +878,6 @@ java.lang.NullPointerException 
           </para>
 <!--h3--></section>
 
-	  <section role="h3" id="QpidJavaFAQ-Clientkeepsthrowing-27Serverdidnotrespondinatimelyfashion-27-5Cerrorcode408-3ARequestTimeout-5C."><title>
-            Client keeps throwing 'Server did not respond in a timely
-            fashion' [error code 408: Request Timeout].
-          </title>
-
-	  <para>
-            Certain operations wait for a response from the Server. One such
-            operations is commit. If the server does not respond to the
-            commit request within a set time a Request Timeout [error code:
-            408] exception is thrown (Server did not respond in a timely
-            fashion). This is to ensure that a server that has hung does not
-            cause the client process to be come unresponsive.
-          </para><para>
-            However, it is possible that the server just needs a long time to
-            process a give request. For example, sending a large persistent
-            message when using a persistent store will take some time to a)
-            Transfer accross the network and b) to be fully written to disk.
-          </para><para>
-            These situations require that the default timeout value be
-            increased. A cilent <xref linkend="qpid_System-Properties"/> 'amqj.default_syncwrite_timeout' can be set
-            on the client to increase the wait time. The default in 0.5 is
-            30000 (30s).
-          </para>
-<!--h3--></section>
-
 	  <section role="h3" id="QpidJavaFAQ-CanauseTCPKEEPALIVEorAMQPheartbeatingtokeepmyconnectionopen-3F"><title>
             Can a use TCP_KEEPALIVE or AMQP heartbeating to keep my
             connection open?

Modified: qpid/branches/qpid-3346/qpid/doc/book/src/System-Properties.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/doc/book/src/System-Properties.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/doc/book/src/System-Properties.xml (original)
+++ qpid/branches/qpid-3346/qpid/doc/book/src/System-Properties.xml Fri Oct 28 15:52:13 2011
@@ -135,7 +135,7 @@
 
 
 	<varlistentry>
-	  <term>amqj.default_syncwrite_timeout</term>
+	  <term>qpid.sync_op_timeout</term>
 	  <listitem>
 	    <variablelist>
 	      <varlistentry>
@@ -144,12 +144,11 @@
 	      </varlistentry>
 	      <varlistentry>
 		<term>Default</term>
-		<listitem><para>30000</para></listitem>
+		<listitem><para>60000</para></listitem>
 	      </varlistentry>
 	    </variablelist>
-	    <para> The number length of time in millisecond to wait
-	    for a synchronous write to complete.
-            </para>
+	    <para>The length of time (in milliseconds) to wait for a synchronous operation to complete.
+                  For compatibility with older clients, the synonym amqj.default_syncwrite_timeout is supported.</para>
 	  </listitem>
 	</varlistentry>       
 
@@ -193,7 +192,7 @@
 
 
 	<varlistentry>
-	  <term>amqj.tcpNoDelay</term>
+	  <term>amqj.tcp_nodelay</term>
 	  <listitem>
 	    <variablelist>
 	      <varlistentry>
@@ -211,65 +210,6 @@
 	</varlistentry>       
 
 	<varlistentry>
-	  <term>amqj.sendBufferSize</term>
-	  <listitem>
-	    <variablelist>
-	      <varlistentry>
-		<term>integer</term>
-		<listitem><para>Boolean</para></listitem>
-	      </varlistentry>
-	      <varlistentry>
-		<term>Default</term>
-		<listitem><para>32768</para></listitem>
-	      </varlistentry>
-	    </variablelist>
-	    <para>This is the default buffer sized created by Mina.
-            </para>
-	  </listitem>
-	</varlistentry>   
-
-	<varlistentry>
-	  <term>amqj.receiveBufferSize</term>
-	  <listitem>
-	    <variablelist>
-	      <varlistentry>
-		<term>Type</term>
-		<listitem><para>integer</para></listitem>
-	      </varlistentry>
-	      <varlistentry>
-		<term>Default</term>
-		<listitem><para>32768</para></listitem>
-	      </varlistentry>
-	    </variablelist>
-	    <para>This is the default buffer sized created by Mina.
-            </para>
-	  </listitem>
-	</varlistentry>      
-
-
-	<varlistentry>
-	  <term>amqj.protocolprovider.class</term>
-	  <listitem>
-	    <variablelist>
-	      <varlistentry>
-		<term>Type</term>
-		<listitem><para>String</para></listitem>
-	      </varlistentry>
-	      <varlistentry>
-		<term>Default</term>
-		<listitem><para>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</para></listitem>
-	      </varlistentry>
-	    </variablelist>
-	    <para> This specifies the default IoHandlerAdapter that
-	    represents the InVM broker. The IoHandlerAdapter must have
-	    a constructor that takes a single Integer that represents
-	    the InVM port number.
-            </para>
-	  </listitem>
-	</varlistentry>          
-              
-                 
-	<varlistentry>
 	  <term>amqj.protocol.logging.level</term>
 	  <listitem>
 	    <variablelist>

Modified: qpid/branches/qpid-3346/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/extras/qmf/src/py/qmf/console.py?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/branches/qpid-3346/qpid/extras/qmf/src/py/qmf/console.py Fri Oct 28 15:52:13 2011
@@ -3295,6 +3295,7 @@ class Agent:
       self.lock.acquire()
       try:
         self.contextMap.pop(sequence)
+        self.seqMgr._release(sequence)
       except KeyError:
         pass   # @todo - shouldn't happen, log a warning.
     finally:

Propchange: qpid/branches/qpid-3346/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -4,4 +4,4 @@
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
 /qpid/trunk/qpid:796646-796653
-/qpid/trunk/qpid/java:1144319-1179750
+/qpid/trunk/qpid/java:1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/java/broker:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker:787599
 /qpid/branches/qpid-2935/qpid/java/broker:1061302-1072333
-/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1179750
+/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1190375

Modified: qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/main/java/org/apache/qpid/info/AppInfo.java Fri Oct 28 15:52:13 2011
@@ -74,9 +74,9 @@ public class AppInfo
                 appInfoMap.put("port", sc.getPorts().toString());
                 appInfoMap.put("version", QpidProperties.getReleaseVersion());
                 appInfoMap.put("vhosts", "standalone");
-                appInfoMap.put("KeystorePath", sc.getKeystorePath());
+                appInfoMap.put("KeystorePath", sc.getConnectorKeyStorePath());
                 appInfoMap.put("PluginDirectory", sc.getPluginDirectory());
-                appInfoMap.put("CertType", sc.getCertType());
+                appInfoMap.put("CertType", sc.getConnectorCertType());
                 appInfoMap.put("QpidWork", sc.getQpidWork());
                 appInfoMap.put("Bind", sc.getBind());
             }

Modified: qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker-plugins/experimental/info/src/test/java/org/apache/qpid/info/systest/InfoPluginTest.java Fri Oct 28 15:52:13 2011
@@ -210,12 +210,13 @@ public class InfoPluginTest extends Qpid
                 }
                 br.close();
                 System.out.println("*** Received buffer: " + buf);
-                System.out.println("*** Latch countdown");
-                _latch.countDown();
                 synchronized (_recv)
                 {
                     _recv.add(buf);
                 }
+
+                System.out.println("*** Latch countdown");
+                _latch.countDown();
             }
             catch (Exception ex)
             {

Propchange: qpid/branches/qpid-3346/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -2,4 +2,4 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker/bin:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/bin:805429-821809
 /qpid/branches/qpid-2935/qpid/java/broker/bin:1061302-1072333
-/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1179750
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1190375

Modified: qpid/branches/qpid-3346/qpid/java/broker/bin/qpid-server.bat
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/bin/qpid-server.bat?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/bin/qpid-server.bat (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/bin/qpid-server.bat Fri Oct 28 15:52:13 2011
@@ -65,7 +65,7 @@ if "%AMQJ_LOGGING_LEVEL%" == "" set AMQJ
 
 REM Set the default system properties that we'll use now that they have
 REM all been initialised
-set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% "-DQPID_HOME=%QPID_HOME%" "-DQPID_WORK=%QPID_WORK%"
+set SYSTEM_PROPS=-Damqj.logging.level=%AMQJ_LOGGING_LEVEL% -DQPID_HOME="%QPID_HOME%" -DQPID_WORK="%QPID_WORK%"
 
 if "%EXTERNAL_CLASSPATH%" == "" set EXTERNAL_CLASSPATH=%CLASSPATH%
 
@@ -77,7 +77,7 @@ goto afterQpidClasspath
 
 :noQpidClasspath
 echo Warning: Qpid classpath not set. CLASSPATH set to %QPID_HOME%\lib\qpid-all.jar
-set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar
+set CLASSPATH=%QPID_HOME%\lib\qpid-all.jar;%QPID_HOME%\lib\opt\*
 :afterQpidClasspath
 
 REM start parsing -run arguments

Modified: qpid/branches/qpid-3346/qpid/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/etc/config.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/etc/config.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/etc/config.xml Fri Oct 28 15:52:13 2011
@@ -35,8 +35,8 @@
             <enabled>false</enabled>
             <port>5671</port>
             <sslOnly>false</sslOnly>
-            <keystorePath>/path/to/keystore.ks</keystorePath>
-            <keystorePassword>keystorepass</keystorePassword>
+            <keyStorePath>/path/to/keystore.ks</keyStorePath>
+            <keyStorePassword>keystorepass</keyStorePassword>
         </ssl>
         <port>5672</port>
         <socketReceiveBuffer>262144</socketReceiveBuffer>

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Fri Oct 28 15:52:13 2011
@@ -713,6 +713,19 @@ public class QMFService implements Confi
             return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
         }
 
+        public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory)
+        {
+            // TODO: timestamp support
+            return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+        }
+
+        public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory,
+                                                                                                   final java.lang.Boolean receive)
+        {
+            // TODO: timestamp support
+            return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+        }
+
         public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
                                                                            final String type,
                                                                            final String name,

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/Broker.java Fri Oct 28 15:52:13 2011
@@ -210,9 +210,9 @@ public class Broker
 
             if (serverConfig.getEnableSSL())
             {
-                final String keystorePath = serverConfig.getKeystorePath();
-                final String keystorePassword = serverConfig.getKeystorePassword();
-                final String certType = serverConfig.getCertType();
+                final String keystorePath = serverConfig.getConnectorKeyStorePath();
+                final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
+                final String certType = serverConfig.getConnectorCertType();
                 final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, certType);
 
                 for(int sslPort : sslPorts)

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Fri Oct 28 15:52:13 2011
@@ -123,7 +123,7 @@ public class ServerConfiguration extends
      * Configuration Manager to be initialised in the Application Registry.
      * <p>
      * If using this ServerConfiguration via an ApplicationRegistry there is no
-     * need to explictly call {@link #initialise()} as this is done via the
+     * need to explicitly call {@link #initialise()} as this is done via the
      * {@link ApplicationRegistry#initialise()} method.
      *
      * @param configurationURL
@@ -169,7 +169,7 @@ public class ServerConfiguration extends
      * Configuration Manager to be initialised in the Application Registry.
      * <p>
      * If using this ServerConfiguration via an ApplicationRegistry there is no 
-     * need to explictly call {@link #initialise()} as this is done via the
+     * need to explicitly call {@link #initialise()} as this is done via the
      * {@link ApplicationRegistry#initialise()} method.
      *
      * @param conf
@@ -239,6 +239,22 @@ public class ServerConfiguration extends
                     + (_configFile == null ? "" : " Configuration file : " + _configFile);
             throw new ConfigurationException(message);
         }
+
+        // QPID-3517: The SSL configuration keys in the connector and management sections were capitalised inconsistently ('keystore' vs 'keyStore').
+        // For now both spellings are understood, but a deprecation warning is logged for each old lower-case 'keystore' key that is present.
+        for (String key : new String[] {"management.ssl.keystorePath",
+                "management.ssl.keystorePassword",
+                "connector.ssl.keystorePath",
+                "connector.ssl.keystorePassword"})
+        {
+            if (contains(key))
+            {
+                final String deprecatedXpath = key.replaceAll("\\.", "/");
+                final String preferredXpath = deprecatedXpath.replaceAll("keystore", "keyStore");
+                _logger.warn("Validation warning: " + deprecatedXpath + " is deprecated and must be replaced by " + preferredXpath
+                        + (_configFile == null ? "" : " Configuration file : " + _configFile));
+            }
+        }
     }
 
     /*
@@ -404,7 +420,7 @@ public class ServerConfiguration extends
     public final static Configuration flatConfig(File file) throws ConfigurationException
     {
         // We have to override the interpolate methods so that
-        // interpolation takes place accross the entirety of the
+        // interpolation takes place across the entirety of the
         // composite configuration. Without doing this each
         // configuration object only interpolates variables defined
         // inside itself.
@@ -551,7 +567,8 @@ public class ServerConfiguration extends
 
     public String getManagementKeyStorePath()
     {
-        return getStringValue("management.ssl.keyStorePath");
+        final String fallback = getStringValue("management.ssl.keystorePath");
+        return getStringValue("management.ssl.keyStorePath", fallback);
     }
 
     public boolean getManagementSSLEnabled()
@@ -561,7 +578,8 @@ public class ServerConfiguration extends
 
     public String getManagementKeyStorePassword()
     {
-        return getStringValue("management.ssl.keyStorePassword");
+        final String fallback = getStringValue("management.ssl.keystorePassword");
+        return getStringValue("management.ssl.keyStorePassword", fallback);
     }
 
     public boolean getQueueAutoRegister()
@@ -699,17 +717,19 @@ public class ServerConfiguration extends
         return getListValue("connector.ssl.port", Collections.<Integer>singletonList(DEFAULT_SSL_PORT));
     }
 
-    public String getKeystorePath()
+    public String getConnectorKeyStorePath()
     {
-        return getStringValue("connector.ssl.keystorePath");
+        final String fallback = getStringValue("connector.ssl.keystorePath"); // pre-0.13 broker supported this name.
+        return getStringValue("connector.ssl.keyStorePath", fallback);
     }
 
-    public String getKeystorePassword()
+    public String getConnectorKeyStorePassword()
     {
-        return getStringValue("connector.ssl.keystorePassword");
+        final String fallback = getStringValue("connector.ssl.keystorePassword"); // pre-0.13 brokers supported this name.
+        return getStringValue("connector.ssl.keyStorePassword", fallback);
     }
 
-    public String getCertType()
+    public String getConnectorCertType()
     {
         return getStringValue("connector.ssl.certType", "SunX509");
     }

Propchange: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -4,4 +4,4 @@
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1179750
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -4,4 +4,4 @@
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1179750
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1144319-1190375

Propchange: qpid/branches/qpid-3346/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 28 15:52:13 2011
@@ -1,3 +1,3 @@
 /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1144319-1179750
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1144319-1190375

Modified: qpid/branches/qpid-3346/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Fri Oct 28 15:52:13 2011
@@ -256,7 +256,7 @@ public class ServerConfigurationTest ext
         assertEquals(false, _serverConfig.getManagementSSLEnabled());
     }
 
-    public void testGetManagementKeyStorePassword() throws ConfigurationException
+    public void testGetManagementKeystorePassword() throws ConfigurationException
     {
         // Check default
         _serverConfig.initialise();
@@ -534,43 +534,57 @@ public class ServerConfigurationTest ext
         assertEquals("10", _serverConfig.getSSLPorts().get(0));
     }
 
-    public void testGetKeystorePath() throws ConfigurationException
+    public void testGetConnectorKeystorePath() throws ConfigurationException
     {
         // Check default
         _serverConfig.initialise();
-        assertNull(_serverConfig.getKeystorePath());
+        assertNull(_serverConfig.getConnectorKeyStorePath());
 
         // Check value we set
-        _config.setProperty("connector.ssl.keystorePath", "a");
+        _config.setProperty("connector.ssl.keyStorePath", "a");
         _serverConfig = new ServerConfiguration(_config);
         _serverConfig.initialise();
-        assertEquals("a", _serverConfig.getKeystorePath());
+        assertEquals("a", _serverConfig.getConnectorKeyStorePath());
+
+        // Ensure we continue to support the old name keystorePath
+        _config.clearProperty("connector.ssl.keyStorePath");
+        _config.setProperty("connector.ssl.keystorePath", "b");
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals("b", _serverConfig.getConnectorKeyStorePath());
     }
 
-    public void testGetKeystorePassword() throws ConfigurationException
+    public void testGetConnectorKeystorePassword() throws ConfigurationException
     {
         // Check default
         _serverConfig.initialise();
-        assertNull(_serverConfig.getKeystorePassword());
+        assertNull(_serverConfig.getConnectorKeyStorePassword());
 
         // Check value we set
-        _config.setProperty("connector.ssl.keystorePassword", "a");
+        _config.setProperty("connector.ssl.keyStorePassword", "a");
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals("a", _serverConfig.getConnectorKeyStorePassword());
+
+        // Ensure we continue to support the old name keystorePassword
+        _config.clearProperty("connector.ssl.keyStorePassword");
+        _config.setProperty("connector.ssl.keystorePassword", "b");
         _serverConfig = new ServerConfiguration(_config);
         _serverConfig.initialise();
-        assertEquals("a", _serverConfig.getKeystorePassword());
+        assertEquals("b", _serverConfig.getConnectorKeyStorePassword());
     }
 
-    public void testGetCertType() throws ConfigurationException
+    public void testGetConnectorCertType() throws ConfigurationException
     {
         // Check default
         _serverConfig.initialise();
-        assertEquals("SunX509", _serverConfig.getCertType());
+        assertEquals("SunX509", _serverConfig.getConnectorCertType());
 
         // Check value we set
         _config.setProperty("connector.ssl.certType", "a");
         _serverConfig = new ServerConfiguration(_config);
         _serverConfig.initialise();
-        assertEquals("a", _serverConfig.getCertType());
+        assertEquals("a", _serverConfig.getConnectorCertType());
     }
 
     public void testGetUseBiasedWrites() throws ConfigurationException

Modified: qpid/branches/qpid-3346/qpid/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/build.xml?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/build.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/build.xml Fri Oct 28 15:52:13 2011
@@ -22,8 +22,6 @@
 
   <import file="common.xml"/>
 
-  <property file="${project.root}/build.overrides"/>
-
   <findSubProjects name="broker-plugins" dir="broker-plugins"/>
   <findSubProjects name="client-plugins" dir="client-plugins"/>
   <findSubProjects name="management" dir="management" excludes="common,example"/>

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Oct 28 15:52:13 2011
@@ -175,6 +175,10 @@ public class AMQConnection extends Close
     // new amqp-0-10 encoded format.
     private boolean _useLegacyMapMessageFormat;
 
+    //used to track the last failover time for
+    //Address resolution purposes
+    private volatile long _lastFailoverTime = 0;
+
     /**
      * @param broker      brokerdetails
      * @param username    username
@@ -1076,6 +1080,7 @@ public class AMQConnection extends Close
      */
     public boolean firePreFailover(boolean redirect)
     {
+        _lastFailoverTime = System.currentTimeMillis();
         boolean proceed = true;
         if (_connectionListener != null)
         {
@@ -1462,4 +1467,9 @@ public class AMQConnection extends Close
             }
         }
     }
+
+    public long getLastFailoverTime()
+    {
+         return _lastFailoverTime;
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri Oct 28 15:52:13 2011
@@ -281,24 +281,29 @@ public class AMQConnectionDelegate_0_10 
         {
             _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
 
-            try
+            _qpidConnection.notifyFailoverRequired();
+
+            synchronized (_conn.getFailoverMutex())
             {
-                if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+                try
                 {
-                    _conn.failoverPrep();
-                    _conn.resubscribeSessions();
-                    _conn.fireFailoverComplete();
-                    return;
+                    if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+                    {
+                        _conn.failoverPrep();
+                        _conn.resubscribeSessions();
+                        _conn.fireFailoverComplete();
+                        return;
+                    }
+                }
+                catch (Exception e)
+                {
+                    _logger.error("error during failover", e);
+                }
+                finally
+                {
+                    _conn.getProtocolHandler().getFailoverLatch().countDown();
+                    _conn.getProtocolHandler().setFailoverLatch(null);
                 }
-            }
-            catch (Exception e)
-            {
-                _logger.error("error during failover", e);
-            }
-            finally
-            {
-                _conn.getProtocolHandler().getFailoverLatch().countDown();
-                _conn.getProtocolHandler().setFailoverLatch(null);
             }
         }
 
@@ -324,6 +329,18 @@ public class AMQConnectionDelegate_0_10 
 
     public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
     {
+        if (_conn.isFailingOver())
+        {
+            try
+            {
+                _conn.blockUntilNotFailingOver();
+            }
+            catch (InterruptedException e)
+            {
+                //ignore
+            }
+        }
+
         try
         {
             return operation.execute();

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Fri Oct 28 15:52:13 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
 
 import java.net.URISyntaxException;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.Destination;
 import javax.naming.NamingException;
@@ -59,7 +60,7 @@ public abstract class AMQDestination imp
 
     private boolean _browseOnly;
     
-    private boolean _isAddressResolved;
+    private AtomicLong _addressResolved = new AtomicLong(0);
 
     private AMQShortString _queueName;
 
@@ -77,7 +78,7 @@ public abstract class AMQDestination imp
     public static final int QUEUE_TYPE = 1;
     public static final int TOPIC_TYPE = 2;
     public static final int UNKNOWN_TYPE = 3;
-    
+
     // ----- Fields required to support new address syntax -------
     
     public enum DestSyntax {        
@@ -740,12 +741,12 @@ public abstract class AMQDestination imp
     
     public boolean isAddressResolved()
     {
-        return _isAddressResolved;
+        return _addressResolved.get() > 0;
     }
 
-    public void setAddressResolved(boolean addressResolved)
+    public void setAddressResolved(long addressResolved)
     {
-        _isAddressResolved = addressResolved;
+        _addressResolved.set(addressResolved);
     }
     
     private static Address createAddressFromString(String str)
@@ -823,7 +824,7 @@ public abstract class AMQDestination imp
         dest.setTargetNode(_targetNode);
         dest.setSourceNode(_sourceNode);
         dest.setLink(_link);
-        dest.setAddressResolved(_isAddressResolved);
+        dest.setAddressResolved(_addressResolved.get());
         return dest;        
     }
     
@@ -836,4 +837,9 @@ public abstract class AMQDestination imp
     {
         _isDurable = b;
     }
+
+    public boolean isResolvedAfter(long time)
+    {
+        return _addressResolved.get() > time;
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Oct 28 15:52:13 2011
@@ -308,7 +308,7 @@ public abstract class AMQSession<C exten
     protected final FlowControllingBlockingQueue _queue;
 
     /** Holds the highest received delivery tag. */
-    private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+    protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
     private final AtomicLong _rollbackMark = new AtomicLong(-1);
     
     /** All the not yet acknowledged message tags */
@@ -630,6 +630,7 @@ public abstract class AMQSession<C exten
         try
         {
             acknowledgeImpl();
+            markClean();
         }
         catch (TransportException e)
         {
@@ -855,6 +856,10 @@ public abstract class AMQSession<C exten
         //Check that we are clean to commit.
         if (_failedOverDirty)
         {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
+            }
             rollback();
 
             throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
@@ -1813,9 +1818,7 @@ public abstract class AMQSession<C exten
                     suspendChannel(true);
                 }
 
-                // Let the dispatcher know that all the incomming messages
-                // should be rolled back(reject/release)
-                _rollbackMark.set(_highestDeliveryTag.get());
+                setRollbackMark();
 
                 syncDispatchQueue();
 
@@ -2772,6 +2775,21 @@ public abstract class AMQSession<C exten
         }
     }
 
+    /**
+     * Undeclares the specified temporary queue/topic.
+     *
+     * <p/>Note that this operation automatically retries in the event of fail-over.
+     *
+     * @param amqQueue The name of the temporary destination to delete.
+     *
+     * @throws JMSException If the queue could not be deleted for any reason.
+     * @todo Be aware of possible changes to parameter order as versions change.
+     */
+    protected void deleteTemporaryDestination(final TemporaryDestination amqQueue) throws JMSException
+    {
+        deleteQueue(amqQueue.getAMQQueueName());
+    }
+
     public abstract void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException;
 
     private long getNextProducerId()
@@ -3186,7 +3204,7 @@ public abstract class AMQSession<C exten
                     setConnectionStopped(true);
                 }
 
-                _rollbackMark.set(_highestDeliveryTag.get());
+                setRollbackMark();
 
                 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
 
@@ -3335,6 +3353,11 @@ public abstract class AMQSession<C exten
                 if (!(message instanceof CloseConsumerMessage)
                     && tagLE(deliveryTag, _rollbackMark.get()))
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Rejecting message because delivery tag " + deliveryTag
+                                + " <= rollback mark " + _rollbackMark.get());
+                    }
                     rejectMessage(message, true);
                 }
                 else if (_usingDispatcherForCleanup)
@@ -3396,6 +3419,11 @@ public abstract class AMQSession<C exten
                 // Don't reject if we're already closing
                 if (!_closed.get())
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
+                                + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+                    }
                     rejectMessage(message, true);
                 }
             }
@@ -3526,4 +3554,15 @@ public abstract class AMQSession<C exten
     {
         return ((destination instanceof AMQDestination)  && ((AMQDestination)destination).isBrowseOnly());
     }
+
+    private void setRollbackMark()
+    {
+        // Record the highest delivery tag received so far as the rollback mark, so the
+        // dispatcher rolls back (rejects/releases) incoming messages at or below it.
+        _rollbackMark.set(_highestDeliveryTag.get());
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+        }
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Oct 28 15:52:13 2011
@@ -294,23 +294,34 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    void messageAcknowledge(RangeSet ranges, boolean accept)
+    void messageAcknowledge(final RangeSet ranges, final boolean accept)
     {
         messageAcknowledge(ranges,accept,false);
     }
     
-    void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+    void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
     {
-        Session ssn = getQpidSession();
-        for (Range range : ranges)
+        final Session ssn = getQpidSession();
+        flushProcessed(ranges,accept);
+        if (accept)
         {
-            ssn.processed(range);
+            ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
         }
-        ssn.flushProcessed(accept ? BATCH : NONE);
-        if (accept)
+    }
+
+    /**
+     * Flush any outstanding commands. This causes session complete to be sent.
+     * @param ranges the range of command ids.
+     * @param batch true if batched.
+     */
+    void flushProcessed(final RangeSet ranges, final boolean batch)
+    {
+        final Session ssn = getQpidSession();
+        for (final Range range : ranges)
         {
-            ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+            ssn.processed(range);
         }
+        ssn.flushProcessed(batch ? BATCH : NONE);
     }
 
     /**
@@ -1168,8 +1179,8 @@ public class AMQSession_0_10 extends AMQ
                                               boolean isConsumer,
                                               boolean noWait) throws AMQException
     {
-        if (dest.isAddressResolved())
-        {           
+        if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+        {
             if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) 
             {
                 createSubscriptionQueue(dest);
@@ -1258,7 +1269,7 @@ public class AMQSession_0_10 extends AMQ
                             "The name '" + dest.getAddressName() +
                             "' supplied in the address doesn't resolve to an exchange or a queue");
             }
-            dest.setAddressResolved(true);
+            dest.setAddressResolved(System.currentTimeMillis());
         }
     }
     
@@ -1378,4 +1389,15 @@ public class AMQSession_0_10 extends AMQ
             getQpidSession().sync();
         }
     }
+
+    @Override
+    void resubscribe() throws AMQException
+    {
+        // Also reset the delivery tag tracker, to ensure we don't
+        // return the first <total number of msgs received on session>
+        // messages sent by the brokers following the first rollback
+        // after failover
+        _highestDeliveryTag.set(-1);
+        super.resubscribe();
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Oct 28 15:52:13 2011
@@ -82,7 +82,6 @@ import org.slf4j.LoggerFactory;
 
 public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
-
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
@@ -92,7 +91,7 @@ public final class AMQSession_0_8 extend
      * @param con                     The connection on which to create the session.
      * @param channelId               The unique identifier for the session.
      * @param transacted              Indicates whether or not the session is transactional.
-     * @param acknowledgeMode         The acknoledgement mode for the session.
+     * @param acknowledgeMode         The acknowledgement mode for the session.
      * @param messageFactoryRegistry  The message factory factory for the session.
      * @param defaultPrefetchHighMark The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
@@ -110,7 +109,7 @@ public final class AMQSession_0_8 extend
      * @param con                     The connection on which to create the session.
      * @param channelId               The unique identifier for the session.
      * @param transacted              Indicates whether or not the session is transactional.
-     * @param acknowledgeMode         The acknoledgement mode for the session.
+     * @param acknowledgeMode         The acknowledgement mode for the session.
      * @param defaultPrefetchHigh     The maximum number of messages to prefetched before suspending the session.
      * @param defaultPrefetchLow      The number of prefetched messages at which to resume the session.
      */
@@ -169,7 +168,7 @@ public final class AMQSession_0_8 extend
         // we also need to check the state manager for 08/09 as the
         // _connection variable may not be updated in time by the error receiving
         // thread.
-        // We can't close the session if we are alreadying in the process of
+        // We can't close the session if we are already in the process of
         // closing/closed the connection.
                 
         if (!(getProtocolHandler().getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED)
@@ -605,6 +604,18 @@ public final class AMQSession_0_8 extend
         
     }
 
+    @Override
+    protected void deleteTemporaryDestination(final TemporaryDestination amqQueue)
+            throws JMSException
+    {
+        // Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted
+        // by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects).
+        // This is not quite right for JMSCompliance as the queue/topic should remain until the connection closes, or the
+        // client explicitly deletes it.
+
+        /* intentional no-op */
+    }
+
     public boolean isQueueBound(String exchangeName, String queueName,
             String bindingKey, Map<String, Object> args) throws JMSException
     {

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Fri Oct 28 15:52:13 2011
@@ -20,14 +20,13 @@
  */
 package org.apache.qpid.client;
 
+import java.util.UUID;
+
 import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
 import org.apache.qpid.framing.AMQShortString;
 
-import java.util.Random;
-import java.util.UUID;
-
 /** AMQ implementation of a TemporaryQueue. */
 final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
 {
@@ -50,11 +49,15 @@ final class AMQTemporaryQueue extends AM
         {
             throw new JMSException("Temporary Queue has consumers so cannot be deleted");
         }
-        _deleted = true;
 
-        // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
-        // by the server when there are no more subscriptions to that queue.  This is probably not
-        // quite right for JMSCompliance.
+        try
+        {
+            _session.deleteTemporaryDestination(this);
+        }
+        finally
+        {
+            _deleted = true;
+        }
     }
 
     public AMQSession getSession()

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Fri Oct 28 15:52:13 2011
@@ -53,10 +53,14 @@ class AMQTemporaryTopic extends AMQTopic
             throw new JMSException("Temporary Topic has consumers so cannot be deleted");
         }
 
-        _deleted = true;
-        // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
-        // by the server when there are no more subscriptions to that queue.  This is probably not
-        // quite right for JMSCompliance.
+        try
+        {
+            _session.deleteTemporaryDestination(this);
+        }
+        finally
+        {
+            _deleted = true;
+        }
     }
 
     public AMQSession getSession()

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Oct 28 15:52:13 2011
@@ -150,13 +150,20 @@ public class BasicMessageConsumer_0_10 e
             {
                 if (isMessageListenerSet() && capacity == 0)
                 {
-                    _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                              MessageCreditUnit.MESSAGE, 1,
-                                                              Option.UNRELIABLE);
+                    messageFlow();
                 }
                 _logger.debug("messageOk, trying to notify");
                 super.notifyMessage(jmsMessage);
             }
+            else
+            {
+                // if we are synchronously waiting for a message
+                // and messages are not pre-fetched we then need to request another one
+                if(capacity == 0)
+                {
+                   messageFlow();
+                }
+            }
         }
         catch (AMQException e)
         {
@@ -245,6 +252,7 @@ public class BasicMessageConsumer_0_10 e
             _logger.debug("messageOk " + messageOk);
             _logger.debug("_preAcquire " + _preAcquire);
         }
+
         if (!messageOk)
         {
             if (_preAcquire)
@@ -261,19 +269,12 @@ public class BasicMessageConsumer_0_10 e
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Message not OK, releasing");
+                    _logger.debug("filterMessage - not ack'ing message as not acquired");
                 }
-                releaseMessage(message);
-            }
-            // if we are syncrhonously waiting for a message
-            // and messages are not prefetched we then need to request another one
-            if(capacity == 0)
-            {
-               _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                         MessageCreditUnit.MESSAGE, 1,
-                                                         Option.UNRELIABLE);
+                flushUnwantedMessage(message);
             }
         }
+
         // now we need to acquire this message if needed
         // this is the case of queue with a message selector set
         if (!_preAcquire && messageOk && !isNoConsume())
@@ -285,6 +286,7 @@ public class BasicMessageConsumer_0_10 e
             messageOk = acquireMessage(message);
             _logger.debug("filterMessage - message acquire status : " + messageOk);
         }
+
         return messageOk;
     }
 
@@ -295,38 +297,38 @@ public class BasicMessageConsumer_0_10 e
      * @param message The message to be acknowledged
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
+    private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
     {
-        if (!_preAcquire)
-        {
-            RangeSet ranges = new RangeSet();
-            ranges.add((int) message.getDeliveryTag());
-            _0_10session.messageAcknowledge
-                (ranges,
-                 _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+        final RangeSet ranges = new RangeSet();
+        ranges.add((int) message.getDeliveryTag());
+        _0_10session.messageAcknowledge
+            (ranges,
+             _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
 
-            AMQException amqe = _0_10session.getCurrentException();
-            if (amqe != null)
-            {
-                throw amqe;
-            }
+        final AMQException amqe = _0_10session.getCurrentException();
+        if (amqe != null)
+        {
+            throw amqe;
         }
     }
 
     /**
-     * Release a message
+     * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
+     * processed to ensure their AMQP command-id is marked completed.
      *
-     * @param message The message to be released
-     * @throws AMQException If the message cannot be released due to some internal error.
+     * @param message The unwanted message to be flushed
+     * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
      */
-    private void releaseMessage(AbstractJMSMessage message) throws AMQException
+    private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
     {
-        if (_preAcquire)
+        final RangeSet ranges = new RangeSet();
+        ranges.add((int) message.getDeliveryTag());
+        _0_10session.flushProcessed(ranges,false);
+
+        final AMQException amqe = _0_10session.getCurrentException();
+        if (amqe != null)
         {
-            RangeSet ranges = new RangeSet();
-            ranges.add((int) message.getDeliveryTag());
-            _0_10session.getQpidSession().messageRelease(ranges);
-            _0_10session.sync();
+            throw amqe;
         }
     }
 
@@ -337,25 +339,28 @@ public class BasicMessageConsumer_0_10 e
      * @return true if the message has been acquired, false otherwise.
      * @throws AMQException If the message cannot be acquired due to some internal error.
      */
-    private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
+    private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
     {
         boolean result = false;
-        if (!_preAcquire)
-        {
-            RangeSet ranges = new RangeSet();
-            ranges.add((int) message.getDeliveryTag());
+        final RangeSet ranges = new RangeSet();
+        ranges.add((int) message.getDeliveryTag());
 
-            Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+        final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
 
-            RangeSet acquired = acq.getTransfers();
-            if (acquired != null && acquired.size() > 0)
-            {
-                result = true;
-            }
+        final RangeSet acquired = acq.getTransfers();
+        if (acquired != null && acquired.size() > 0)
+        {
+            result = true;
         }
         return result;
     }
 
+    private void messageFlow()
+    {
+        _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+                                                  MessageCreditUnit.MESSAGE, 1,
+                                                  Option.UNRELIABLE);
+    }
 
     public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
@@ -364,9 +369,7 @@ public class BasicMessageConsumer_0_10 e
         {
             if (messageListener != null && capacity == 0)
             {
-                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                          MessageCreditUnit.MESSAGE, 1,
-                                                          Option.UNRELIABLE);
+                messageFlow();
             }
             if (messageListener != null && !_synchronousQueue.isEmpty())
             {
@@ -389,9 +392,7 @@ public class BasicMessageConsumer_0_10 e
     {
         if (_0_10session.isStarted() && _syncReceive.get())
         {
-            _0_10session.getQpidSession().messageFlow
-                (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
-                 Option.UNRELIABLE);
+            messageFlow();
         }
     }
 
@@ -412,9 +413,7 @@ public class BasicMessageConsumer_0_10 e
         }
         if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
-                                                      MessageCreditUnit.MESSAGE, 1,
-                                                      Option.UNRELIABLE);
+            messageFlow();
         }
         Object o = super.getMessageFromQueue(l);
         if (o == null && _0_10session.isStarted())

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Fri Oct 28 15:52:13 2011
@@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 e
         }
         catch (Exception e)
         {
-            JMSException jmse = new JMSException("Exception when sending message");
+            JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage());
             jmse.setLinkedException(e);
             jmse.initCause(e);
             throw jmse;

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java Fri Oct 28 15:52:13 2011
@@ -24,13 +24,16 @@ package org.apache.qpid.client;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 
+import org.apache.qpid.framing.AMQShortString;
+
 /**
- * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * Provides support for convenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
  * so that operations related to their "temporary-ness" can be abstracted out.
  */
 interface TemporaryDestination extends Destination
 {
 
+    public AMQShortString getAMQQueueName();
     public void delete() throws JMSException;
     public AMQSession getSession();
     public boolean isDeleted();

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Oct 28 15:52:13 2011
@@ -47,6 +47,7 @@ import org.apache.qpid.framing.ProtocolV
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -362,7 +363,15 @@ public class AMQProtocolSession implemen
 
     public void closeProtocolSession() throws AMQException
     {
-        _protocolHandler.closeConnection(0);
+        try
+        {
+            _protocolHandler.getNetworkConnection().close();
+        }
+        catch(TransportException e)
+        {
+            //ignore such exceptions, they were already logged
+            //and this is a forcible close.
+        }
     }
 
     public void failover(String host, int port)

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java Fri Oct 28 15:52:13 2011
@@ -37,9 +37,9 @@ public class JMSSelectorFilter implement
     public JMSSelectorFilter(String selector) throws AMQInternalException
     {
         _selector = selector;
-        if (JMSSelectorFilter._logger.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
-            JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+            _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
         }
         _matcher = new SelectorParser().parse(selector);
     }
@@ -49,16 +49,16 @@ public class JMSSelectorFilter implement
         try
         {
             boolean match = _matcher.matches(message);
-            if (JMSSelectorFilter._logger.isDebugEnabled())
+            if (_logger.isDebugEnabled())
             {
-                JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System
+                _logger.debug(message + " match(" + match + ") selector(" + System
                         .identityHashCode(_selector) + "):" + _selector);
             }
             return match;
         }
         catch (AMQInternalException e)
         {
-            JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message  " + message, e);
+            _logger.warn("Caught exception when evaluating message selector for message  " + message, e);
         }
         return false;
     }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1190406&r1=1190405&r2=1190406&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Oct 28 15:52:13 2011
@@ -526,10 +526,6 @@ public class Connection extends Connecti
     {
         synchronized (lock)
         {
-            for (Session ssn : channels.values())
-            {
-                ssn.closeCode(close);
-            }
             ConnectionCloseCode code = close.getReplyCode();
             if (code != ConnectionCloseCode.NORMAL)
             {
@@ -705,4 +701,13 @@ public class Connection extends Connecti
     {
         return sessions.containsKey(new Binary(name.getBytes()));
     }
+
+    public void notifyFailoverRequired()
+    {
+        List<Session> values = new ArrayList<Session>(channels.values());
+        for (Session ssn : values)
+        {
+            ssn.notifyFailoverRequired();
+        }
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org