You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by sy...@apache.org on 2020/07/31 14:29:00 UTC
[zookeeper] branch branch-3.6 updated: ZOOKEEPER-3885: add locking for watchers hashtables
This is an automated email from the ASF dual-hosted git repository.
symat pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.6 by this push:
new 07400df ZOOKEEPER-3885: add locking for watchers hashtables
07400df is described below
commit 07400df7f882515c7a7979db393b586fe56efb97
Author: Tudor Bosman <tu...@rockset.com>
AuthorDate: Fri Jul 31 13:42:18 2020 +0000
ZOOKEEPER-3885: add locking for watchers hashtables
See the comments in the JIRA issue.
Author: Tudor Bosman <tu...@rockset.com>
Reviewers: Mate Szalay-Beko <sy...@apache.org>, Damien Diederen <dd...@crosstwine.com>, Enrico Olivelli <eo...@apache.org>
Closes #1403 from tudor/htlocking1
(cherry picked from commit b776b2360ac282fc4eef1e86fcf185d7a6c3eae5)
Signed-off-by: Mate Szalay-Beko <sy...@apache.org>
---
.../zookeeper-client-c/src/mt_adaptor.c | 24 ++++++++++++++++++++--
.../zookeeper-client-c/src/st_adaptor.c | 10 +++++++++
.../zookeeper-client-c/src/zk_adaptor.h | 5 +++++
.../zookeeper-client-c/src/zookeeper.c | 12 +++++++++++
4 files changed, 49 insertions(+), 2 deletions(-)
diff --git a/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c b/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
index 38cced4..668c2a0 100644
--- a/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
+++ b/zookeeper-client/zookeeper-client-c/src/mt_adaptor.c
@@ -256,12 +256,13 @@ int adaptor_init(zhandle_t *zh)
pthread_mutex_init(&zh->to_process.lock,0);
pthread_mutex_init(&adaptor_threads->zh_lock,0);
pthread_mutex_init(&adaptor_threads->reconfig_lock,0);
- // to_send must be recursive mutex
+ pthread_mutex_init(&adaptor_threads->watchers_lock,0);
+ // to_send must be recursive mutex
pthread_mutexattr_init(&recursive_mx_attr);
pthread_mutexattr_settype(&recursive_mx_attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&zh->to_send.lock,&recursive_mx_attr);
pthread_mutexattr_destroy(&recursive_mx_attr);
-
+
pthread_mutex_init(&zh->sent_requests.lock,0);
pthread_cond_init(&zh->sent_requests.cond,0);
pthread_mutex_init(&zh->completions_to_process.lock,0);
@@ -530,6 +531,25 @@ int unlock_reconfig(struct _zhandle *zh)
}
}
+int lock_watchers(struct _zhandle *zh)
+{
+ struct adaptor_threads *adaptor = zh->adaptor_priv;
+ if (adaptor) {
+ return pthread_mutex_lock(&adaptor->watchers_lock);
+ } else {
+ return 0;
+ }
+}
+int unlock_watchers(struct _zhandle *zh)
+{
+ struct adaptor_threads *adaptor = zh->adaptor_priv;
+ if (adaptor) {
+ return pthread_mutex_unlock(&adaptor->watchers_lock);
+ } else {
+ return 0;
+ }
+}
+
int enter_critical(zhandle_t* zh)
{
struct adaptor_threads *adaptor = zh->adaptor_priv;
diff --git a/zookeeper-client/zookeeper-client-c/src/st_adaptor.c b/zookeeper-client/zookeeper-client-c/src/st_adaptor.c
index ddc2582..07540b7 100644
--- a/zookeeper-client/zookeeper-client-c/src/st_adaptor.c
+++ b/zookeeper-client/zookeeper-client-c/src/st_adaptor.c
@@ -94,6 +94,16 @@ int unlock_reconfig(struct _zhandle *zh)
return 0;
}
+int lock_watchers(struct _zhandle *zh)
+{
+ return 0;
+}
+
+int unlock_watchers(struct _zhandle *zh)
+{
+ return 0;
+}
+
int enter_critical(zhandle_t* zh)
{
return 0;
diff --git a/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h b/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
index 8157472..305efbd 100644
--- a/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
+++ b/zookeeper-client/zookeeper-client-c/src/zk_adaptor.h
@@ -166,6 +166,7 @@ struct adaptor_threads {
pthread_mutex_t lock; // ... and a lock
pthread_mutex_t zh_lock; // critical section lock
pthread_mutex_t reconfig_lock; // lock for reconfiguring cluster's ensemble
+ pthread_mutex_t watchers_lock; // lock for watcher operations
#ifdef WIN32
SOCKET self_pipe[2];
#else
@@ -290,6 +291,10 @@ int zoo_unlock_auth(zhandle_t *zh);
int lock_reconfig(struct _zhandle *zh);
int unlock_reconfig(struct _zhandle *zh);
+// watchers hashtable lock
+int lock_watchers(struct _zhandle *zh);
+int unlock_watchers(struct _zhandle *zh);
+
// critical section guards
int enter_critical(zhandle_t* zh);
int leave_critical(zhandle_t* zh);
diff --git a/zookeeper-client/zookeeper-client-c/src/zookeeper.c b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
index 3ba3c52..092ec93 100644
--- a/zookeeper-client/zookeeper-client-c/src/zookeeper.c
+++ b/zookeeper-client/zookeeper-client-c/src/zookeeper.c
@@ -2002,9 +2002,11 @@ static int send_set_watches(zhandle_t *zh)
int rc;
req.relativeZxid = zh->last_zxid;
+ lock_watchers(zh);
req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);
+ unlock_watchers(zh);
// return if there are no pending watches
if (!req.dataWatches.count && !req.existWatches.count &&
@@ -2850,7 +2852,9 @@ static int queue_session_event(zhandle_t *zh, int state)
}
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
+ lock_watchers(zh);
cptr->c.watcher_result = collectWatchers(zh, ZOO_SESSION_EVENT, "");
+ unlock_watchers(zh);
queue_completion(&zh->completions_to_process, cptr, 0);
if (process_async(zh->outstanding_sync)) {
process_completions(zh);
@@ -3156,7 +3160,9 @@ int zookeeper_process(zhandle_t *zh, int events)
/* We are doing a notification, so there is no pending request */
c = create_completion_entry(zh, WATCHER_EVENT_XID,-1,0,0,0,0);
c->buffer = bptr;
+ lock_watchers(zh);
c->c.watcher_result = collectWatchers(zh, type, path);
+ unlock_watchers(zh);
// We cannot free until now, otherwise path will become invalid
deallocate_WatcherEvent(&evt);
@@ -3211,8 +3217,10 @@ int zookeeper_process(zhandle_t *zh, int events)
// Update last_zxid only when it is a request response
zh->last_zxid = hdr.zxid;
}
+ lock_watchers(zh);
activateWatcher(zh, cptr->watcher, rc);
deactivateWatcher(zh, cptr->watcher_deregistration, rc);
+ unlock_watchers(zh);
if (cptr->c.void_result != SYNCHRONOUS_MARKER) {
LOG_DEBUG(LOGCALLBACK(zh), "Queueing asynchronous response");
@@ -4503,19 +4511,23 @@ static int aremove_watches(
goto done;
}
+ lock_watchers(zh);
if (!pathHasWatcher(zh, server_path, wtype, watcher, watcherCtx)) {
rc = ZNOWATCHER;
+ unlock_watchers(zh);
goto done;
}
if (local) {
removeWatchers(zh, server_path, wtype, watcher, watcherCtx);
+ unlock_watchers(zh);
#ifdef THREADED
notify_sync_completion((struct sync_completion *)data);
#endif
rc = ZOK;
goto done;
}
+ unlock_watchers(zh);
oa = create_buffer_oarchive();
rc = serialize_RequestHeader(oa, "header", &h);