You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2009/10/14 01:14:05 UTC
svn commit: r824971 - in /hadoop/zookeeper/trunk: CHANGES.txt
src/contrib/zkpython/src/c/zookeeper.c
src/contrib/zkpython/src/test/connection_test.py
Author: phunt
Date: Tue Oct 13 23:14:05 2009
New Revision: 824971
URL: http://svn.apache.org/viewvc?rev=824971&view=rev
Log:
ZOOKEEPER-541. zkpython limited to 256 handles
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c
hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=824971&r1=824970&r2=824971&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Oct 13 23:14:05 2009
@@ -87,6 +87,8 @@
ZOOKEEEPER-510. zkpython lumps all exceptions as IOError, needs specialized
exceptions for KeeperException types (henry & pat via mahadev)
+
+ ZOOKEEPER-541. zkpython limited to 256 handles (henry robinson via phunt)
IMPROVEMENTS:
ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
Modified: hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c?rev=824971&r1=824970&r2=824971&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c (original)
+++ hadoop/zookeeper/trunk/src/contrib/zkpython/src/c/zookeeper.c Tue Oct 13 23:14:05 2009
@@ -19,11 +19,7 @@
#include <Python.h>
#include <zookeeper.h>
#include <assert.h>
-
-#define MAX_ZHANDLES 256
-static zhandle_t* zhandles[MAX_ZHANDLES];
-static int num_zhandles = 0;
-
+
//////////////////////////////////////////////
// EXCEPTIONS
PyObject *ZooKeeperException = NULL;
@@ -122,7 +118,73 @@
// the global watchers for each connection - but they're
// inaccessible without pulling in zk_adaptor.h, which I'm
// trying to avoid.
-static pywatcher_t *watchers[MAX_ZHANDLES];
+static pywatcher_t **watchers;
+
+// We keep an array of zhandles available for use.
+// When a zhandle is correctly closed, the C client
+// frees the memory so we set the zhandles[i] entry to NULL.
+// This entry can then be re-used
+static zhandle_t** zhandles = NULL;
+static int num_zhandles = 0;
+static int max_zhandles = 0;
+#define REAL_MAX_ZHANDLES 32768
+
+// Allocates an initial zhandle and watcher array
+void init_zhandles(int num) {
+ zhandles = malloc(sizeof(zhandle_t*)*num);
+ watchers = malloc(sizeof(pywatcher_t*)*num);
+ max_zhandles = num;
+ num_zhandles = 0;
+ memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles);
+}
+
+// Note that the following zhandle functions are not
+// thread-safe. The C-Python runtime does not seem to
+// pre-empt a thread that is in a C module, so there's
+// no need for synchronisation.
+
+// Doubles the size of the zhandle / watcher array
+// Returns 0 if the new array would be >= REAL_MAX_ZHANDLES
+// in size.
+int resize_zhandles() {
+ zhandle_t **tmp = zhandles;
+ pywatcher_t ** wtmp = watchers;
+ if (max_zhandles >= REAL_MAX_ZHANDLES >> 1) {
+ return -1;
+ }
+ max_zhandles *= 2;
+ zhandles = malloc(sizeof(zhandle_t*)*max_zhandles);
+ memset(zhandles, 0, sizeof(zhandle_t*)*max_zhandles);
+ memcpy(zhandles, tmp, sizeof(zhandle_t*)*max_zhandles/2);
+
+ watchers = malloc(sizeof(pywatcher_t*)*max_zhandles);
+ memset(watchers, 0, sizeof(pywatcher_t*)*max_zhandles);
+ memcpy(watchers, wtmp, sizeof(pywatcher_t*)*max_zhandles/2);
+
+ free(wtmp);
+ free(tmp);
+ return 0;
+}
+
+// Find a free zhandle - this is expensive, but we
+// expect it to be infrequently called.
+// There are optimisations that can be made if this turns out
+// to be problematic.
+// Returns -1 if no free handle is found.
+unsigned int next_zhandle() {
+ int i = 0;
+ for (i=0;i<max_zhandles;++i) {
+ if (zhandles[i] == NULL) {
+ num_zhandles++;
+ return i;
+ }
+ }
+
+ return -1;
+}
+
+/////////////////////////////////////
+// Pywatcher funcs
pywatcher_t *create_pywatcher(int zh, PyObject* cb, int permanent)
{
@@ -248,11 +310,16 @@
clientid_t cid;
cid.client_id = -1;
const char *passwd;
+ int handle = next_zhandle();
+ if (handle == -1) {
+ resize_zhandles();
+ handle = next_zhandle();
+ }
- if (num_zhandles >= MAX_ZHANDLES) {
- PyErr_SetString( ZooKeeperException, "Too many ZooKeeper handles created, max is 256" );
- return NULL;
- }
+ if (handle == -1) {
+ PyErr_SetString(ZooKeeperException,"Couldn't find a free zhandle, something is very wrong");
+ return NULL;
+ }
if (!PyArg_ParseTuple(args, "s|Oi(Ls)", &host, &watcherfn, &recv_timeout, &cid.client_id, &passwd))
return NULL;
@@ -262,9 +329,9 @@
}
pywatcher_t *pyw = NULL;
if (watcherfn != Py_None) {
- pyw = create_pywatcher(num_zhandles, watcherfn,1);
+ pyw = create_pywatcher(handle, watcherfn,1);
}
- watchers[num_zhandles] = pyw;
+ watchers[handle] = pyw;
zhandle_t *zh = zookeeper_init( host, watcherfn != Py_None ? watcher_dispatch : NULL,
recv_timeout, cid.client_id == -1 ? 0 : &cid,
pyw,
@@ -272,12 +339,12 @@
if (zh == NULL)
{
- PyErr_SetString( ZooKeeperException, "Unknown error" );
+ PyErr_SetString( ZooKeeperException, "Could not internally obtain zookeeper handle" );
return NULL;
}
- zhandles[num_zhandles] = zh;
- return Py_BuildValue( "i", num_zhandles++ );
+ zhandles[handle] = zh;
+ return Py_BuildValue( "i", handle);
}
///////////////////////////////////////////////////////
@@ -1053,6 +1120,7 @@
PyMODINIT_FUNC initzookeeper() {
PyEval_InitThreads();
PyObject *module = Py_InitModule("zookeeper", ZooKeeperMethods );
+ init_zhandles(32);
ZooKeeperException = PyErr_NewException("zookeeper.ZooKeeperException",
PyExc_Exception,
Modified: hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py?rev=824971&r1=824970&r2=824971&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py (original)
+++ hadoop/zookeeper/trunk/src/contrib/zkpython/src/test/connection_test.py Tue Oct 13 23:14:05 2009
@@ -19,6 +19,7 @@
import unittest, threading
import zookeeper, zktestbase
+ZOO_OPEN_ACL_UNSAFE = {"perms":0x1f, "scheme":"world", "id" :"anyone"}
class ConnectionTest(zktestbase.TestBase):
"""Test whether we can make a connection"""
@@ -55,6 +56,69 @@
self.handle,
"/")
+ def testhandlereuse(self):
+ """
+ Test a) multiple concurrent connections b) reuse of closed handles
+ """
+ cv = threading.Condition()
+ self.connected = False
+ def connection_watcher(handle, type, state, path):
+ cv.acquire()
+ self.connected = True
+ self.assertEqual(zookeeper.CONNECTED_STATE, state)
+ self.handle = handle
+ cv.notify()
+ cv.release()
+
+ cv.acquire()
+ handles = [ zookeeper.init(self.host) for i in xrange(10) ]
+ ret = zookeeper.init(self.host, connection_watcher)
+ cv.wait(15.0)
+ cv.release()
+ self.assertEqual(self.connected, True, "Connection timed out to " + self.host)
+ self.assertEqual(True, all( [ zookeeper.state(handle) == zookeeper.CONNECTED_STATE for handle in handles ] ),
+ "Not all connections succeeded")
+ oldhandle = handles[3]
+ zookeeper.close(oldhandle)
+ newhandle = zookeeper.init(self.host)
+
+ # This assertion tests *internal* behaviour; i.e. that the module
+ # correctly reuses closed handles. This is therefore implementation
+ # dependent.
+ self.assertEqual(newhandle, oldhandle, "Didn't get reused handle")
+
+ def testmanyhandles(self):
+ """
+ Test the ability of the module to support many handles.
+ """
+ # We'd like to do more, but currently the C client doesn't
+ # work with > 83 handles (fails to create a pipe) on MacOS 10.5.8
+ handles = [ zookeeper.init(self.host) for i in xrange(63) ]
+
+ cv = threading.Condition()
+ self.connected = False
+ def connection_watcher(handle, type, state, path):
+ cv.acquire()
+ self.connected = True
+ self.assertEqual(zookeeper.CONNECTED_STATE, state)
+ self.handle = handle
+ cv.notify()
+ cv.release()
+
+ cv.acquire()
+ ret = zookeeper.init(self.host, connection_watcher)
+ cv.wait(15.0)
+ cv.release()
+ self.assertEqual(self.connected, True, "Connection timed out to " + self.host)
+
+ for i,h in enumerate(handles):
+ path = "/zkpython-test-handles-%s" % str(i)
+ self.assertEqual(path, zookeeper.create(h, path, "", [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL))
+
+ self.assertEqual(True, all( zookeeper.close(h) == zookeeper.OK for h in handles ))
+
+
+
def tearDown(self):
pass