You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/22 15:09:34 UTC
svn commit: r707065 - in /incubator/qpid/trunk/qpid/cpp: examples/failover/
src/ src/qpid/client/ src/qpid/sys/
Author: aconway
Date: Wed Oct 22 06:09:33 2008
New Revision: 707065
URL: http://svn.apache.org/viewvc?rev=707065&view=rev
Log:
QPID-1382 from Mick Goulish: Improvement to Client-Side Cluster Failover code
Also:
Fix missing DispatchHandle.h include in sys/PollableQueue.h
Add svn:ignore properties for the failover example binaries and Makefile.
Modified:
incubator/qpid/trunk/qpid/cpp/examples/failover/ (props changed)
incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
incubator/qpid/trunk/qpid/cpp/src/ (props changed)
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
Propchange: incubator/qpid/trunk/qpid/cpp/examples/failover/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Oct 22 06:09:33 2008
@@ -1 +1,5 @@
Makefile.in
+listener
+Makefile
+declare_queues
+direct_producer
Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Wed Oct 22 06:09:33 2008
@@ -60,6 +60,8 @@
if ( count > 1000 )
report = !(sent % 1000);
+ report = false;
+
if ( report )
{
std::cout << "sending message "
@@ -71,7 +73,7 @@
message_data << sent;
message.setData(message_data.str());
- /* MICK FIXME
+ /* FIXME mgoulish 21 oct 08
session.messageTransfer ( arg::content=message,
arg::destination="amq.direct"
); */
@@ -85,7 +87,7 @@
}
message.setData ( "That's all, folks!" );
- /* FIXME mgoulish 16 Oct 08
+ /* FIXME mgoulish 21 oct 08
session.messageTransfer ( arg::content=message,
arg::destination="amq.direct"
);
Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Wed Oct 22 06:09:33 2008
@@ -64,8 +64,10 @@
void
Listener::received ( Message & message )
{
+ /*
if(! (count%1000))
std::cerr << "\t\tListener received: " << message.getData() << std::endl;
+ * */
++ count;
Propchange: incubator/qpid/trunk/qpid/cpp/src/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Oct 22 06:09:33 2008
@@ -18,3 +18,5 @@
managementgen.mk
mgen.timestamp
rgen.timestamp
+RdmaClient
+RdmaServer
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed Oct 22 06:09:33 2008
@@ -151,8 +151,6 @@
static const std::string CONN_CLOSED("Connection closed by broker");
void ConnectionImpl::shutdown() {
- Mutex::ScopedLock l(lock);
-
if ( failureCallback )
failureCallback();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp Wed Oct 22 06:09:33 2008
@@ -129,7 +129,7 @@
++ sessions_iterator )
{
FailoverSession * fs = * sessions_iterator;
- fs->failover_in_progress = true;
+ fs->failoverStarting();
}
std::vector<Url> knownBrokers = connection.getKnownBrokers();
@@ -187,7 +187,7 @@
)
{
FailoverSession * fs = * sessions_iterator;
- fs->failover_in_progress = false;
+ fs->failoverComplete();
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp Wed Oct 22 06:09:33 2008
@@ -38,7 +38,8 @@
namespace client {
FailoverSession::FailoverSession ( ) :
- failover_in_progress(false)
+ failover_in_progress(false),
+ failover_count(0)
{
// The session is created by FailoverConnection::newSession
failoverSubscriptionManager = 0;
@@ -50,32 +51,108 @@
}
+
framing::FrameSet::shared_ptr
FailoverSession::get()
{
+ while(1)
+ {
+ try
+ {
return session.get();
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
SessionId
-FailoverSession::getId() const
+FailoverSession::getId()
{
+ while(1)
+ {
+ try
+ {
return session.getId();
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
void
FailoverSession::close()
{
+ while(1)
+ {
+ try
+ {
session.close();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
void
FailoverSession::sync()
{
+ while(1)
+ {
+ try
+ {
+ session.sync();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
- session.sync();
}
@@ -90,16 +167,53 @@
Execution&
FailoverSession::getExecution()
{
-
+ while(1)
+ {
+ try
+ {
return session.getExecution();
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
void
FailoverSession::flush()
{
-
+ while(1)
+ {
+ try
+ {
session.flush();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -109,8 +223,27 @@
bool notifyPeer
)
{
-
+ while(1)
+ {
+ try
+ {
session.markCompleted ( id, cumulative, notifyPeer );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -120,8 +253,27 @@
void
FailoverSession::executionSync()
{
-
+ while(1)
+ {
+ try
+ {
session.executionSync();
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -131,10 +283,29 @@
const string& value
)
{
-
+ while(1)
+ {
+ try
+ {
session.executionResult ( commandId,
value
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -149,7 +320,10 @@
const FieldTable& errorInfo
)
{
-
+ while(1)
+ {
+ try
+ {
session.executionException ( errorCode,
commandId,
classCode,
@@ -158,6 +332,22 @@
description,
errorInfo
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -179,7 +369,7 @@
acquireMode,
content
);
- break;
+ return;
}
catch ( ... )
{
@@ -197,8 +387,27 @@
void
FailoverSession::messageAccept ( const SequenceSet& transfers )
{
-
+ while(1)
+ {
+ try
+ {
session.messageAccept ( transfers );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -209,11 +418,30 @@
const string& text
)
{
-
+ while(1)
+ {
+ try
+ {
session.messageReject ( transfers,
code,
text
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -223,10 +451,29 @@
bool setRedelivered
)
{
-
+ while(1)
+ {
+ try
+ {
session.messageRelease ( transfers,
setRedelivered
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -234,8 +481,26 @@
qpid::framing::MessageAcquireResult
FailoverSession::messageAcquire ( const SequenceSet& transfers )
{
-
+ while(1)
+ {
+ try
+ {
return session.messageAcquire ( transfers );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -245,10 +510,28 @@
const string& resumeId
)
{
-
+ while(1)
+ {
+ try
+ {
return session.messageResume ( destination,
resumeId
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -264,7 +547,10 @@
const FieldTable& arguments
)
{
-
+ while(1)
+ {
+ try
+ {
session.messageSubscribe ( queue,
destination,
acceptMode,
@@ -274,6 +560,22 @@
resumeTtl,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -281,8 +583,27 @@
void
FailoverSession::messageCancel ( const string& destinations )
{
-
+ while(1)
+ {
+ try
+ {
session.messageCancel ( destinations );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
+
}
@@ -292,9 +613,28 @@
uint8_t flowMode
)
{
+ while(1)
+ {
+ try
+ {
session.messageSetFlowMode ( destination,
flowMode
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -304,10 +644,29 @@
uint8_t unit,
uint32_t value)
{
+ while(1)
+ {
+ try
+ {
session.messageFlow ( destination,
unit,
value
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -315,7 +674,26 @@
void
FailoverSession::messageFlush(const string& destination)
{
+ while(1)
+ {
+ try
+ {
session.messageFlush ( destination );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -323,7 +701,26 @@
void
FailoverSession::messageStop(const string& destination)
{
+ while(1)
+ {
+ try
+ {
session.messageStop ( destination );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -331,7 +728,26 @@
void
FailoverSession::txSelect()
{
+ while(1)
+ {
+ try
+ {
session.txSelect ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -339,7 +755,26 @@
void
FailoverSession::txCommit()
{
+ while(1)
+ {
+ try
+ {
session.txCommit ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -347,7 +782,26 @@
void
FailoverSession::txRollback()
{
+ while(1)
+ {
+ try
+ {
session.txRollback ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -355,7 +809,26 @@
void
FailoverSession::dtxSelect()
{
+ while(1)
+ {
+ try
+ {
session.dtxSelect ( );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -365,10 +838,28 @@
bool join,
bool resume)
{
+ while(1)
+ {
+ try
+ {
return session.dtxStart ( xid,
join,
resume
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -378,10 +869,28 @@
bool fail,
bool suspend)
{
+ while(1)
+ {
+ try
+ {
return session.dtxEnd ( xid,
fail,
suspend
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -390,9 +899,27 @@
FailoverSession::dtxCommit(const Xid& xid,
bool onePhase)
{
+ while(1)
+ {
+ try
+ {
return session.dtxCommit ( xid,
onePhase
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -400,7 +927,26 @@
void
FailoverSession::dtxForget(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
session.dtxForget ( xid );
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -408,7 +954,25 @@
qpid::framing::DtxGetTimeoutResult
FailoverSession::dtxGetTimeout(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
return session.dtxGetTimeout ( xid );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -416,7 +980,25 @@
qpid::framing::XaResult
FailoverSession::dtxPrepare(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
return session.dtxPrepare ( xid );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -424,7 +1006,25 @@
qpid::framing::DtxRecoverResult
FailoverSession::dtxRecover()
{
+ while(1)
+ {
+ try
+ {
return session.dtxRecover ( );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -432,7 +1032,25 @@
qpid::framing::XaResult
FailoverSession::dtxRollback(const Xid& xid)
{
+ while(1)
+ {
+ try
+ {
return session.dtxRollback ( xid );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -441,9 +1059,28 @@
FailoverSession::dtxSetTimeout(const Xid& xid,
uint32_t timeout)
{
+ while(1)
+ {
+ try
+ {
session.dtxSetTimeout ( xid,
timeout
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -457,6 +1094,10 @@
bool autoDelete,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
session.exchangeDeclare ( exchange,
type,
alternateExchange,
@@ -465,6 +1106,21 @@
autoDelete,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -473,9 +1129,28 @@
FailoverSession::exchangeDelete(const string& exchange,
bool ifUnused)
{
+ while(1)
+ {
+ try
+ {
session.exchangeDelete ( exchange,
ifUnused
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -483,7 +1158,25 @@
qpid::framing::ExchangeQueryResult
FailoverSession::exchangeQuery(const string& name)
{
+ while(1)
+ {
+ try
+ {
return session.exchangeQuery ( name );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -494,11 +1187,30 @@
const string& bindingKey,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
session.exchangeBind ( queue,
exchange,
bindingKey,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -508,10 +1220,29 @@
const string& exchange,
const string& bindingKey)
{
+ while(1)
+ {
+ try
+ {
session.exchangeUnbind ( queue,
exchange,
bindingKey
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -522,11 +1253,29 @@
const string& bindingKey,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
return session.exchangeBound ( exchange,
queue,
bindingKey,
arguments
);
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -540,6 +1289,10 @@
bool autoDelete,
const FieldTable& arguments)
{
+ while(1)
+ {
+ try
+ {
session.queueDeclare ( queue,
alternateExchange,
passive,
@@ -548,6 +1301,21 @@
autoDelete,
arguments
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -557,10 +1325,29 @@
bool ifUnused,
bool ifEmpty)
{
+ while(1)
+ {
+ try
+ {
session.queueDelete ( queue,
ifUnused,
ifEmpty
);
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -568,7 +1355,26 @@
void
FailoverSession::queuePurge(const string& queue)
{
+ while(1)
+ {
+ try
+ {
session.queuePurge ( queue) ;
+ return;
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -576,7 +1382,25 @@
qpid::framing::QueueQueryResult
FailoverSession::queueQuery(const string& queue)
{
- return session.queueQuery ( queue );
+ while(1)
+ {
+ try
+ {
+ return session.queueQuery ( queue );
+ }
+ catch ( const std::exception& error )
+ {
+ if ( ! failover_in_progress )
+ throw ( error );
+ else
+ {
+ sys::Monitor::ScopedLock l(lock);
+ int current_failover_count = failover_count;
+ while ( current_failover_count == failover_count )
+ lock.wait();
+ }
+ }
+ }
}
@@ -587,6 +1411,7 @@
void
FailoverSession::prepareForFailover ( Connection newConnection )
{
+ failover_in_progress = true;
try
{
newSession = newConnection.newSession();
@@ -603,6 +1428,24 @@
}
+void
+FailoverSession::failoverStarting ( )
+{
+ sys::Monitor::ScopedLock l(lock);
+ failover_in_progress = true;
+}
+
+
+void
+FailoverSession::failoverComplete ( )
+{
+ sys::Monitor::ScopedLock l(lock);
+ failover_in_progress = false;
+ ++ failover_count;
+ lock.notifyAll();
+}
+
+
void
FailoverSession::failover ( )
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h Wed Oct 22 06:09:33 2008
@@ -35,7 +35,7 @@
#include "qpid/client/SessionImpl.h"
#include "qpid/client/TypedResult.h"
#include "qpid/shared_ptr.h"
-#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Monitor.h"
#include <string>
@@ -61,7 +61,7 @@
framing::FrameSet::shared_ptr get();
- SessionId getId() const;
+ SessionId getId();
void close();
@@ -80,8 +80,6 @@
void sendCompletion ( );
- bool failover_in_progress;
-
// Wrapped functions from Session ----------------------------
@@ -293,15 +291,18 @@
// end Wrapped functions from Session ---------------------------
// Tells the FailoverSession to get ready for a failover.
+ void failoverStarting();
void prepareForFailover ( Connection newConnection );
-
void failover ( );
+ void failoverComplete();
void setFailoverSubscriptionManager(FailoverSubscriptionManager*);
private:
- typedef sys::Mutex::ScopedLock Lock;
- sys::Mutex lock;
+ sys::Monitor lock;
+ bool failover_in_progress;
+ int failover_count;
+
FailoverSubscriptionManager * failoverSubscriptionManager;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp Wed Oct 22 06:09:33 2008
@@ -48,7 +48,6 @@
sys::Monitor::ScopedLock l(lock);
newSession = _newSession;
newSessionIsValid = true;
- // lock.notifyAll();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=707065&r1=707064&r2=707065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Wed Oct 22 06:09:33 2008
@@ -24,6 +24,7 @@
#include "qpid/sys/PollableCondition.h"
#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/DispatchHandle.h"
#include "qpid/sys/Monitor.h"
#include <boost/function.hpp>
#include <boost/bind.hpp>