You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ak...@apache.org on 2008/07/24 23:46:31 UTC
svn commit: r679557 [3/3] - in /hadoop/zookeeper/trunk/src/c: ./ include/
src/ src/hashtable/ tests/
Modified: hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc (original)
+++ hadoop/zookeeper/trunk/src/c/tests/ZKMocks.cc Thu Jul 24 14:46:30 2008
@@ -66,17 +66,26 @@
// *****************************************************************************
// watcher action implementation
-void activeWatcher(zhandle_t *zh, int type, int state, const char *path){
- if(zh==0 || zoo_get_context(zh)==0) return;
- WatcherAction* action=(WatcherAction*)zoo_get_context(zh);
- action->setWatcherTriggered();
+void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* ctx){
+ if(zh==0 || ctx==0) return;
+ WatcherAction* action=(WatcherAction*)ctx;
- if(type==SESSION_EVENT && state==EXPIRED_SESSION_STATE)
- action->onSessionExpired(zh);
- if(type==CHANGED_EVENT)
+ if(type==SESSION_EVENT){
+ if(state==EXPIRED_SESSION_STATE)
+ action->onSessionExpired(zh);
+ else if(state==CONNECTING_STATE)
+ action->onConnectionLost(zh);
+ else if(state==CONNECTED_STATE)
+ action->onConnectionEstablished(zh);
+ }else if(type==CHANGED_EVENT)
action->onNodeValueChanged(zh,path);
+ else if(type==DELETED_EVENT)
+ action->onNodeDeleted(zh,path);
+ else if(type==CHILD_EVENT)
+ action->onChildChanged(zh,path);
// TODO: implement for the rest of the event types
// ...
+ action->setWatcherTriggered();
}
SyncedBoolCondition WatcherAction::isWatcherTriggered() const{
return SyncedBoolCondition(triggered_,mx_);
@@ -145,6 +154,167 @@
Mock_get_xid* Mock_get_xid::mock_=0;
//******************************************************************************
+// activateWatcher mock
+
+DECLARE_WRAPPER(void,activateWatcher,(watcher_registration_t* reg, int rc))
+{
+ if(!Mock_activateWatcher::mock_){
+ CALL_REAL(activateWatcher,(reg,rc));
+ }else{
+ Mock_activateWatcher::mock_->call(reg,rc);
+ }
+}
+Mock_activateWatcher* Mock_activateWatcher::mock_=0;
+
+class ActivateWatcherWrapper: public Mock_activateWatcher{
+public:
+ ActivateWatcherWrapper():ctx_(0),activated_(false){}
+
+ virtual void call(watcher_registration_t* reg, int rc){
+ CALL_REAL(activateWatcher,(reg,rc));
+ synchronized(mx_);
+ if(reg->context==ctx_){
+ activated_=true;
+ ctx_=0;
+ }
+ }
+
+ void setContext(void* ctx){
+ synchronized(mx_);
+ ctx_=ctx;
+ activated_=false;
+ }
+
+ SyncedBoolCondition isActivated() const{
+ return SyncedBoolCondition(activated_,mx_);
+ }
+ mutable Mutex mx_;
+ void* ctx_;
+ bool activated_;
+};
+
+WatcherActivationTracker::WatcherActivationTracker():
+ wrapper_(new ActivateWatcherWrapper)
+{
+}
+
+WatcherActivationTracker::~WatcherActivationTracker(){
+ delete wrapper_;
+}
+
+void WatcherActivationTracker::track(void* ctx){
+ wrapper_->setContext(ctx);
+}
+
+SyncedBoolCondition WatcherActivationTracker::isWatcherActivated() const{
+ return wrapper_->isActivated();
+}
+
+//******************************************************************************
+// deliverWatchers mock
+DECLARE_WRAPPER(void,deliverWatchers,(zhandle_t* zh,int type,int state, const char* path))
+{
+ if(!Mock_deliverWatchers::mock_){
+ CALL_REAL(deliverWatchers,(zh,type,state,path));
+ }else{
+ Mock_deliverWatchers::mock_->call(zh,type,state,path);
+ }
+}
+
+Mock_deliverWatchers* Mock_deliverWatchers::mock_=0;
+
+struct RefCounterValue{
+ RefCounterValue(zhandle_t* const& zh,int32_t expectedCounter,Mutex& mx):
+ zh_(zh),expectedCounter_(expectedCounter),mx_(mx){}
+ bool operator()() const{
+ {
+ synchronized(mx_);
+ if(zh_==0)
+ return false;
+ }
+ return inc_ref_counter(zh_,0)==expectedCounter_;
+ }
+ zhandle_t* const& zh_;
+ int32_t expectedCounter_;
+ Mutex& mx_;
+};
+
+
+class DeliverWatchersWrapper: public Mock_deliverWatchers{
+public:
+ DeliverWatchersWrapper(int type,int state,bool terminate):
+ type_(type),state_(state),
+ allDelivered_(false),terminate_(terminate),zh_(0),deliveryCounter_(0){}
+ virtual void call(zhandle_t* zh,int type,int state, const char* path){
+ {
+ synchronized(mx_);
+ zh_=zh;
+ allDelivered_=false;
+ }
+ CALL_REAL(deliverWatchers,(zh,type,state,path));
+ if(type_==type && state_==state){
+ if(terminate_){
+ // prevent zhandle_t from being prematurely destroyed;
+ // this will also ensure that zookeeper_close() cleans up the thread
+ // resources by calling finish_adaptor()
+ inc_ref_counter(zh,1);
+ terminateZookeeperThreads(zh);
+ }
+ synchronized(mx_);
+ allDelivered_=true;
+ deliveryCounter_++;
+ }
+ }
+ SyncedBoolCondition isDelivered() const{
+ if(terminate_){
+ int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000);
+ assert(i<1000);
+ }
+ return SyncedBoolCondition(allDelivered_,mx_);
+ }
+ void resetDeliveryCounter(){
+ synchronized(mx_);
+ deliveryCounter_=0;
+ }
+ SyncedIntegerEqual deliveryCounterEquals(int expected) const{
+ if(terminate_){
+ int i=ensureCondition(RefCounterValue(zh_,1,mx_),1000);
+ assert(i<1000);
+ }
+ return SyncedIntegerEqual(deliveryCounter_,expected,mx_);
+ }
+ int type_;
+ int state_;
+ mutable Mutex mx_;
+ bool allDelivered_;
+ bool terminate_;
+ zhandle_t* zh_;
+ int deliveryCounter_;
+};
+
+WatcherDeliveryTracker::WatcherDeliveryTracker(
+ int type,int state,bool terminateCompletionThread):
+ deliveryWrapper_(new DeliverWatchersWrapper(
+ type,state,terminateCompletionThread)){
+}
+
+WatcherDeliveryTracker::~WatcherDeliveryTracker(){
+ delete deliveryWrapper_;
+}
+
+SyncedBoolCondition WatcherDeliveryTracker::isWatcherProcessingCompleted() const{
+ return deliveryWrapper_->isDelivered();
+}
+
+void WatcherDeliveryTracker::resetDeliveryCounter(){
+ deliveryWrapper_->resetDeliveryCounter();
+}
+
+SyncedIntegerEqual WatcherDeliveryTracker::deliveryCounterEquals(int expected) const{
+ return deliveryWrapper_->deliveryCounterEquals(expected);
+}
+
+//******************************************************************************
//
string HandshakeResponse::toString() const {
string buf;
@@ -185,6 +355,45 @@
return res;
}
+string ZooStatResponse::toString() const{
+ oarchive* oa=create_buffer_oarchive();
+
+ ReplyHeader h = {xid_,1,rc_};
+ serialize_ReplyHeader(oa, "hdr", &h);
+
+ SetDataResponse resp;
+ resp.stat=stat_;
+ serialize_SetDataResponse(oa, "reply", &resp);
+ int32_t len=htonl(get_buffer_len(oa));
+ string res((char*)&len,sizeof(len));
+ res.append(get_buffer(oa),get_buffer_len(oa));
+
+ close_buffer_oarchive(&oa,1);
+ return res;
+}
+
+string ZooGetChildrenResponse::toString() const{
+ oarchive* oa=create_buffer_oarchive();
+
+ ReplyHeader h = {xid_,1,rc_};
+ serialize_ReplyHeader(oa, "hdr", &h);
+
+ GetChildrenResponse resp;
+ // populate the string vector
+ allocate_String_vector(&resp.children,strings_.size());
+ for(int i=0;i<(int)strings_.size();++i)
+ resp.children.data[i]=strdup(strings_[i].c_str());
+ serialize_GetChildrenResponse(oa, "reply", &resp);
+ deallocate_GetChildrenResponse(&resp);
+
+ int32_t len=htonl(get_buffer_len(oa));
+ string res((char*)&len,sizeof(len));
+ res.append(get_buffer(oa),get_buffer_len(oa));
+
+ close_buffer_oarchive(&oa,1);
+ return res;
+}
+
string ZNodeEvent::toString() const{
oarchive* oa=create_buffer_oarchive();
struct WatcherEvent evt = {type_,0,(char*)path_.c_str()};
@@ -219,7 +428,7 @@
// Zookeeper server simulator
//
bool ZookeeperServer::hasMoreRecv() const{
- return recvHasMore.get()!=0;
+ return recvHasMore.get()!=0 || connectionLost;
}
ssize_t ZookeeperServer::callRecv(int s,void *buf,size_t len,int flags){
@@ -295,3 +504,8 @@
gettimeofday(&zh->last_recv,0);
gettimeofday(&zh->last_send,0);
}
+
+void terminateZookeeperThreads(zhandle_t* zh){
+ // this will cause the zookeeper threads to terminate
+ zh->close_requested=1;
+}
Modified: hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h (original)
+++ hadoop/zookeeper/trunk/src/c/tests/ZKMocks.h Thu Jul 24 14:46:30 2008
@@ -32,6 +32,11 @@
// Async API tests!
void forceConnected(zhandle_t* zh);
+/**
+ * Gracefully terminates zookeeper I/O and completion threads.
+ */
+void terminateZookeeperThreads(zhandle_t* zh);
+
// *****************************************************************************
// Abstract watcher action
struct SyncedBoolCondition;
@@ -42,7 +47,11 @@
virtual ~WatcherAction(){}
virtual void onSessionExpired(zhandle_t*){}
+ virtual void onConnectionEstablished(zhandle_t*){}
+ virtual void onConnectionLost(zhandle_t*){}
virtual void onNodeValueChanged(zhandle_t*,const char* path){}
+ virtual void onNodeDeleted(zhandle_t*,const char* path){}
+ virtual void onChildChanged(zhandle_t*,const char* path){}
SyncedBoolCondition isWatcherTriggered() const;
void setWatcherTriggered(){
@@ -57,7 +66,7 @@
// zh->context is a pointer to a WatcherAction instance
// based on the event type and state, the watcher calls a specific watcher
// action method
-void activeWatcher(zhandle_t *zh, int type, int state, const char *path);
+void activeWatcher(zhandle_t *zh, int type, int state, const char *path,void* ctx);
// *****************************************************************************
// a set of async completion signatures
@@ -104,6 +113,7 @@
zhandle_t* zh_;
};
+// a synchronized boolean condition
struct SyncedBoolCondition{
SyncedBoolCondition(const bool& cond,Mutex& mx):cond_(cond),mx_(mx){}
bool operator()() const{
@@ -113,6 +123,20 @@
const bool& cond_;
Mutex& mx_;
};
+
+// a synchronized integer comparison
+struct SyncedIntegerEqual{
+ SyncedIntegerEqual(const int& cond,int expected,Mutex& mx):
+ cond_(cond),expected_(expected),mx_(mx){}
+ bool operator()() const{
+ synchronized(mx_);
+ return cond_==expected_;
+ }
+ const int& cond_;
+ const int expected_;
+ Mutex& mx_;
+};
+
// *****************************************************************************
// make sure to call zookeeper_close() even in presence of exceptions
struct CloseFinally{
@@ -158,7 +182,6 @@
// *****************************************************************************
// flush_send_queue
-
class Mock_flush_send_queue: public Mock
{
public:
@@ -177,7 +200,6 @@
// *****************************************************************************
// get_xid
-
class Mock_get_xid: public Mock
{
public:
@@ -194,6 +216,57 @@
};
// *****************************************************************************
+// activateWatcher
+class Mock_activateWatcher: public Mock{
+public:
+ Mock_activateWatcher(){mock_=this;}
+ virtual ~Mock_activateWatcher(){mock_=0;}
+
+ virtual void call(watcher_registration_t* reg, int rc){}
+ static Mock_activateWatcher* mock_;
+};
+
+class ActivateWatcherWrapper;
+class WatcherActivationTracker{
+public:
+ WatcherActivationTracker();
+ ~WatcherActivationTracker();
+
+ void track(void* ctx);
+ SyncedBoolCondition isWatcherActivated() const;
+private:
+ ActivateWatcherWrapper* wrapper_;
+};
+
+// *****************************************************************************
+// deliverWatchers
+class Mock_deliverWatchers: public Mock{
+public:
+ Mock_deliverWatchers(){mock_=this;}
+ virtual ~Mock_deliverWatchers(){mock_=0;}
+
+ virtual void call(zhandle_t* zh,int type,int state, const char* path){}
+ static Mock_deliverWatchers* mock_;
+};
+
+class DeliverWatchersWrapper;
+class WatcherDeliveryTracker{
+public:
+ // filters deliveries by state and type
+ WatcherDeliveryTracker(int type,int state,bool terminateCompletionThread=true);
+ ~WatcherDeliveryTracker();
+
+ // if thread termination was requested (see the ctor params),
+ // this function will wait for the I/O and completion threads to
+ // terminate before returning a SyncedBoolCondition instance
+ SyncedBoolCondition isWatcherProcessingCompleted() const;
+ void resetDeliveryCounter();
+ SyncedIntegerEqual deliveryCounterEquals(int expected) const;
+private:
+ DeliverWatchersWrapper* deliveryWrapper_;
+};
+
+// *****************************************************************************
// a zookeeper Stat wrapper
struct NodeStat: public Stat
{
@@ -220,6 +293,8 @@
virtual ~Response(){}
virtual void setXID(int32_t xid){}
+ // this method is used by the ZookeeperServer class to serialize
+ // the instance of Response
virtual std::string toString() const =0;
};
@@ -259,6 +334,41 @@
Stat stat_;
};
+// zoo_exists(), zoo_set() response
+class ZooStatResponse: public Response
+{
+public:
+ ZooStatResponse(int32_t xid=0,int rc=ZOK,const Stat& stat=NodeStat())
+ :xid_(xid),rc_(rc),stat_(stat)
+ {
+ }
+ virtual std::string toString() const;
+ virtual void setXID(int32_t xid) {xid_=xid;}
+
+private:
+ int32_t xid_;
+ int rc_;
+ Stat stat_;
+};
+
+// zoo_get_children()
+class ZooGetChildrenResponse: public Response
+{
+public:
+ typedef std::vector<std::string> StringVector;
+ ZooGetChildrenResponse(const StringVector& v,int rc=ZOK):
+ xid_(0),strings_(v),rc_(rc)
+ {
+ }
+
+ virtual std::string toString() const;
+ virtual void setXID(int32_t xid) {xid_=xid;}
+
+ int32_t xid_;
+ StringVector strings_;
+ int rc_;
+};
+
// PING response
class PingResponse: public Response
{
@@ -326,12 +436,12 @@
// this is a trigger that gets reset back to false
// a connect request will return a non-matching session id thus causing
// the client throw SESSION_EXPIRED
- bool sessionExpired;
+ volatile bool sessionExpired;
void returnSessionExpired(){ sessionExpired=true; }
- // this is a trigger that gets reset back to false
+ // this is a one shot trigger that gets reset back to false
// next recv call will return 0 length, thus simulating a connecton loss
- bool connectionLost;
+ volatile bool connectionLost;
void setConnectionLost() {connectionLost=true;}
// recv
Modified: hadoop/zookeeper/trunk/src/c/tests/wrappers.opt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/wrappers.opt?rev=679557&r1=679556&r2=679557&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/wrappers.opt (original)
+++ hadoop/zookeeper/trunk/src/c/tests/wrappers.opt Thu Jul 24 14:46:30 2008
@@ -2,3 +2,5 @@
-Wl,--wrap -Wl,free
-Wl,--wrap -Wl,flush_send_queue
-Wl,--wrap -Wl,get_xid
+-Wl,--wrap -Wl,deliverWatchers
+-Wl,--wrap -Wl,activateWatcher