You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2009/06/09 21:01:11 UTC
svn commit: r783096 - in /hadoop/zookeeper/trunk: CHANGES.txt
src/c/src/mt_adaptor.c src/c/src/zk_adaptor.h src/c/src/zookeeper.c
src/c/tests/TestClient.cc
Author: mahadev
Date: Tue Jun 9 19:01:11 2009
New Revision: 783096
URL: http://svn.apache.org/viewvc?rev=783096&view=rev
Log:
ZOOKEEPER-375. zoo_add_auth only retains most recent auth on re-sync. (mahadev)
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c
hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h
hadoop/zookeeper/trunk/src/c/src/zookeeper.c
hadoop/zookeeper/trunk/src/c/tests/TestClient.cc
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=783096&r1=783095&r2=783096&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Jun 9 19:01:11 2009
@@ -115,6 +115,9 @@
ZOOKEEPER-435. allow "super" admin digest based auth to be configurable (phunt via breed)
+ ZOOKEEPER-375. zoo_add_auth only retains most recent auth on re-sync.
+(mahadev)
+
IMPROVEMENTS:
ZOOKEEPER-308. improve the atomic broadcast performance 3x.
(breed via mahadev)
Modified: hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c?rev=783096&r1=783095&r2=783096&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c (original)
+++ hadoop/zookeeper/trunk/src/c/src/mt_adaptor.c Tue Jun 9 19:01:11 2009
@@ -43,11 +43,11 @@
void zoo_lock_auth(zhandle_t *zh)
{
- pthread_mutex_lock(&zh->auth.lock);
+ pthread_mutex_lock(&zh->auth_h.lock);
}
void zoo_unlock_auth(zhandle_t *zh)
{
- pthread_mutex_unlock(&zh->auth.lock);
+ pthread_mutex_unlock(&zh->auth_h.lock);
}
void lock_buffer_list(buffer_head_t *l)
{
@@ -175,7 +175,7 @@
set_nonblock(adaptor_threads->self_pipe[1]);
set_nonblock(adaptor_threads->self_pipe[0]);
- pthread_mutex_init(&zh->auth.lock,0);
+ pthread_mutex_init(&zh->auth_h.lock,0);
zh->adaptor_priv = adaptor_threads;
pthread_mutex_init(&zh->to_process.lock,0);
@@ -237,7 +237,7 @@
pthread_cond_destroy(&zh->completions_to_process.cond);
pthread_mutex_destroy(&adaptor->zh_lock);
- pthread_mutex_destroy(&zh->auth.lock);
+ pthread_mutex_destroy(&zh->auth_h.lock);
close(adaptor->self_pipe[0]);
close(adaptor->self_pipe[1]);
Modified: hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h?rev=783096&r1=783095&r2=783096&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h (original)
+++ hadoop/zookeeper/trunk/src/c/src/zk_adaptor.h Tue Jun 9 19:01:11 2009
@@ -107,9 +107,7 @@
struct buffer auth;
void_completion_t completion;
const char* data;
-#ifdef THREADED
- pthread_mutex_t lock;
-#endif
+ struct _auth_info *next;
} auth_info;
/**
@@ -156,7 +154,15 @@
int self_pipe[2];
};
#endif
-
+
+/** the auth list for adding auth */
+typedef struct _auth_list_head {
+ auth_info *auth;
+#ifdef THREADED
+ pthread_mutex_t lock;
+#endif
+} auth_list_head_t;
+
/**
* This structure represents the connection to zookeeper.
*/
@@ -187,7 +193,7 @@
char primer_storage_buffer[40]; /* the true size of primer_storage */
volatile int state;
void *context;
- struct _auth_info auth; /* authentication data */
+ auth_list_head_t auth_h; /* authentication data list */
/* zookeeper_close is not reentrant because it de-allocates the zhandler.
* This guard variable is used to defer the destruction of zhandle till
* right before top-level API call returns to the caller */
@@ -204,6 +210,7 @@
zk_hashtable* active_child_watchers;
};
+
int adaptor_init(zhandle_t *zh);
void adaptor_finish(zhandle_t *zh);
void adaptor_destroy(zhandle_t *zh);
Modified: hadoop/zookeeper/trunk/src/c/src/zookeeper.c
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/src/zookeeper.c?rev=783096&r1=783095&r2=783096&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/src/zookeeper.c (original)
+++ hadoop/zookeeper/trunk/src/c/src/zookeeper.c Tue Jun 9 19:01:11 2009
@@ -139,6 +139,12 @@
#define COMPLETION_ACLLIST 4
#define COMPLETION_STRING 5
+typedef struct _auth_completion_list {
+ void_completion_t completion;
+ const char *auth_data;
+ struct _auth_completion_list *next;
+} auth_completion_list_t;
+
typedef struct _completion_list {
int xid;
int completion_type; /* one of the COMPLETION_* values */
@@ -198,22 +204,123 @@
return zh->recv_timeout;
}
-static void init_auth_info(auth_info *auth)
+/** these functions are thread unsafe, so make sure that
+ zoo_lock_auth is called before you access them **/
+static auth_info* get_last_auth(auth_list_head_t *auth_list) {
+ auth_info *element;
+ element = auth_list->auth;
+ if (element == NULL) {
+ return NULL;
+ }
+ while (element->next != NULL) {
+ element = element->next;
+ }
+ return element;
+}
+
+static void free_auth_completion(auth_completion_list_t *a_list) {
+ auth_completion_list_t *tmp, *ftmp;
+ if (a_list == NULL) {
+ return;
+ }
+ tmp = a_list->next;
+ while (tmp != NULL) {
+ ftmp = tmp;
+ tmp = tmp->next;
+ ftmp->completion = NULL;
+ ftmp->auth_data = NULL;
+ free(ftmp);
+ }
+ a_list->completion = NULL;
+ a_list->auth_data = NULL;
+ a_list->next = NULL;
+ return;
+}
+
+static void add_auth_completion(auth_completion_list_t* a_list, void_completion_t* completion,
+ const char *data) {
+ auth_completion_list_t *element;
+ auth_completion_list_t *n_element;
+ element = a_list;
+ if (a_list->completion == NULL) {
+ //this is the first element
+ a_list->completion = *completion;
+ a_list->next = NULL;
+ a_list->auth_data = data;
+ return;
+ }
+ while (element->next != NULL) {
+ element = element->next;
+ }
+ n_element = (auth_completion_list_t*) malloc(sizeof(auth_completion_list_t));
+ n_element->next = NULL;
+ n_element->completion = *completion;
+ n_element->auth_data = data;
+ element->next = n_element;
+ return;
+}
+
+static void get_auth_completions(auth_list_head_t *auth_list, auth_completion_list_t *a_list) {
+ auth_info *element;
+ element = auth_list->auth;
+ if (element == NULL) {
+ return;
+ }
+ while (element) {
+ if (element->completion) {
+ add_auth_completion(a_list, &element->completion, element->data);
+ }
+ element->completion = NULL;
+ element = element->next;
+ }
+ return;
+}
+
+static void add_last_auth(auth_list_head_t *auth_list, auth_info *add_el) {
+ auth_info *element;
+ element = auth_list->auth;
+ if (element == NULL) {
+ //first element in the list
+ auth_list->auth = add_el;
+ return;
+ }
+ while (element->next != NULL) {
+ element = element->next;
+ }
+ element->next = add_el;
+ return;
+}
+
+static void init_auth_info(auth_list_head_t *auth_list)
{
- auth->scheme=NULL;
- auth->auth.buff=NULL;
- auth->auth.len=0;
- auth->state=0;
- auth->completion=0;
- auth->data=0;
-}
-
-static void free_auth_info(auth_info *auth)
-{
- if(auth->scheme!=NULL)
- free(auth->scheme);
- deallocate_Buffer(&auth->auth);
- init_auth_info(auth);
+ auth_list->auth = NULL;
+}
+
+static void mark_active_auth(zhandle_t *zh) {
+ auth_list_head_t auth_h = zh->auth_h;
+ auth_info *element;
+ if (auth_h.auth == NULL) {
+ return;
+ }
+ element = auth_h.auth;
+ while (element->next != NULL) {
+ element->state = 1;
+ element = element->next;
+ }
+}
+
+static void free_auth_info(auth_list_head_t *auth_list)
+{
+ auth_info *auth = auth_list->auth;
+ while (auth != NULL) {
+ if(auth->scheme!=NULL)
+ free(auth->scheme);
+ deallocate_Buffer(&auth->auth);
+ auth_info *old_auth = auth;
+ auth = auth->next;
+ free(old_auth);
+ }
+ init_auth_info(auth_list);
}
int is_unrecoverable(zhandle_t *zh)
@@ -265,7 +372,7 @@
free(zh->addrs);
zh->addrs = NULL;
}
- free_auth_info(&zh->auth);
+ free_auth_info(&zh->auth_h);
destroy_zk_hashtable(zh->active_node_watchers);
destroy_zk_hashtable(zh->active_exist_watchers);
destroy_zk_hashtable(zh->active_child_watchers);
@@ -507,6 +614,7 @@
zh->state = 0;
zh->context = context;
zh->recv_timeout = recv_timeout;
+ init_auth_info(&zh->auth_h);
if (watcher) {
zh->watcher = watcher;
} else {
@@ -762,8 +870,8 @@
struct oarchive *oa;
struct ReplyHeader h;
void_completion_t auth_completion = NULL;
- const char *auth_data = NULL;
-
+ auth_completion_list_t a_list, *a_tmp;
+
lock_completion_list(&zh->sent_requests);
tmp_list = zh->sent_requests;
zh->sent_requests.head = 0;
@@ -802,18 +910,21 @@
}
}
}
-
+ a_list.completion = NULL;
+ a_list.next = NULL;
zoo_lock_auth(zh);
- if (zh->auth.completion) {
- auth_completion = zh->auth.completion;
- auth_data = zh->auth.data;
- zh->auth.completion = 0;
- }
+ get_auth_completions(&zh->auth_h, &a_list);
zoo_unlock_auth(zh);
-
- if (auth_completion) {
- auth_completion(reason, auth_data);
+ a_tmp = &a_list;
+ // chain call user's completion function
+ while (a_tmp->completion != NULL) {
+ auth_completion = a_tmp->completion;
+ auth_completion(reason, a_tmp->auth_data);
+ a_tmp = a_tmp->next;
+ if (a_tmp == NULL)
+ break;
}
+ free_auth_completion(&a_list);
}
static void cleanup_bufs(zhandle_t *zh,int callCompletion,int rc)
@@ -871,8 +982,9 @@
static void auth_completion_func(int rc, zhandle_t* zh)
{
void_completion_t auth_completion = NULL;
- const char *auth_data = NULL;
-
+ auth_completion_list_t a_list;
+ auth_completion_list_t *a_tmp;
+
if(zh==NULL)
return;
@@ -881,61 +993,82 @@
if(rc!=0){
zh->state=ZOO_AUTH_FAILED_STATE;
}else{
- zh->auth.state=1; // active
- }
-
- if (zh->auth.completion) {
- auth_completion = zh->auth.completion;
- auth_data = zh->auth.data;
- zh->auth.completion=0;
+ //change state for all auths
+ mark_active_auth(zh);
}
-
+ a_list.completion = NULL;
+ a_list.next = NULL;
+ get_auth_completions(&zh->auth_h, &a_list);
zoo_unlock_auth(zh);
-
if (rc) {
LOG_ERROR(("Authentication scheme %s failed. Connection closed.",
- zh->auth.scheme));
+ zh->auth_h.auth->scheme));
}
else {
- LOG_INFO(("Authentication scheme %s succeeded", zh->auth.scheme));
+ LOG_INFO(("Authentication scheme %s succeeded", zh->auth_h.auth->scheme));
}
-
+ a_tmp = &a_list;
// chain call user's completion function
- if (auth_completion) {
- auth_completion(rc, auth_data);
+ while (a_tmp->completion != NULL) {
+ auth_completion = a_tmp->completion;
+ auth_completion(rc, a_tmp->auth_data);
+ a_tmp = a_tmp->next;
+ if (a_tmp == NULL)
+ break;
}
+ free_auth_completion(&a_list);
}
-static int send_auth_info(zhandle_t *zh)
-{
+static int send_info_packet(zhandle_t *zh, auth_info* auth) {
struct oarchive *oa;
struct RequestHeader h = { .xid = AUTH_XID, .type = SETAUTH_OP};
struct AuthPacket req;
int rc;
-
- zoo_lock_auth(zh);
-
- if(zh->auth.scheme==NULL) {
- zoo_unlock_auth(zh);
- return ZOK; // there is nothing to send
- }
-
oa = create_buffer_oarchive();
rc = serialize_RequestHeader(oa, "header", &h);
-
req.type=0; // ignored by the server
- req.scheme = zh->auth.scheme;
- req.auth = zh->auth.auth;
+ req.scheme = auth->scheme;
+ req.auth = auth->auth;
rc = rc < 0 ? rc : serialize_AuthPacket(oa, "req", &req);
-
- zoo_unlock_auth(zh);
-
/* add this buffer to the head of the send queue */
rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
-
+
+ return rc;
+}
+
+/** send all auths, not just the last one **/
+static int send_auth_info(zhandle_t *zh) {
+ int rc;
+ zoo_lock_auth(zh);
+ auth_info *auth;
+ auth = zh->auth_h.auth;
+ if (auth == NULL) {
+ zoo_unlock_auth(zh);
+ return ZOK;
+ }
+ while (auth->next != NULL) {
+ rc = send_info_packet(zh, auth);
+ auth = auth->next;
+ }
+ zoo_unlock_auth(zh);
+ LOG_DEBUG(("Sending all auth info request to %s", format_current_endpoint_info(zh)));
+ return (rc <0) ? ZMARSHALLINGERROR:ZOK;
+}
+
+static int send_last_auth_info(zhandle_t *zh)
+{
+ int rc;
+ zoo_lock_auth(zh);
+ auth_info *auth = get_last_auth(&zh->auth_h);
+ if(auth==NULL) {
+ zoo_unlock_auth(zh);
+ return ZOK; // there is nothing to send
+ }
+ rc = send_info_packet(zh, auth);
+ zoo_unlock_auth(zh);
LOG_DEBUG(("Sending auth info request to %s",format_current_endpoint_info(zh)));
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
@@ -2382,7 +2515,7 @@
int certLen,void_completion_t completion, const void *data)
{
struct buffer auth;
-
+ auth_info *authinfo;
if(scheme==NULL || zh==NULL)
return ZBADARGUMENTS;
@@ -2402,18 +2535,18 @@
}
zoo_lock_auth(zh);
-
- free_auth_info(&zh->auth);
- zh->auth.scheme=strdup(scheme);
+ authinfo = (auth_info*) malloc(sizeof(auth_info));
+ authinfo->scheme=strdup(scheme);
if(auth.buff)
- zh->auth.auth=auth;
- zh->auth.completion=completion;
- zh->auth.data=data;
-
+ authinfo->auth=auth;
+ authinfo->completion=completion;
+ authinfo->data=data;
+ authinfo->next = NULL;
+ add_last_auth(&zh->auth_h, authinfo);
zoo_unlock_auth(zh);
-
+
if(zh->state == ZOO_CONNECTED_STATE || zh->state == ZOO_ASSOCIATING_STATE)
- return send_auth_info(zh);
+ return send_last_auth_info(zh);
return ZOK;
}
Modified: hadoop/zookeeper/trunk/src/c/tests/TestClient.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/c/tests/TestClient.cc?rev=783096&r1=783095&r2=783096&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/c/tests/TestClient.cc (original)
+++ hadoop/zookeeper/trunk/src/c/tests/TestClient.cc Tue Jun 9 19:01:11 2009
@@ -97,6 +97,8 @@
typedef struct watchCtx {
private:
list<evt_t> events;
+ watchCtx(const watchCtx&);
+ watchCtx& operator=(const watchCtx&);
public:
bool connected;
zhandle_t *zh;
@@ -270,7 +272,8 @@
#define COUNT 100
static zhandle_t *async_zk;
-
+ static volatile int count;
+
static void statCompletion(int rc, const struct Stat *stat, const void *data) {
int tmp = (int) (long) data;
CPPUNIT_ASSERT_EQUAL(tmp, rc);
@@ -289,12 +292,21 @@
free(path);
}
}
-
+
+ static void waitForVoidCompletion(int seconds) {
+ time_t expires = time(0) + seconds;
+ while(count == 0 && time(0) < expires) {
+ sleep(1);
+ }
+ count--;
+ }
+
static void voidCompletion(int rc, const void *data) {
int tmp = (int) (long) data;
CPPUNIT_ASSERT_EQUAL(tmp, rc);
+ count++;
}
-
+
static void verifyCreateFails(const char *path, zhandle_t *zk) {
CPPUNIT_ASSERT_EQUAL((int)ZBADARGUMENTS, zoo_create(zk,
path, "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0));
@@ -363,47 +375,68 @@
void testAuth() {
int rc;
-
- watchctx_t ctx1;
+ count = 0;
+ watchctx_t ctx1, ctx2, ctx3;
zhandle_t *zk = createClient(&ctx1);
-
+
rc = zoo_add_auth(0, "", 0, 0, voidCompletion, (void*)-1);
CPPUNIT_ASSERT_EQUAL((int) ZBADARGUMENTS, rc);
-
+
rc = zoo_add_auth(zk, 0, 0, 0, voidCompletion, (void*)-1);
CPPUNIT_ASSERT_EQUAL((int) ZBADARGUMENTS, rc);
-
+
// auth as pat, create /tauth1, close session
rc = zoo_add_auth(zk, "digest", "pat:passwd", 10, voidCompletion,
(void*)ZOK);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
-
+ waitForVoidCompletion(3);
+
+ CPPUNIT_ASSERT(count == 0);
+
rc = zoo_create(zk, "/tauth1", "", 0, &ZOO_CREATOR_ALL_ACL, 0, 0, 0);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
- // auth as pat w/bad pass, access /tauth1, verify failure
- watchctx_t ctx2;
zk = createClient(&ctx2);
rc = zoo_add_auth(zk, "digest", "pat:passwd2", 11, voidCompletion,
(void*)ZOK);
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
-
+ waitForVoidCompletion(3);
+ CPPUNIT_ASSERT(count == 0);
+
char buf[1024];
int blen = sizeof(buf);
struct Stat stat;
rc = zoo_get(zk, "/tauth1", 0, buf, &blen, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZNOAUTH, rc);
-
// add auth pat w/correct pass verify success
rc = zoo_add_auth(zk, "digest", "pat:passwd", 10, voidCompletion,
(void*)ZOK);
+
CPPUNIT_ASSERT_EQUAL((int) ZOK, rc);
-
+ rc = zoo_get(zk, "/tauth1", 0, buf, &blen, &stat);
+ CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
+ waitForVoidCompletion(3);
+ CPPUNIT_ASSERT(count == 0);
+ //create a new client
+ zk = createClient(&ctx3);
+ rc = zoo_add_auth(zk, "digest", "pat:passwd", 10, voidCompletion, (void*) ZOK);
+ waitForVoidCompletion(3);
+ CPPUNIT_ASSERT(count == 0);
+ rc = zoo_add_auth(zk, "ip", "none", 4, voidCompletion, (void*)ZOK);
+ //make the server forget the auths
+ waitForVoidCompletion(3);
+ CPPUNIT_ASSERT(count == 0);
+
+ stopServer();
+ CPPUNIT_ASSERT(ctx3.waitForDisconnected(zk));
+ startServer();
+ CPPUNIT_ASSERT(ctx3.waitForConnected(zk));
+ // now try getting the data
rc = zoo_get(zk, "/tauth1", 0, buf, &blen, &stat);
CPPUNIT_ASSERT_EQUAL((int)ZOK, rc);
}
-
+
void testNullData() {
watchctx_t ctx;
zhandle_t *zk = createClient(&ctx);
@@ -697,6 +730,7 @@
}
};
+volatile int Zookeeper_simpleSystem::count;
zhandle_t *Zookeeper_simpleSystem::async_zk;
const char Zookeeper_simpleSystem::hostPorts[] = "127.0.0.1:22181";
CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_simpleSystem);