You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2010/04/22 08:05:16 UTC
svn commit: r936623 - in /hadoop/zookeeper/branches/branch-3.3: CHANGES.txt
src/contrib/zkpython/src/c/zookeeper.c
Author: phunt
Date: Thu Apr 22 06:05:15 2010
New Revision: 936623
URL: http://svn.apache.org/viewvc?rev=936623&view=rev
Log:
ZOOKEEPER-631. zkpython's C code could do with a style clean-up
Modified:
hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
hadoop/zookeeper/branches/branch-3.3/src/contrib/zkpython/src/c/zookeeper.c
Modified: hadoop/zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/CHANGES.txt?rev=936623&r1=936622&r2=936623&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ hadoop/zookeeper/branches/branch-3.3/CHANGES.txt Thu Apr 22 06:05:15 2010
@@ -15,6 +15,9 @@ BUGFIXES:
ZOOKEEPER-741. root level create on REST proxy fails (phunt)
+ ZOOKEEPER-631. zkpython's C code could do with a style clean-up
+ (henry robinson via phunt)
+
Release 3.3.0 - 2010-03-24
Non-backward compatible changes:
Modified: hadoop/zookeeper/branches/branch-3.3/src/contrib/zkpython/src/c/zookeeper.c
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/branches/branch-3.3/src/contrib/zkpython/src/c/zookeeper.c?rev=936623&r1=936622&r2=936623&view=diff
==============================================================================
--- hadoop/zookeeper/branches/branch-3.3/src/contrib/zkpython/src/c/zookeeper.c (original)
+++ hadoop/zookeeper/branches/branch-3.3/src/contrib/zkpython/src/c/zookeeper.c Thu Apr 22 06:05:15 2010
@@ -19,7 +19,7 @@
#include <Python.h>
#include <zookeeper.h>
#include <assert.h>
-
+
//////////////////////////////////////////////
// EXCEPTIONS
PyObject *ZooKeeperException = NULL;
@@ -50,324 +50,385 @@ PyObject *NothingException;
PyObject *err_to_exception(int errcode) {
switch (errcode) {
- case ZSYSTEMERROR:
- return SystemErrorException;
- case ZRUNTIMEINCONSISTENCY:
- return RuntimeInconsistencyException;
- case ZDATAINCONSISTENCY:
- return DataInconsistencyException;
- case ZCONNECTIONLOSS:
- return ConnectionLossException;
- case ZMARSHALLINGERROR:
- return MarshallingErrorException;
- case ZUNIMPLEMENTED:
- return UnimplementedException;
- case ZOPERATIONTIMEOUT:
- return OperationTimeoutException;
- case ZBADARGUMENTS:
- return BadArgumentsException;
- case ZAPIERROR:
- return ApiErrorException;
- case ZNONODE:
- return NoNodeException;
- case ZNOAUTH:
- return NoAuthException;
- case ZBADVERSION:
- return BadVersionException;
- case ZNOCHILDRENFOREPHEMERALS:
- return NoChildrenForEphemeralsException;
- case ZNODEEXISTS:
- return NodeExistsException;
- case ZINVALIDACL:
- return InvalidACLException;
- case ZAUTHFAILED:
- return AuthFailedException;
- case ZNOTEMPTY:
- return NotEmptyException;
- case ZSESSIONEXPIRED:
- return SessionExpiredException;
- case ZINVALIDCALLBACK:
- return InvalidCallbackException;
- case ZSESSIONMOVED:
- return SessionMovedException;
+ case ZSYSTEMERROR:
+ return SystemErrorException;
+ case ZRUNTIMEINCONSISTENCY:
+ return RuntimeInconsistencyException;
+ case ZDATAINCONSISTENCY:
+ return DataInconsistencyException;
+ case ZCONNECTIONLOSS:
+ return ConnectionLossException;
+ case ZMARSHALLINGERROR:
+ return MarshallingErrorException;
+ case ZUNIMPLEMENTED:
+ return UnimplementedException;
+ case ZOPERATIONTIMEOUT:
+ return OperationTimeoutException;
+ case ZBADARGUMENTS:
+ return BadArgumentsException;
+ case ZAPIERROR:
+ return ApiErrorException;
+ case ZNONODE:
+ return NoNodeException;
+ case ZNOAUTH:
+ return NoAuthException;
+ case ZBADVERSION:
+ return BadVersionException;
+ case ZNOCHILDRENFOREPHEMERALS:
+ return NoChildrenForEphemeralsException;
+ case ZNODEEXISTS:
+ return NodeExistsException;
+ case ZINVALIDACL:
+ return InvalidACLException;
+ case ZAUTHFAILED:
+ return AuthFailedException;
+ case ZNOTEMPTY:
+ return NotEmptyException;
+ case ZSESSIONEXPIRED:
+ return SessionExpiredException;
+ case ZINVALIDCALLBACK:
+ return InvalidCallbackException;
+ case ZSESSIONMOVED:
+ return SessionMovedException;
- case ZOK:
- default:
- return NULL;
- }
+ case ZOK:
+ default:
+ return NULL;
+ }
}
-#define CHECK_ZHANDLE(z) if ( (z) < 0 || (z) >= num_zhandles) { \
- PyErr_SetString( ZooKeeperException, "zhandle out of range" ); \
-return NULL; \
-} else if ( zhandles[(z)] == NULL ) { \
- PyErr_SetString(ZooKeeperException, "zhandle already freed"); \
- return NULL; \
- }
+#define CHECK_ZHANDLE(z) if ( (z) < 0 || (z) >= num_zhandles) { \
+ PyErr_SetString( ZooKeeperException, "zhandle out of range" ); \
+return NULL; \
+} else if ( zhandles[(z)] == NULL ) { \
+ PyErr_SetString(ZooKeeperException, "zhandle already freed"); \
+ return NULL; \
+ }
-/////////////////////////////////////////////
-// HELPER FUNCTIONS
+/* Contains all the state required for a watcher callback - these are
+ passed to the *dispatch functions as void*, cast to pywatcher_t and
+ then their callback member is invoked if not NULL */
typedef struct {
int zhandle;
PyObject *callback;
int permanent;
}pywatcher_t;
-// This array exists because we need to ref. count
-// the global watchers for each connection - but they're
-// inaccessible without pulling in zk_adaptor.h, which I'm
-// trying to avoid.
+/* This array exists because we need to ref. count the global watchers
+ for each connection - but they're inaccessible without pulling in
+ zk_adaptor.h, which I'm trying to avoid. */
static pywatcher_t **watchers;
-// We keep an array of zhandles available for use.
-// When a zhandle is correctly closed, the C client
-// frees the memory so we set the zhandles[i] entry to NULL.
-// This entry can then be re-used
+/* We keep an array of zhandles available for use. When a zhandle is
+ correctly closed, the C client frees the memory so we set the
+ zhandles[i] entry to NULL. This entry can then be re-used. */
static zhandle_t** zhandles = NULL;
static int num_zhandles = 0;
static int max_zhandles = 0;
#define REAL_MAX_ZHANDLES 32768
-// Allocates an initial zhandle and watcher array
-void init_zhandles(int num) {
- zhandles = malloc(sizeof(zhandle_t*)*num);
- watchers = malloc(sizeof(pywatcher_t*)*num);
- max_zhandles = num;
- num_zhandles = 0;
- memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles);
-}
-
-// Note that the following zhandle functions are not
-// thread-safe. The C-Python runtime does not seem to
-// pre-empt a thread that is in a C module, so there's
-// no need for synchronisation.
-
-// Doubles the size of the zhandle / watcher array
-// Returns 0 if the new array would be >= REAL_MAX_ZHANDLES
-// in size.
-int resize_zhandles() {
- zhandle_t **tmp = zhandles;
- pywatcher_t ** wtmp = watchers;
- if (max_zhandles >= REAL_MAX_ZHANDLES >> 1) {
- return -1;
- }
- max_zhandles *= 2;
- zhandles = malloc(sizeof(zhandle_t*)*max_zhandles);
- memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles);
- memcpy(zhandles, tmp, sizeof(zhandle_t*)*max_zhandles/2);
-
- watchers = malloc(sizeof(pywatcher_t*)*max_zhandles);
- memset(watchers, 0, sizeof(pywatcher_t*)*max_zhandles);
- memcpy(watchers, wtmp, sizeof(pywatcher_t*)*max_zhandles/2);
-
- free(wtmp);
- free(tmp);
- return 0;
-}
-
-// Find a free zhandle - this is expensive, but we
-// expect it to be infrequently called.
-// There are optimisations that can be made if this turns out
-// to be problematic.
-// Returns -1 if no free handle is found.
-unsigned int next_zhandle() {
- int i = 0;
- for (i=0;i<max_zhandles;++i) {
- if (zhandles[i] == NULL) {
- num_zhandles++;
- return i;
- }
- }
+/* -------------------------------------------------------------------------- */
+/* zhandles - unique connection ids - tracking */
+/* -------------------------------------------------------------------------- */
+
+
+/* Allocates an initial zhandle and watcher array */
+int init_zhandles(int num) {
+ zhandles = malloc(sizeof(zhandle_t*)*num);
+ watchers = malloc(sizeof(pywatcher_t*)*num);
+ if (zhandles == NULL || watchers == NULL) {
+ return 0;
+ }
+ max_zhandles = num;
+ num_zhandles = 0;
+ memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles);
+ return 1;
+}
+
+/* Note that the following zhandle functions are not thread-safe. The
+ C-Python runtime does not seem to pre-empt a thread that is in a C
+ module, so there's no need for synchronisation. */
+
+/* Doubles the size of the zhandle / watcher array. Called when
+ zhandles is full. Returns 1 on success; returns 0 if allocation
+ failed or if the new array would be >= REAL_MAX_ZHANDLES in
+ size. */
+int resize_zhandles(void) {
+ zhandle_t **tmp = zhandles;
+ pywatcher_t ** wtmp = watchers;
+ if (max_zhandles >= REAL_MAX_ZHANDLES >> 1) {
+ return 0;
+ }
+ max_zhandles *= 2;
+ zhandles = malloc(sizeof(zhandle_t*)*max_zhandles);
+ if (zhandles == NULL) {
+ PyErr_SetString(PyExc_MemoryError, "malloc for new zhandles failed");
+ return 0;
+ }
+ memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles);
+ memcpy(zhandles, tmp, sizeof(zhandle_t*)*max_zhandles/2);
+
+ watchers = malloc(sizeof(pywatcher_t*)*max_zhandles);
+ if (watchers == NULL) {
+ PyErr_SetString(PyExc_MemoryError, "malloc for new watchers failed");
+ return 0;
+ }
+ memset(watchers, 0, sizeof(pywatcher_t*)*max_zhandles);
+ memcpy(watchers, wtmp, sizeof(pywatcher_t*)*max_zhandles/2);
- return -1;
+ free(wtmp);
+ free(tmp);
+ return 1;
+}
+
+/* Find a free zhandle - this iterates through the list of open
+ zhandles, but we expect it to be infrequently called. There are
+ optimisations that can be made if this turns out to be problematic.
+ Returns -1 if no free handle is found - resize_zhandles() can be
+ called in that case. */
+unsigned int next_zhandle(void) {
+ int i = 0;
+ for (i=0;i<max_zhandles;++i) {
+ if (zhandles[i] == NULL) {
+ num_zhandles++;
+ return i;
+ }
+ }
+
+ return -1;
}
-/////////////////////////////////////
-// Pywatcher funcs
+/* -------------------------------------------------------------------------- */
+/* Utility functions to construct and deallocate data structures */
+/* -------------------------------------------------------------------------- */
+
+/* Creates a new pywatcher_t to hold connection state, a callback
+ object and a flag to say if the watcher is permanent. Takes a new
+ reference to the callback object. */
pywatcher_t *create_pywatcher(int zh, PyObject* cb, int permanent)
{
pywatcher_t *ret = (pywatcher_t*)calloc(sizeof(pywatcher_t),1);
+ if (ret == NULL) {
+ PyErr_SetString(PyExc_MemoryError, "calloc failed in create_pywatcher");
+ return NULL;
+ }
Py_INCREF(cb);
ret->zhandle = zh; ret->callback = cb; ret->permanent = permanent;
return ret;
}
-void free_pywatcher( pywatcher_t *pw)
+/* Releases the reference taken in create_pywatcher to the callback,
+ then frees the allocated pywatcher_t* */
+void free_pywatcher(pywatcher_t *pw)
{
+ if (pw == NULL) {
+ return;
+ }
Py_DECREF(pw->callback);
+
free(pw);
}
-
+/* Constructs a new stat object. Returns Py_None if stat == NULL or a
+ dictionary containing all the stat information otherwise. In either
+ case, takes a reference to the returned object. */
PyObject *build_stat( const struct Stat *stat )
{
- if (stat == NULL) {
- return Py_None;
- }
+ if (stat == NULL) {
+ Py_INCREF(Py_None);
+ return Py_None;
+ }
return Py_BuildValue( "{s:K, s:K, s:K, s:K,"
- "s:i, s:i, s:i, s:K,"
- "s:i, s:i, s:K}",
- "czxid", stat->czxid,
- "mzxid", stat->mzxid,
- "ctime", stat->ctime,
- "mtime", stat->mtime,
- "version", stat->version,
- "cversion", stat->cversion,
- "aversion", stat->aversion,
- "ephemeralOwner", stat->ephemeralOwner,
- "dataLength", stat->dataLength,
- "numChildren", stat->numChildren,
- "pzxid", stat->pzxid );
-}
-
+ "s:i, s:i, s:i, s:K,"
+ "s:i, s:i, s:K}",
+ "czxid", stat->czxid,
+ "mzxid", stat->mzxid,
+ "ctime", stat->ctime,
+ "mtime", stat->mtime,
+ "version", stat->version,
+ "cversion", stat->cversion,
+ "aversion", stat->aversion,
+ "ephemeralOwner", stat->ephemeralOwner,
+ "dataLength", stat->dataLength,
+ "numChildren", stat->numChildren,
+ "pzxid", stat->pzxid );
+}
+
+/* Creates a new list of strings from a String_vector. Returns the
+ empty list if the String_vector is NULL. Takes a reference to the
+ returned PyObject and gives that reference to the caller. */
PyObject *build_string_vector(const struct String_vector *sv)
{
PyObject *ret;
- if (!sv)
- {
- ret = PyList_New(0);
- }
- else
- {
- ret = PyList_New(sv->count);
- if (ret)
- {
- int i;
- for (i=0;i<sv->count;++i)
- {
- PyObject *s = PyString_FromString(sv->data[i]);
- if (!s)
- {
- Py_DECREF(ret);
- ret = NULL;
- break;
- }
- PyList_SetItem(ret, i, s);
- }
+ if (!sv) {
+ return PyList_New(0);
+ }
+
+ ret = PyList_New(sv->count);
+ if (ret) {
+ int i;
+ for (i=0;i<sv->count;++i) {
+ PyObject *s = PyString_FromString(sv->data[i]);
+ if (!s) {
+ if (ret != Py_None) {
+ Py_DECREF(ret);
}
+ ret = NULL;
+ break;
+ }
+ PyList_SetItem(ret, i, s);
}
+ }
return ret;
}
+/* Returns 1 if the PyObject is a valid representation of an ACL, and
+ 0 otherwise. */
+int check_is_acl(PyObject *o) {
+ int i;
+ if (o == NULL) {
+ return 0;
+ }
+ if (!PyList_Check(o)) {
+ return 0;
+ }
+ for (i=0;i<PyList_Size(o);++i) {
+ if (!PyDict_Check(PyList_GetItem(o,i))) {
+ return 0;
+ }
+ }
+ return 1;
+}
+
+/* Macro form to throw exception if o is not an ACL */
+#define CHECK_ACLS(o) if (check_is_acl(o) == 0) { \
+ PyErr_SetString(err_to_exception(ZINVALIDACL), zerror(ZINVALIDACL)); \
+ return NULL; \
+ }
+
+
+/* Creates a new list of ACL dictionaries from an ACL_vector. Returns
+ the empty list if the ACL_vector is NULL. Takes a reference to the
+ returned PyObject and gives that reference to the caller. */
PyObject *build_acls( const struct ACL_vector *acls )
{
- PyObject *ret = PyList_New( acls->count );
+ if (acls == NULL) {
+ return PyList_New(0);
+ }
+
+ PyObject *ret = PyList_New(acls->count);
int i;
- for (i=0;i<acls->count;++i)
- {
- PyObject *acl = Py_BuildValue( "{s:i, s:s, s:s}",
- "perms", acls->data[i].perms,
- "scheme", acls->data[i].id.scheme,
- "id", acls->data[i].id.id );
- PyList_SetItem(ret, i, acl);
- }
+ for (i=0;i<acls->count;++i) {
+ PyObject *acl = Py_BuildValue( "{s:i, s:s, s:s}",
+ "perms", acls->data[i].perms,
+ "scheme", acls->data[i].id.scheme,
+ "id", acls->data[i].id.id );
+ PyList_SetItem(ret, i, acl);
+ }
return ret;
}
-void parse_acls( struct ACL_vector *acls, PyObject *pyacls )
+/* Parse the Python representation of an ACL list into an ACL_vector
+ (which needs subsequent freeing) */
+int parse_acls(struct ACL_vector *acls, PyObject *pyacls)
{
PyObject *a;
- acls->count = PyList_Size( pyacls );
- acls->data = (struct ACL *)calloc( acls->count, sizeof(struct ACL) );
int i;
- for (i=0;i<acls->count;++i)
- {
- a = PyList_GetItem( pyacls, i );
- // a is now a dictionary
- PyObject *perms = PyDict_GetItemString( a, "perms" );
- acls->data[i].perms = (int32_t)(PyInt_AsLong(perms));
- acls->data[i].id.id = strdup( PyString_AsString( PyDict_GetItemString( a, "id" ) ) );
- acls->data[i].id.scheme = strdup( PyString_AsString( PyDict_GetItemString( a, "scheme" ) ) );
- }
+ if (acls == NULL || pyacls == NULL) {
+ PyErr_SetString(PyExc_ValueError, "acls or pyacls NULL in parse_acls");
+ return 0;
+ }
+
+ acls->count = PyList_Size( pyacls );
+
+ // Is this a list? If not, we can't do anything
+ if (PyList_Check(pyacls) == 0) {
+ PyErr_SetString(InvalidACLException, "List of ACLs required in parse_acls");
+ return 0;
+ }
+
+ acls->data = (struct ACL *)calloc(acls->count, sizeof(struct ACL));
+ if (acls->data == NULL) {
+ PyErr_SetString(PyExc_MemoryError, "calloc failed in parse_acls");
+ return 0;
+ }
+
+ for (i=0;i<acls->count;++i) {
+ a = PyList_GetItem(pyacls, i);
+ // a is now a dictionary
+ PyObject *perms = PyDict_GetItemString( a, "perms" );
+ acls->data[i].perms = (int32_t)(PyInt_AsLong(perms));
+ acls->data[i].id.id = strdup( PyString_AsString( PyDict_GetItemString( a, "id" ) ) );
+ acls->data[i].id.scheme = strdup( PyString_AsString( PyDict_GetItemString( a, "scheme" ) ) );
+ }
+ return 1;
}
+/* Deallocates the memory allocated inside an ACL_vector, but not the
+ ACL_vector itself */
void free_acls( struct ACL_vector *acls )
{
+ if (acls == NULL) {
+ return;
+ }
int i;
- for (i=0;i<acls->count;++i)
- {
- free(acls->data[i].id.id);
- free(acls->data[i].id.scheme);
- }
+ for (i=0;i<acls->count;++i) {
+ free(acls->data[i].id.id);
+ free(acls->data[i].id.scheme);
+ }
free(acls->data);
}
-/////////////////////////////////////////////
+/* -------------------------------------------------------------------------- */
+/* Watcher and callback implementation */
+/* -------------------------------------------------------------------------- */
+
/* Every watcher invocation goes through this dispatch point, which
a) acquires the global interpreter lock
- b) unpacks the PyObject to call from the passed context pointer, which
- handily includes the index of the relevant zookeeper handle to pass back to Python.
- c) Makes the call into Python, checking for error conditions which we are responsible for
- detecting and doing something about (we just print the error and plough right on)
- d) releases the lock after freeing up the context object, which is only used for one
- watch invocation (watches are one-shot)
+
+ b) unpacks the PyObject to call from the passed context pointer,
+ which handily includes the index of the relevant zookeeper handle
+ to pass back to Python.
+
+ c) Makes the call into Python, checking for error conditions which
+ we are responsible for detecting and doing something about (we just
+ print the error and plough right on)
+
+ d) releases the lock after freeing up the context object, which is
+ only used for one watch invocation (watches are one-shot, unless
+ 'permanent' != 0)
*/
-void watcher_dispatch(zhandle_t *zzh, int type, int state, const char *path, void *context)
+void watcher_dispatch(zhandle_t *zzh, int type, int state,
+ const char *path, void *context)
{
PyGILState_STATE gstate;
pywatcher_t *pyw = (pywatcher_t*)context;
PyObject *callback = pyw->callback;
+ if (callback == NULL) {
+ // This is unexpected
+ char msg[256];
+ sprintf(msg, "pywatcher: %d %p %d", pyw->zhandle, pyw->callback, pyw->permanent);
+ PyErr_SetString(PyExc_ValueError, msg);
+ return;
+ }
+
gstate = PyGILState_Ensure();
PyObject *arglist = Py_BuildValue("(i,i,i,s)", pyw->zhandle,type, state, path);
- if (PyObject_CallObject((PyObject*)callback, arglist) == NULL)
+ if (PyObject_CallObject((PyObject*)callback, arglist) == NULL) {
PyErr_Print();
- if (pyw->permanent == 0)
- free_pywatcher(pyw);
- PyGILState_Release(gstate);
-}
-
-static PyObject *pyzookeeper_init(PyObject *self, PyObject *args)
-{
- const char *host;
- PyObject *watcherfn = Py_None;
- int recv_timeout = 10000;
- // int clientid = -1;
- clientid_t cid;
- cid.client_id = -1;
- const char *passwd;
- int handle = next_zhandle();
- if (handle == -1) {
- resize_zhandles();
- handle = next_zhandle();
- }
-
- if (handle == -1) {
- PyErr_SetString(ZooKeeperException,"Couldn't find a free zhandle, something is very wrong");
- return NULL;
- }
-
- if (!PyArg_ParseTuple(args, "s|Oi(Ls)", &host, &watcherfn, &recv_timeout, &cid.client_id, &passwd))
- return NULL;
-
- if (cid.client_id != -1) {
- strncpy(cid.passwd, passwd, 16*sizeof(char));
}
- pywatcher_t *pyw = NULL;
- if (watcherfn != Py_None) {
- pyw = create_pywatcher(handle, watcherfn,1);
+ if (pyw->permanent == 0) {
+ free_pywatcher(pyw);
}
- watchers[handle] = pyw;
- zhandle_t *zh = zookeeper_init( host, watcherfn != Py_None ? watcher_dispatch : NULL,
- recv_timeout, cid.client_id == -1 ? 0 : &cid,
- pyw,
- 0 );
-
- if (zh == NULL)
- {
- PyErr_SetString( ZooKeeperException, "Could not internally obtain zookeeper handle" );
- return NULL;
- }
-
- zhandles[handle] = zh;
- return Py_BuildValue( "i", handle);
+ PyGILState_Release(gstate);
}
-///////////////////////////////////////////////////////
-// Similar kind of mechanisms for various completion
-// types
+/* The completion callbacks (from asynchronous calls) are implemented similarly */
+/* Called when an asynchronous call that returns void completes and
+ dispatches user provided callback */
void void_completion_dispatch(int rc, const void *data)
{
PyGILState_STATE gstate;
@@ -383,6 +444,8 @@ void void_completion_dispatch(int rc, co
PyGILState_Release(gstate);
}
+/* Called when an asynchronous call that returns a stat structure
+ completes and dispatches user provided callback */
void stat_completion_dispatch(int rc, const struct Stat *stat, const void *data)
{
PyGILState_STATE gstate;
@@ -391,13 +454,19 @@ void stat_completion_dispatch(int rc, co
return;
PyObject *callback = pyw->callback;
gstate = PyGILState_Ensure();
- PyObject *arglist = Py_BuildValue("(i,i,N)", pyw->zhandle,rc, build_stat(stat));
+ PyObject *pystat = build_stat(stat);
+ PyObject *arglist = Py_BuildValue("(i,i,O)", pyw->zhandle,rc, pystat);
+ Py_DECREF(pystat);
+
if (PyObject_CallObject((PyObject*)callback, arglist) == NULL)
PyErr_Print();
free_pywatcher(pyw);
PyGILState_Release(gstate);
}
+/* Called when an asynchronous call that returns a stat structure and
+ some untyped data completes and dispatches user provided
+ callback (used by aget) */
void data_completion_dispatch(int rc, const char *value, int value_len, const struct Stat *stat, const void *data)
{
PyGILState_STATE gstate;
@@ -406,13 +475,18 @@ void data_completion_dispatch(int rc, co
return;
PyObject *callback = pyw->callback;
gstate = PyGILState_Ensure();
- PyObject *arglist = Py_BuildValue("(i,i,s#,O)", pyw->zhandle,rc, value,value_len, build_stat(stat));
+ PyObject *pystat = build_stat(stat);
+ PyObject *arglist = Py_BuildValue("(i,i,s#,O)", pyw->zhandle,rc, value,value_len, pystat);
+ Py_DECREF(pystat);
+
if (PyObject_CallObject((PyObject*)callback, arglist) == NULL)
PyErr_Print();
free_pywatcher(pyw);
PyGILState_Release(gstate);
}
+/* Called when an asynchronous call that returns a list of strings
+ completes and dispatches user provided callback */
void strings_completion_dispatch(int rc, const struct String_vector *strings, const void *data)
{
PyGILState_STATE gstate;
@@ -424,22 +498,26 @@ void strings_completion_dispatch(int rc,
PyObject *pystrings = build_string_vector(strings);
if (pystrings)
{
- PyObject *arglist = Py_BuildValue("(i,i,O)", pyw->zhandle, rc, pystrings);
+ PyObject *arglist = Py_BuildValue("(i,i,O)", pyw->zhandle, rc, pystrings);
if (arglist == NULL || PyObject_CallObject((PyObject*)callback, arglist) == NULL)
PyErr_Print();
}
else
PyErr_Print();
+ Py_DECREF(pystrings);
free_pywatcher(pyw);
PyGILState_Release(gstate);
}
+/* Called when an asynchronous call that returns a single string
+ completes and dispatches user provided callback */
void string_completion_dispatch(int rc, const char *value, const void *data)
{
PyGILState_STATE gstate;
pywatcher_t *pyw = (pywatcher_t*)data;
- if (pyw == NULL)
+ if (pyw == NULL) {
return;
+ }
PyObject *callback = pyw->callback;
gstate = PyGILState_Ensure();
PyObject *arglist = Py_BuildValue("(i,i,s)", pyw->zhandle,rc, value);
@@ -449,52 +527,115 @@ void string_completion_dispatch(int rc,
PyGILState_Release(gstate);
}
+/* Called when an asynchronous call that returns a list of ACLs
+ completes and dispatches user provided callback */
void acl_completion_dispatch(int rc, struct ACL_vector *acl, struct Stat *stat, const void *data)
{
PyGILState_STATE gstate;
pywatcher_t *pyw = (pywatcher_t*)data;
- if (pyw == NULL)
+ if (pyw == NULL) {
return;
+ }
PyObject *callback = pyw->callback;
gstate = PyGILState_Ensure();
- PyObject *arglist = Py_BuildValue("(i,i,O,O)", pyw->zhandle,rc, build_acls(acl), build_stat(stat));
- if (PyObject_CallObject((PyObject*)callback, arglist) == NULL)
+ PyObject *pystat = build_stat(stat);
+ PyObject *pyacls = build_acls(acl);
+ PyObject *arglist = Py_BuildValue("(i,i,O,O)", pyw->zhandle,rc, pyacls, pystat);
+
+ Py_DECREF(pystat);
+ Py_DECREF(pyacls);
+
+ if (PyObject_CallObject((PyObject*)callback, arglist) == NULL) {
PyErr_Print();
+ }
free_pywatcher(pyw);
PyGILState_Release(gstate);
}
-int check_is_acl(PyObject *o) {
- int i;
- if (!PyList_Check(o)) {
- return 0;
- }
- for (i=0;i<PyList_Size(o);++i) {
- if (!PyDict_Check(PyList_GetItem(o,i))) {
- return 0;
- }
- }
- return 1;
-}
-
-#define CHECK_ACLS(o) if (check_is_acl(o) == 0) {\
- PyErr_SetString(err_to_exception(ZINVALIDACL), zerror(ZINVALIDACL));\
- return NULL;\
- }
+/* -------------------------------------------------------------------------- */
+/* ZOOKEEPER API IMPLEMENTATION */
+/* -------------------------------------------------------------------------- */
-///////////////////////////////////////////////////////
-// Asynchronous API
+static PyObject *pyzookeeper_init(PyObject *self, PyObject *args)
+{
+ const char *host;
+ PyObject *watcherfn = Py_None;
+ int recv_timeout = 10000;
+ // int clientid = -1;
+ clientid_t cid;
+ cid.client_id = -1;
+ const char *passwd;
+ int handle = next_zhandle();
+ if (handle == -1) {
+ if (resize_zhandles() == 0) {
+ return NULL;
+ }
+ handle = next_zhandle();
+ }
+
+ if (handle == -1) {
+ PyErr_SetString(ZooKeeperException,"Couldn't find a free zhandle, something is very wrong");
+ return NULL;
+ }
+
+ if (!PyArg_ParseTuple(args, "s|Oi(Ls)", &host, &watcherfn, &recv_timeout, &cid.client_id, &passwd))
+ return NULL;
+
+ if (cid.client_id != -1) {
+ strncpy(cid.passwd, passwd, 16*sizeof(char));
+ }
+ pywatcher_t *pyw = NULL;
+ if (watcherfn != Py_None) {
+ pyw = create_pywatcher(handle, watcherfn,1);
+ if (pyw == NULL) {
+ return NULL;
+ }
+ }
+ watchers[handle] = pyw;
+ zhandle_t *zh = zookeeper_init( host, watcherfn != Py_None ? watcher_dispatch : NULL,
+ recv_timeout, cid.client_id == -1 ? 0 : &cid,
+ pyw,
+ 0 );
+
+ if (zh == NULL)
+ {
+ PyErr_SetString( ZooKeeperException, "Could not internally obtain zookeeper handle" );
+ return NULL;
+ }
+
+ zhandles[handle] = zh;
+ return Py_BuildValue( "i", handle);
+}
+
+
+/* -------------------------------------------------------------------------- */
+/* Asynchronous API implementation */
+/* -------------------------------------------------------------------------- */
+
+/* Asynchronous node creation, returns integer error code */
PyObject *pyzoo_acreate(PyObject *self, PyObject *args)
{
int zkhid; char *path; char *value; int valuelen;
struct ACL_vector acl; int flags = 0;
PyObject *completion_callback = Py_None;
PyObject *pyacls = Py_None;
- if (!PyArg_ParseTuple(args, "iss#O|iO", &zkhid, &path, &value, &valuelen, &pyacls, &flags, &completion_callback))
+ if (!PyArg_ParseTuple(args, "iss#O|iO", &zkhid, &path,
+ &value, &valuelen, &pyacls, &flags,
+ &completion_callback)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
CHECK_ACLS(pyacls);
- parse_acls(&acl, pyacls);
+ if (parse_acls(&acl, pyacls) == 0) {
+ return NULL;
+ }
+ void *pyw = NULL;
+ if (completion_callback != Py_None) {
+ pyw = create_pywatcher(zkhid, completion_callback, 0);
+ if (pyw == NULL) {
+ return NULL;
+ }
+ }
int err = zoo_acreate( zhandles[zkhid],
path,
value,
@@ -502,7 +643,7 @@ PyObject *pyzoo_acreate(PyObject *self,
pyacls == Py_None ? NULL : &acl,
flags,
string_completion_dispatch,
- completion_callback != Py_None ? create_pywatcher(zkhid, completion_callback,0 ) : NULL );
+ pyw);
free_acls(&acl);
if (err != ZOK)
{
@@ -512,6 +653,7 @@ PyObject *pyzoo_acreate(PyObject *self,
return Py_BuildValue("i", err);
}
+/* Asynchronous node deletion, returns integer error code */
PyObject *pyzoo_adelete(PyObject *self, PyObject *args)
{
int zkhid; char *path; int version = -1;
@@ -520,38 +662,58 @@ PyObject *pyzoo_adelete(PyObject *self,
return NULL;
CHECK_ZHANDLE(zkhid);
+ void *pyw = NULL;
+ if (completion_callback != Py_None) {
+ pyw = create_pywatcher(zkhid, completion_callback, 0);
+ if (pyw == NULL) {
+ return NULL;
+ }
+ }
+
int err = zoo_adelete( zhandles[zkhid],
- path,
- version,
- void_completion_dispatch,
- completion_callback != Py_None ? create_pywatcher(zkhid,
- completion_callback,
- 0 ) : NULL );
+ path,
+ version,
+ void_completion_dispatch,
+ pyw);
- if (err != ZOK)
- {
+ if (err != ZOK) {
PyErr_SetString(err_to_exception(err), zerror(err));
return NULL;
}
return Py_BuildValue("i", err);
}
+/* Asynchronous node existence check, returns integer error code */
PyObject *pyzoo_aexists(PyObject *self, PyObject *args)
{
int zkhid; char *path;
PyObject *completion_callback = Py_None;
PyObject *exists_watch = Py_None;
if (!PyArg_ParseTuple(args, "is|OO", &zkhid, &path,
- &exists_watch, &completion_callback))
+ &exists_watch, &completion_callback))
return NULL;
CHECK_ZHANDLE(zkhid);
+ void *comp_pyw = NULL;
+ if (completion_callback != Py_None) {
+ comp_pyw = create_pywatcher(zkhid, completion_callback, 0);
+ if (comp_pyw == NULL) {
+ return NULL;
+ }
+ }
+ void *exist_pyw = NULL;
+ if (exists_watch != Py_None) {
+ exist_pyw = create_pywatcher(zkhid, exists_watch, 0);
+ if (exist_pyw == NULL) {
+ return NULL;
+ }
+ }
int err = zoo_awexists( zhandles[zkhid],
- path,
- exists_watch != Py_None ? watcher_dispatch : NULL,
- exists_watch != Py_None ? create_pywatcher(zkhid, exists_watch,0) : NULL,
- stat_completion_dispatch,
- (completion_callback != Py_None) ? create_pywatcher(zkhid, completion_callback,0) : NULL );
+ path,
+ exists_watch != Py_None ? watcher_dispatch : NULL,
+ exist_pyw,
+ stat_completion_dispatch,
+ comp_pyw);
if (err != ZOK)
{
@@ -561,32 +723,49 @@ PyObject *pyzoo_aexists(PyObject *self,
return Py_BuildValue("i", err);;
}
+/* Asynchronous node data retrieval, returns integer error code */
PyObject *pyzoo_aget(PyObject *self, PyObject *args)
{
int zkhid; char *path;
PyObject *completion_callback = Py_None;
PyObject *get_watch = Py_None;
+ void *comp_pw = NULL;
+ void *watch_pw = NULL;
+
if (!PyArg_ParseTuple(args, "is|OO", &zkhid, &path,
- &get_watch, &completion_callback))
+ &get_watch, &completion_callback)) {
return NULL;
+ }
+
CHECK_ZHANDLE(zkhid);
- int err = zoo_awget( zhandles[zkhid],
- path,
- get_watch != Py_None ? watcher_dispatch : NULL,
- get_watch != Py_None ? create_pywatcher(zkhid, get_watch,0) : NULL,
- data_completion_dispatch,
- completion_callback != Py_None ?
- create_pywatcher(zkhid, completion_callback,0 ) : NULL );
-
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
+ if (get_watch != Py_None) {
+ if ((watch_pw = create_pywatcher(zkhid, get_watch, 0)) == NULL) {
return NULL;
}
+ }
+
+ if (completion_callback != Py_None) {
+ if ((comp_pw = create_pywatcher(zkhid, completion_callback, 0)) == NULL) {
+ return NULL;
+ }
+ }
+
+ int err = zoo_awget( zhandles[zkhid],
+ path,
+ get_watch != Py_None ? watcher_dispatch : NULL,
+ watch_pw,
+ data_completion_dispatch,
+ comp_pw);
+
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);
}
+/* Asynchronous node contents update, returns integer error code */
PyObject *pyzoo_aset(PyObject *self, PyObject *args)
{
int zkhid; char *path; char *buffer; int buflen; int version=-1;
@@ -594,122 +773,168 @@ PyObject *pyzoo_aset(PyObject *self, PyO
if (!PyArg_ParseTuple(args, "iss#|iO", &zkhid, &path, &buffer, &buflen, &version, &completion_callback))
return NULL;
CHECK_ZHANDLE(zkhid);
-
- int err = zoo_aset( zhandles[zkhid],
- path,
- buffer,
- buflen,
- version,
- stat_completion_dispatch,
- completion_callback != Py_None ? create_pywatcher(zkhid,
- completion_callback,
- 0 ) : NULL );
-
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
+ void *pyw = NULL;
+ if (completion_callback != Py_None) {
+ pyw = create_pywatcher(zkhid, completion_callback, 0);
+ if (pyw == NULL) {
return NULL;
}
+ }
+ int err = zoo_aset( zhandles[zkhid],
+ path,
+ buffer,
+ buflen,
+ version,
+ stat_completion_dispatch,
+ pyw);
+
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);
}
+/* Asynchronous node child retrieval, returns integer error code */
PyObject *pyzoo_aget_children(PyObject *self, PyObject *args)
{
int zkhid; char *path;
PyObject *completion_callback = Py_None;
PyObject *get_watch;
if (!PyArg_ParseTuple(args, "is|OO", &zkhid, &path,
- &get_watch, &completion_callback))
+ &get_watch, &completion_callback))
return NULL;
CHECK_ZHANDLE(zkhid);
- int err = zoo_awget_children( zhandles[zkhid],
- path,
- get_watch != Py_None ? watcher_dispatch : NULL,
- get_watch != Py_None ? create_pywatcher(zkhid, get_watch,0) : NULL,
- strings_completion_dispatch,
- completion_callback != Py_None ?
- create_pywatcher(zkhid, completion_callback,0) : NULL );
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
+ void *get_pyw = NULL;
+ if (get_watch != Py_None) {
+ get_pyw = create_pywatcher(zkhid, get_watch, 0);
+ if (get_pyw == NULL) {
return NULL;
}
+ }
+
+ void *pyw = NULL;
+ if (completion_callback != Py_None) {
+ pyw = create_pywatcher(zkhid, completion_callback, 0);
+ if (pyw == NULL) {
+ return NULL;
+ }
+ }
+
+ int err = zoo_awget_children( zhandles[zkhid],
+ path,
+ get_watch != Py_None ? watcher_dispatch : NULL,
+ pyw,
+ strings_completion_dispatch,
+ pyw);
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);;
}
+/* Asynchronous sync, returns integer error code */
PyObject *pyzoo_async(PyObject *self, PyObject *args)
{
int zkhid; char *path;
PyObject *completion_callback = Py_None;
if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path,
- &completion_callback))
+ &completion_callback)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
- int err = zoo_async( zhandles[zkhid],
- path,
- string_completion_dispatch,
- completion_callback != Py_None ?
- create_pywatcher(zkhid, completion_callback,0) : NULL );
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
+ void *pyw = NULL;
+ if (completion_callback != Py_None) {
+ pyw = create_pywatcher(zkhid, completion_callback, 0);
+ if (pyw == NULL) {
return NULL;
}
+ }
+
+ int err = zoo_async( zhandles[zkhid],
+ path,
+ string_completion_dispatch,
+ pyw);
+
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);;
}
+/* Asynchronous node ACL retrieval, returns integer error code */
PyObject *pyzoo_aget_acl(PyObject *self, PyObject *args)
{
int zkhid; char *path;
PyObject *completion_callback = Py_None;
if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path,
- &completion_callback))
+ &completion_callback)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
- int err = zoo_aget_acl( zhandles[zkhid],
- path,
- acl_completion_dispatch,
- completion_callback != Py_None ?
- create_pywatcher(zkhid, completion_callback,0) : NULL );
-
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
+ void *pyw = NULL;
+ if (completion_callback != Py_None) {
+ pyw = create_pywatcher(zkhid, completion_callback, 0);
+ if (pyw == NULL) {
return NULL;
}
+ }
+
+ int err = zoo_aget_acl( zhandles[zkhid],
+ path,
+ acl_completion_dispatch,
+ pyw);
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);;
}
+/* Asynchronous node ACL update, returns integer error code */
PyObject *pyzoo_aset_acl(PyObject *self, PyObject *args)
{
int zkhid; char *path; int version;
PyObject *completion_callback = Py_None, *pyacl;
struct ACL_vector aclv;
if (!PyArg_ParseTuple(args, "isiO|O", &zkhid, &path, &version,
- &pyacl, &completion_callback))
+ &pyacl, &completion_callback)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
CHECK_ACLS(pyacl);
- parse_acls( &aclv, pyacl );
- int err = zoo_aset_acl( zhandles[zkhid],
- path,
- version,
- &aclv,
- void_completion_dispatch,
- completion_callback != Py_None ?
- create_pywatcher(zkhid, completion_callback,0) : NULL );
- free_acls(&aclv);
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
+ if (parse_acls(&aclv, pyacl) == 0) {
+ return NULL;
+ }
+
+ void *pyw = NULL;
+ if (completion_callback != Py_None) {
+ pyw = create_pywatcher(zkhid, completion_callback, 0);
+ if (pyw == NULL) {
return NULL;
}
+ }
+
+ int err = zoo_aset_acl( zhandles[zkhid],
+ path,
+ version,
+ &aclv,
+ void_completion_dispatch,
+ pyw);
+ free_acls(&aclv);
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);;
}
+/* Asynchronous authorization addition, returns integer error code */
PyObject *pyzoo_add_auth(PyObject *self, PyObject *args)
{
int zkhid;
@@ -717,27 +942,38 @@ PyObject *pyzoo_add_auth(PyObject *self,
int certLen;
PyObject *completion_callback;
- if (!PyArg_ParseTuple(args, "iss#O", &zkhid, &scheme, &cert, &certLen, &completion_callback))
+ if (!PyArg_ParseTuple(args, "iss#O", &zkhid, &scheme, &cert, &certLen,
+ &completion_callback)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
- int err = zoo_add_auth( zhandles[zkhid],
- scheme,
- cert,
- certLen,
- completion_callback != Py_None ? void_completion_dispatch : NULL,
- completion_callback != Py_None ? create_pywatcher(zkhid, completion_callback,0) : NULL );
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
+ void *pyw = NULL;
+ if (completion_callback != Py_None) {
+ pyw = create_pywatcher(zkhid, completion_callback, 0);
+ if (pyw == NULL) {
return NULL;
}
+ }
+
+ int err = zoo_add_auth( zhandles[zkhid],
+ scheme,
+ cert,
+ certLen,
+ void_completion_dispatch,
+ pyw);
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);
}
-///////////////////////////////////////////////////////
-// synchronous API
+/* -------------------------------------------------------------------------- */
+/* Synchronous API implementation */
+/* -------------------------------------------------------------------------- */
+/* Synchronous node creation, returns node path string */
static PyObject *pyzoo_create(PyObject *self, PyObject *args)
{
char *path;
@@ -753,18 +989,20 @@ static PyObject *pyzoo_create(PyObject *
CHECK_ZHANDLE(zkhid);
struct ACL_vector aclv;
CHECK_ACLS(acl);
- parse_acls(&aclv,acl);
+ if (parse_acls(&aclv,acl) == 0) {
+ return NULL;
+ }
zhandle_t *zh = zhandles[zkhid];
int err = zoo_create(zh, path, values, valuelen, &aclv, flags, realbuf, maxbuf_len);
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
- return NULL;
- }
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("s", realbuf);
}
+/* Synchronous node deletion, returns integer error code */
static PyObject *pyzoo_delete(PyObject *self, PyObject *args)
{
int zkhid;
@@ -775,67 +1013,83 @@ static PyObject *pyzoo_delete(PyObject *
CHECK_ZHANDLE(zkhid);
zhandle_t *zh = zhandles[zkhid];
int err = zoo_delete(zh, path, version);
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
- return NULL;
- }
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);
}
-// Returns None if the node does not exists
+/* Synchronous node existence check, returns stat if exists, None if
+ absent */
static PyObject *pyzoo_exists(PyObject *self, PyObject *args)
{
int zkhid; char *path; PyObject *watcherfn = Py_None;
struct Stat stat;
- if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path, &watcherfn))
+ if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path, &watcherfn)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
zhandle_t *zh = zhandles[zkhid];
pywatcher_t *pw = NULL;
- if (watcherfn != Py_None)
+ void *callback = NULL;
+ if (watcherfn != Py_None) {
pw = create_pywatcher(zkhid, watcherfn,0);
- int err = zoo_wexists(zh, path, watcherfn != Py_None ? watcher_dispatch : NULL, pw, &stat);
- if (err != ZOK && err != ZNONODE)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
- free_pywatcher( pw );
+ callback = watcher_dispatch;
+ if (pw == NULL) {
return NULL;
}
- if (err == ZOK)
- return build_stat( &stat );
- if (err == ZNONODE)
+ }
+ int err = zoo_wexists(zh, path, callback, pw, &stat);
+ if (err != ZOK && err != ZNONODE) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ free_pywatcher(pw);
+ return NULL;
+ }
+ if (err == ZNONODE) {
+ Py_INCREF(Py_None);
return Py_None; // This isn't exceptional
- return NULL;
+ }
+ return build_stat(&stat);
}
+/* Synchronous node child retrieval, returns list of children's paths
+ as strings */
static PyObject *pyzoo_get_children(PyObject *self, PyObject *args)
{
int zkhid;
char *path;
PyObject *watcherfn = Py_None;
struct String_vector strings;
- if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path, &watcherfn))
+ if (!PyArg_ParseTuple(args, "is|O", &zkhid, &path, &watcherfn)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
pywatcher_t *pw = NULL;
- if (watcherfn != Py_None)
+ void *callback = NULL;
+ if (watcherfn != Py_None) {
pw = create_pywatcher( zkhid, watcherfn, 0 );
- int err = zoo_wget_children(zhandles[zkhid], path,
- watcherfn != Py_None ? watcher_dispatch : NULL,
- pw, &strings );
-
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
+ callback = watcher_dispatch;
+ if (pw == NULL) {
return NULL;
}
+ }
+ int err = zoo_wget_children(zhandles[zkhid], path,
+ callback,
+ pw, &strings );
+
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ free_pywatcher(pw);
+ return NULL;
+ }
PyObject *ret = build_string_vector(&strings);
deallocate_String_vector(&strings);
return ret;
}
+/* Synchronous node data update, returns integer error code */
static PyObject *pyzoo_set(PyObject *self, PyObject *args)
{
int zkhid;
@@ -843,20 +1097,22 @@ static PyObject *pyzoo_set(PyObject *sel
char *buffer;
int buflen;
int version = -1;
- if (!PyArg_ParseTuple(args, "iss#|i", &zkhid, &path, &buffer, &buflen, &version))
+ if (!PyArg_ParseTuple(args, "iss#|i", &zkhid, &path, &buffer, &buflen,
+ &version)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
int err = zoo_set(zhandles[zkhid], path, buffer, buflen, version);
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
- return NULL;
- }
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);
}
+/* Synchronous node data update, returns node's stat data structure */
static PyObject *pyzoo_set2(PyObject *self, PyObject *args)
{
int zkhid;
@@ -864,31 +1120,31 @@ static PyObject *pyzoo_set2(PyObject *se
char *buffer;
int buflen;
int version = -1;
- if (!PyArg_ParseTuple(args, "iss#|i", &zkhid, &path, &buffer, &buflen, &version))
+ if (!PyArg_ParseTuple(args, "iss#|i", &zkhid, &path, &buffer, &buflen,
+ &version)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
struct Stat *stat = NULL;
int err = zoo_set2(zhandles[zkhid], path, buffer, buflen, version, stat);
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
- return NULL;
- }
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return build_stat(stat);
}
-// As per ZK documentation, datanodes are limited
-// to 1Mb. Why not do a stat followed by a get, to
-// determine how big the buffer should be? Because the znode
-// may get updated between calls, so we can't guarantee a
-// complete get anyhow.
+/* As per ZK documentation, datanodes are limited to 1MB. Why not do a
+ stat followed by a get, to determine how big the buffer should be?
+ Because the znode may get updated between calls, so we can't
+ guarantee a complete get anyhow. */
#define GET_BUFFER_SIZE 1024*1024
-// pyzoo_get has an extra parameter over the java/C equivalents.
-// If you set the fourth integer parameter buffer_len, we return
-// min(buffer_len, datalength) bytes. This is set by default to
-// GET_BUFFER_SIZE
+/* pyzoo_get has an extra parameter over the java/C equivalents. If
+ you set the fourth integer parameter buffer_len, we return
+ min(buffer_len, datalength) bytes. This is set by default to
+ GET_BUFFER_SIZE */
static PyObject *pyzoo_get(PyObject *self, PyObject *args)
{
int zkhid;
@@ -898,31 +1154,40 @@ static PyObject *pyzoo_get(PyObject *sel
struct Stat stat;
PyObject *watcherfn = Py_None;
pywatcher_t *pw = NULL;
- if (!PyArg_ParseTuple(args, "is|Oi", &zkhid, &path, &watcherfn, &buffer_len))
+ if (!PyArg_ParseTuple(args, "is|Oi", &zkhid, &path, &watcherfn, &buffer_len)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
- if (watcherfn != Py_None)
- {
- pw = create_pywatcher( zkhid, watcherfn,0 );
- }
- buffer = malloc(sizeof(char)*buffer_len);
+ if (watcherfn != Py_None) {
+ pw = create_pywatcher( zkhid, watcherfn,0 );
+ if (pw == NULL) {
+ return NULL;
+ }
+ }
+ buffer = malloc(sizeof(char)*buffer_len);
+ if (buffer == NULL) {
+ free(pw);
+ PyErr_SetString(PyExc_MemoryError, "buffer could not be allocated in pyzoo_get");
+ return NULL;
+ }
+
int err = zoo_wget(zhandles[zkhid], path,
- watcherfn != Py_None ? watcher_dispatch : NULL,
- pw, buffer,
- &buffer_len, &stat);
-
+ watcherfn != Py_None ? watcher_dispatch : NULL,
+ pw, buffer,
+ &buffer_len, &stat);
+
PyObject *stat_dict = build_stat( &stat );
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
- return NULL;
- }
- PyObject *ret = Py_BuildValue( "(s#,N)", buffer,buffer_len, stat_dict );
- free(buffer);
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
+ PyObject *ret = Py_BuildValue( "(s#,N)", buffer,buffer_len, stat_dict );
+ free(buffer);
return ret;
}
+/* Synchronous node ACL retrieval, returns list of ACLs */
PyObject *pyzoo_get_acl(PyObject *self, PyObject *args)
{
int zkhid;
@@ -933,16 +1198,19 @@ PyObject *pyzoo_get_acl(PyObject *self,
return NULL;
CHECK_ZHANDLE(zkhid);
int err = zoo_get_acl( zhandles[zkhid], path, &acl, &stat );
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
- return NULL;
- }
- PyObject *pystat = build_stat( &stat );
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
+ PyObject *pystat = build_stat( &stat );
PyObject *acls = build_acls( &acl );
- return Py_BuildValue( "(N,N)", pystat, acls );
+ PyObject *ret = Py_BuildValue( "(O,O)", pystat, acls );
+ Py_DECREF(pystat);
+ Py_DECREF(acls);
+ return ret;
}
+/* Synchronous node ACL update, returns integer error code */
PyObject *pyzoo_set_acl(PyObject *self, PyObject *args)
{
int zkhid;
@@ -950,43 +1218,53 @@ PyObject *pyzoo_set_acl(PyObject *self,
int version;
PyObject *pyacls;
struct ACL_vector acl;
- if (!PyArg_ParseTuple(args, "isiO", &zkhid, &path, &version, &pyacls))
+ if (!PyArg_ParseTuple(args, "isiO", &zkhid, &path, &version, &pyacls)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
- parse_acls(&acl, pyacls);
+ if (parse_acls(&acl, pyacls) == 0) {
+ return NULL;
+ }
int err = zoo_set_acl(zhandles[zkhid], path, version, &acl );
free_acls(&acl);
- if (err != ZOK)
- {
- PyErr_SetString(err_to_exception(err), zerror(err));
- return NULL;
- }
+ if (err != ZOK) {
+ PyErr_SetString(err_to_exception(err), zerror(err));
+ return NULL;
+ }
return Py_BuildValue("i", err);;
}
-///////////////////////////////////////////////////
-// session and context methods
+/* -------------------------------------------------------------------------- */
+/* Session and context methods */
+/* -------------------------------------------------------------------------- */
+
+/* Closes a connection, returns integer error code */
PyObject *pyzoo_close(PyObject *self, PyObject *args)
{
int zkhid;
- if (!PyArg_ParseTuple(args, "i", &zkhid))
+ if (!PyArg_ParseTuple(args, "i", &zkhid)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
int ret = zookeeper_close(zhandles[zkhid]);
- zhandles[zkhid] = NULL; // The zk C client frees the zhandle
+ zhandles[zkhid] = NULL; // The zk C client frees the zhandle
return Py_BuildValue("i", ret);
}
+/* Returns the ID of current client as a tuple (client_id, passwd) */
PyObject *pyzoo_client_id(PyObject *self, PyObject *args)
{
int zkhid;
- if (!PyArg_ParseTuple(args, "i", &zkhid))
+ if (!PyArg_ParseTuple(args, "i", &zkhid)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
const clientid_t *cid = zoo_client_id(zhandles[zkhid]);
- return Py_BuildValue("(Ls)", cid->client_id, cid->passwd);
+ return Py_BuildValue("(L,s)", cid->client_id, cid->passwd);
}
+/* DO NOT USE - context is used internally. This method is not exposed
+ in the Python module */
PyObject *pyzoo_get_context(PyObject *self, PyObject *args)
{
int zkhid;
@@ -996,60 +1274,76 @@ PyObject *pyzoo_get_context(PyObject *se
PyObject *context = NULL;
context = (PyObject*)zoo_get_context(zhandles[zkhid]);
if (context) return context;
+ Py_INCREF(Py_None);
return Py_None;
}
+/* DO NOT USE - context is used internally. This method is not exposed
+ in the Python module */
PyObject *pyzoo_set_context(PyObject *self, PyObject *args)
{
int zkhid;
PyObject *context;
- if (!PyArg_ParseTuple(args, "iO", &zkhid, &context))
+ if (!PyArg_ParseTuple(args, "iO", &zkhid, &context)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
PyObject *py_context = (PyObject*)zoo_get_context(zhandles[zkhid]);
- if (py_context != NULL) {
+ if (py_context != NULL && py_context != Py_None) {
Py_DECREF(py_context);
}
Py_INCREF(context);
zoo_set_context(zhandles[zkhid], (void*)context);
+ Py_INCREF(Py_None);
return Py_None;
}
-///////////////////////////////////////////////////////
-// misc
+
+/* -------------------------------------------------------------------------- */
+/* Miscellaneous methods */
+/* -------------------------------------------------------------------------- */
+
+/* Sets the global watcher. Returns None */
PyObject *pyzoo_set_watcher(PyObject *self, PyObject *args)
{
int zkhid;
PyObject *watcherfn;
- if (!PyArg_ParseTuple(args, "iO", &zkhid, &watcherfn))
+ if (!PyArg_ParseTuple(args, "iO", &zkhid, &watcherfn)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
pywatcher_t *pyw = watchers[zkhid];
if (pyw != NULL) {
free_pywatcher( pyw );
}
+ // Create a *permanent* watcher object, not deallocated when called
pyw = create_pywatcher(zkhid, watcherfn,1);
+ if (pyw == NULL) {
+ return NULL;
+ }
watchers[zkhid] = pyw;
zoo_set_watcher(zhandles[zkhid], watcher_dispatch);
zoo_set_context(zhandles[zkhid], pyw);
-
+ Py_INCREF(Py_None);
return Py_None;
}
+/* Returns an integer code representing the current connection
+ state */
PyObject *pyzoo_state(PyObject *self, PyObject *args)
{
int zkhid;
- if (!PyArg_ParseTuple(args,"i",&zkhid))
+ if (!PyArg_ParseTuple(args,"i",&zkhid)) {
return NULL;
+ }
CHECK_ZHANDLE(zkhid);
int state = zoo_state(zhandles[zkhid]);
return Py_BuildValue("i",state);
}
-// Synchronous calls return ZOK or throw an exception, but async calls get
-// an integer so should use this. This could perhaps use standardising.
+/* Convert an integer error code into a string */
PyObject *pyzerror(PyObject *self, PyObject *args)
{
int rc;
@@ -1058,6 +1352,7 @@ PyObject *pyzerror(PyObject *self, PyObj
return Py_BuildValue("s", zerror(rc));
}
+/* Returns the integer receive timeout for a connection */
PyObject *pyzoo_recv_timeout(PyObject *self, PyObject *args)
{
int zkhid;
@@ -1068,6 +1363,7 @@ PyObject *pyzoo_recv_timeout(PyObject *s
return Py_BuildValue("i",recv_timeout);
}
+/* Returns > 0 if connection is unrecoverable, 0 otherwise */
PyObject *pyis_unrecoverable(PyObject *self, PyObject *args)
{
int zkhid;
@@ -1078,43 +1374,58 @@ PyObject *pyis_unrecoverable(PyObject *s
return Py_BuildValue("i",ret); // TODO: make this a boolean
}
+/* Set the debug level for logging, returns None */
PyObject *pyzoo_set_debug_level(PyObject *self, PyObject *args)
{
int loglevel;
if (!PyArg_ParseTuple(args, "i", &loglevel))
- return NULL;
+ return NULL;
zoo_set_debug_level((ZooLogLevel)loglevel);
+ Py_INCREF(Py_None);
return Py_None;
}
+
static PyObject *log_stream = NULL;
+/* Set the output file-like object for logging output. Returns None */
PyObject *pyzoo_set_log_stream(PyObject *self, PyObject *args)
{
PyObject *pystream = NULL;
- if (!PyArg_ParseTuple(args,"O",&pystream))
+ if (!PyArg_ParseTuple(args,"O",&pystream)) {
+ PyErr_SetString(PyExc_ValueError, "Must supply a Python object to set_log_stream");
return NULL;
- if (!PyFile_Check(pystream))
+ }
+ if (!PyFile_Check(pystream)) {
+ PyErr_SetString(PyExc_ValueError, "Must supply a file object to set_log_stream");
return NULL;
+ }
+ /* Release the previous reference to log_stream that we took */
if (log_stream != NULL) {
Py_DECREF(log_stream);
}
+
log_stream = pystream;
Py_INCREF(log_stream);
zoo_set_log_stream(PyFile_AsFile(log_stream));
+ Py_INCREF(Py_None);
return Py_None;
}
+/* Set the connection order - randomized or in-order. Returns None. */
PyObject *pyzoo_deterministic_conn_order(PyObject *self, PyObject *args)
{
int yesOrNo;
if (!PyArg_ParseTuple(args, "i",&yesOrNo))
return NULL;
zoo_deterministic_conn_order( yesOrNo );
+ Py_INCREF(Py_None);
return Py_None;
}
-///////////////////////////////////////////////////
+/* -------------------------------------------------------------------------- */
+/* Module setup */
+/* -------------------------------------------------------------------------- */
#include "pyzk_docstrings.h"
@@ -1131,11 +1442,6 @@ static PyMethodDef ZooKeeperMethods[] =
{"set_acl", pyzoo_set_acl, METH_VARARGS, pyzk_set_acl_doc },
{"close", pyzoo_close, METH_VARARGS, pyzk_close_doc },
{"client_id", pyzoo_client_id, METH_VARARGS, pyzk_client_id_doc },
- // DO NOT USE get / set_context. Context is used internally
- // to pass the python watcher to a dispatch function. If you want
- // context, set it through set_watcher.
- // {"get_context", pyzoo_get_context, METH_VARARGS, "" },
- // {"set_context", pyzoo_set_context, METH_VARARGS, "" },
{"set_watcher", pyzoo_set_watcher, METH_VARARGS },
{"state", pyzoo_state, METH_VARARGS, pyzk_state_doc },
{"recv_timeout",pyzoo_recv_timeout, METH_VARARGS },
@@ -1154,29 +1460,34 @@ static PyMethodDef ZooKeeperMethods[] =
{"aset_acl", pyzoo_aset_acl, METH_VARARGS, pyzk_aset_acl_doc },
{"zerror", pyzerror, METH_VARARGS, pyzk_zerror_doc },
{"add_auth", pyzoo_add_auth, METH_VARARGS, pyzk_add_auth_doc },
+ /* DO NOT USE get / set_context. Context is used internally to pass
+ the python watcher to a dispatch function. If you want context, set
+ it through set_watcher. */
+ // {"get_context", pyzoo_get_context, METH_VARARGS, "" },
+ // {"set_context", pyzoo_set_context, METH_VARARGS, "" },
{NULL, NULL}
};
#define ADD_INTCONSTANT(x) PyModule_AddIntConstant(module, #x, ZOO_##x)
#define ADD_INTCONSTANTZ(x) PyModule_AddIntConstant(module, #x, Z##x)
-
-
#define ADD_EXCEPTION(x) x = PyErr_NewException("zookeeper."#x, ZooKeeperException, NULL); \
- Py_INCREF(x); \
+ Py_INCREF(x); \
PyModule_AddObject(module, #x, x);
-PyMODINIT_FUNC initzookeeper() {
+PyMODINIT_FUNC initzookeeper(void) {
PyEval_InitThreads();
PyObject *module = Py_InitModule("zookeeper", ZooKeeperMethods );
- init_zhandles(32);
+ if (init_zhandles(32) == 0) {
+ return; // TODO: Is there any way to raise an exception here?
+ }
ZooKeeperException = PyErr_NewException("zookeeper.ZooKeeperException",
- PyExc_Exception,
- NULL);
+ PyExc_Exception,
+ NULL);
- PyModule_AddObject(module, "ZooKeeperException", ZooKeeperException);
+ PyModule_AddObject(module, "ZooKeeperException", ZooKeeperException);
Py_INCREF(ZooKeeperException);
ADD_INTCONSTANT(PERM_READ);
@@ -1258,11 +1569,3 @@ PyMODINIT_FUNC initzookeeper() {
ADD_EXCEPTION(NothingException);
ADD_EXCEPTION(SessionMovedException);
}
-
-int main(int argc, char **argv)
-{
- Py_SetProgramName(argv[0]);
- Py_Initialize();
- initzookeeper();
- return 0;
-}