You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2008/08/13 19:59:00 UTC
svn commit: r685624 [2/3] - in /hadoop/zookeeper/trunk/src/contrib: ./
zkfuse/ zkfuse/src/
Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/log4cxx.properties
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/log4cxx.properties?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/log4cxx.properties (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/log4cxx.properties Wed Aug 13 10:58:59 2008
@@ -0,0 +1,12 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=TRACE, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4cxx.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4cxx.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.category.zkfuse=TRACE
+
Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/mutex.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/mutex.h?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/mutex.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/mutex.h Wed Aug 13 10:58:59 2008
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MUTEX_H__
+#define __MUTEX_H__
+
+#include <pthread.h>
+#include <errno.h>
+#include <sys/time.h>
+
+#include "log.h"
+
+START_ZKFUSE_NAMESPACE
+
+class Cond;
+
+class Mutex {
+ friend class Cond;
+ public:
+ Mutex() {
+ pthread_mutexattr_init( &m_mutexAttr );
+ pthread_mutexattr_settype( &m_mutexAttr, PTHREAD_MUTEX_RECURSIVE_NP );
+ pthread_mutex_init( &mutex, &m_mutexAttr );
+ }
+ ~Mutex() {
+ pthread_mutex_destroy(&mutex);
+ pthread_mutexattr_destroy( &m_mutexAttr );
+ }
+ void Acquire() { Lock(); }
+ void Release() { Unlock(); }
+ void Lock() {
+ pthread_mutex_lock(&mutex);
+ }
+ int TryLock() {
+ return pthread_mutex_trylock(&mutex);
+ }
+ void Unlock() {
+ pthread_mutex_unlock(&mutex);
+ }
+ private:
+ pthread_mutex_t mutex;
+ pthread_mutexattr_t m_mutexAttr;
+};
+
+class AutoLock {
+ public:
+ AutoLock(Mutex& mutex) : _mutex(mutex) {
+ mutex.Lock();
+ }
+ ~AutoLock() {
+ _mutex.Unlock();
+ }
+ private:
+ friend class AutoUnlockTemp;
+ Mutex& _mutex;
+};
+
+class AutoUnlockTemp {
+ public:
+ AutoUnlockTemp(AutoLock & autoLock) : _autoLock(autoLock) {
+ _autoLock._mutex.Unlock();
+ }
+ ~AutoUnlockTemp() {
+ _autoLock._mutex.Lock();
+ }
+ private:
+ AutoLock & _autoLock;
+};
+
+class Cond {
+ public:
+ Cond() {
+ static pthread_condattr_t attr;
+ static bool inited = false;
+ if(!inited) {
+ inited = true;
+ pthread_condattr_init(&attr);
+ }
+ pthread_cond_init(&_cond, &attr);
+ }
+ ~Cond() {
+ pthread_cond_destroy(&_cond);
+ }
+
+ void Wait(Mutex& mutex) {
+ pthread_cond_wait(&_cond, &mutex.mutex);
+ }
+
+ bool Wait(Mutex& mutex, long long int timeout) {
+ struct timeval now;
+ gettimeofday( &now, NULL );
+ struct timespec abstime;
+ int64_t microSecs = now.tv_sec * 1000000LL + now.tv_usec;
+ microSecs += timeout * 1000;
+ abstime.tv_sec = microSecs / 1000000LL;
+ abstime.tv_nsec = (microSecs % 1000000LL) * 1000;
+ if (pthread_cond_timedwait(&_cond, &mutex.mutex, &abstime) == ETIMEDOUT) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ void Signal() {
+ pthread_cond_signal(&_cond);
+ }
+
+ private:
+ pthread_cond_t _cond;
+};
+
+/**
+ * A wrapper class for {@link Mutex} and {@link Cond}.
+ */
+class Lock {
+ public:
+
+ void lock() {
+ m_mutex.Lock();
+ }
+
+ void unlock() {
+ m_mutex.Unlock();
+ }
+
+ void wait() {
+ m_cond.Wait( m_mutex );
+ }
+
+ bool wait(long long int timeout) {
+ return m_cond.Wait( m_mutex, timeout );
+ }
+
+ void notify() {
+ m_cond.Signal();
+ }
+
+ private:
+
+ /**
+ * The mutex.
+ */
+ Mutex m_mutex;
+
+ /**
+ * The condition associated with this lock's mutex.
+ */
+ Cond m_cond;
+};
+
+END_ZKFUSE_NAMESPACE
+
+#endif /* __MUTEX_H__ */
+
Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.cc?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.cc (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.cc Wed Aug 13 10:58:59 2008
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <log.h>
+
+#include "thread.h"
+
+DEFINE_LOGGER( LOG, "Thread" )
+
+START_ZKFUSE_NAMESPACE
+
+void Thread::Create(void* ctx, ThreadFunc func)
+{
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setstacksize(&attr, _stackSize);
+ int ret = pthread_create(&mThread, &attr, func, ctx);
+ if(ret != 0) {
+ LOG_FATAL( LOG, "pthread_create failed: %s", strerror(errno) );
+ }
+ // pthread_attr_destroy(&attr);
+ _ctx = ctx;
+ _func = func;
+}
+
+END_ZKFUSE_NAMESPACE
Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.h?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/thread.h Wed Aug 13 10:58:59 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __THREAD_H__
+#define __THREAD_H__
+
+#include <errno.h>
+#include <string.h>
+#include <assert.h>
+#include <pthread.h>
+
+#include "log.h"
+
+START_ZKFUSE_NAMESPACE
+
+class Thread {
+ public:
+ static const size_t defaultStackSize = 1024 * 1024;
+ typedef void* (*ThreadFunc) (void*);
+ Thread(size_t stackSize = defaultStackSize)
+ : _stackSize(stackSize), _ctx(NULL), _func(NULL)
+ {
+ memset( &mThread, 0, sizeof(mThread) );
+ }
+ ~Thread() { }
+
+ void Create(void* ctx, ThreadFunc func);
+ void Join() {
+ //avoid SEGFAULT because of unitialized mThread
+ //in case Create(...) was never called
+ if (_func != NULL) {
+ pthread_join(mThread, 0);
+ }
+ }
+ private:
+ pthread_t mThread;
+ void *_ctx;
+ ThreadFunc _func;
+ size_t _stackSize;
+};
+
+
+template<typename T>
+struct ThreadContext {
+ typedef void (T::*FuncPtr) (void);
+ ThreadContext(T& ctx, FuncPtr func) : _ctx(ctx), _func(func) {}
+ void run(void) {
+ (_ctx.*_func)();
+ }
+ T& _ctx;
+ FuncPtr _func;
+};
+
+template<typename T>
+void* ThreadExec(void *obj) {
+ ThreadContext<T>* tc = (ThreadContext<T>*)(obj);
+ assert(tc != 0);
+ tc->run();
+ return 0;
+}
+
+template <typename T>
+class CXXThread : public Thread {
+ public:
+ typedef void (T::*FuncPtr) (void);
+ CXXThread(size_t stackSize = Thread::defaultStackSize)
+ : Thread(stackSize), ctx(0) {}
+ ~CXXThread() { if (ctx) delete ctx; }
+
+ void Create(T& obj, FuncPtr func) {
+ assert(ctx == 0);
+ ctx = new ThreadContext<T>(obj, func);
+ Thread::Create(ctx, ThreadExec<T>);
+ }
+
+ private:
+ ThreadContext<T>* ctx;
+};
+
+
+END_ZKFUSE_NAMESPACE
+
+#endif /* __THREAD_H__ */
+
Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.cc?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.cc (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.cc Wed Aug 13 10:58:59 2008
@@ -0,0 +1,879 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include <iostream>
+
+#include "blockingqueue.h"
+#include "thread.h"
+#include "zkadapter.h"
+
+using namespace std;
+using namespace zk;
+
+DEFINE_LOGGER( LOG, "zookeeper.adapter" )
+DEFINE_LOGGER( ZK_LOG, "zookeeper.core" )
+
+/**
+ * \brief A helper class to initialize ZK logging.
+ */
+class InitZooKeeperLogging
+{
+ public:
+ InitZooKeeperLogging() {
+ if (ZK_LOG->isDebugEnabled()
+#ifdef LOG4CXX_TRACE
+ || ZK_LOG->isTraceEnabled()
+#endif
+ )
+ {
+ zoo_set_debug_level( LOG_LEVEL_DEBUG );
+ } else if (ZK_LOG->isInfoEnabled()) {
+ zoo_set_debug_level( LOG_LEVEL_INFO );
+ } else if (ZK_LOG->isWarnEnabled()) {
+ zoo_set_debug_level( LOG_LEVEL_WARN );
+ } else {
+ zoo_set_debug_level( LOG_LEVEL_ERROR );
+ }
+ }
+};
+
+using namespace std;
+
+namespace zk
+{
+
+/**
+ * \brief This class provides logic for checking if a request can be retried.
+ */
+class RetryHandler
+{
+ public:
+ RetryHandler(const ZooKeeperConfig &zkConfig)
+ : m_zkConfig(zkConfig)
+ {
+ if (zkConfig.getAutoReconnect()) {
+ retries = 2;
+ } else {
+ retries = 0;
+ }
+ }
+
+ /**
+ * \brief Attempts to fix a side effect of the given RC.
+ *
+ * @param rc the ZK error code
+ * @return whether the error code has been handled and the caller should
+ * retry an operation the caused this error
+ */
+ bool handleRC(int rc)
+ {
+ TRACE( LOG, "handleRC" );
+
+ //check if the given error code is recoverable
+ if (!retryOnError(rc)) {
+ return false;
+ }
+ LOG_TRACE( LOG, "RC: %d, retries left: %d", rc, retries );
+ if (retries-- > 0) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private:
+ /**
+ * The ZK config.
+ */
+ const ZooKeeperConfig &m_zkConfig;
+
+ /**
+ * The number of outstanding retries.
+ */
+ int retries;
+
+ /**
+ * Checks whether the given error entitles this adapter
+ * to retry the previous operation.
+ *
+ * @param zkErrorCode one of the ZK error code
+ */
+ static bool retryOnError(int zkErrorCode)
+ {
+ return (zkErrorCode == ZCONNECTIONLOSS ||
+ zkErrorCode == ZOPERATIONTIMEOUT);
+ }
+};
+
+
+//the implementation of the global ZK event watcher
+void zkWatcher(zhandle_t *zh, int type, int state, const char *path)
+{
+ TRACE( LOG, "zkWatcher" );
+
+ //a workaround for buggy ZK API
+ string sPath =
+ (path == NULL ||
+ state == SESSION_EVENT ||
+ state == NOTWATCHING_EVENT)
+ ? ""
+ : string(path);
+ LOG_INFO( LOG,
+ "Received a ZK event - type: %d, state: %d, path: '%s'",
+ type, state, sPath.c_str() );
+ ZooKeeperAdapter *zka = (ZooKeeperAdapter *)zoo_get_context(zh);
+ if (zka != NULL) {
+ zka->enqueueEvent( type, state, sPath );
+ } else {
+ LOG_ERROR( LOG,
+ "Skipping ZK event (type: %d, state: %d, path: '%s'), "
+ "because ZK passed no context",
+ type, state, sPath.c_str() );
+ }
+}
+
+
+
+// =======================================================================
+
+ZooKeeperAdapter::ZooKeeperAdapter(ZooKeeperConfig config,
+ ZKEventListener *listener,
+ bool establishConnection)
+ throw(ZooKeeperException)
+ : m_zkConfig(config),
+ mp_zkHandle(NULL),
+ m_terminating(false),
+ m_connected(false),
+ m_state(AS_DISCONNECTED)
+{
+ TRACE( LOG, "ZooKeeperAdapter" );
+
+ resetRemainingConnectTimeout();
+
+ //enforce setting up appropriate ZK log level
+ static InitZooKeeperLogging INIT_ZK_LOGGING;
+
+ if (listener != NULL) {
+ addListener(listener);
+ }
+
+ //start the event dispatcher thread
+ m_eventDispatcher.Create( *this, &ZooKeeperAdapter::processEvents );
+
+ //start the user event dispatcher thread
+ m_userEventDispatcher.Create( *this, &ZooKeeperAdapter::processUserEvents );
+
+ //optionally establish the connection
+ if (establishConnection) {
+ reconnect();
+ }
+}
+
+ZooKeeperAdapter::~ZooKeeperAdapter()
+{
+ TRACE( LOG, "~ZooKeeperAdapter" );
+
+ try {
+ disconnect();
+ } catch (std::exception &e) {
+ LOG_ERROR( LOG,
+ "An exception while disconnecting from ZK: %s",
+ e.what() );
+ }
+ m_terminating = true;
+ m_userEventDispatcher.Join();
+ m_eventDispatcher.Join();
+}
+
+void
+ZooKeeperAdapter::validatePath(const string &path) throw(ZooKeeperException)
+{
+ TRACE( LOG, "validatePath" );
+
+ if (path.find( "/" ) != 0) {
+ throw ZooKeeperException( string("Node path must start with '/' but"
+ "it was '") +
+ path +
+ "'" );
+ }
+ if (path.length() > 1) {
+ if (path.rfind( "/" ) == path.length() - 1) {
+ throw ZooKeeperException( string("Node path must not end with "
+ "'/' but it was '") +
+ path +
+ "'" );
+ }
+ if (path.find( "//" ) != string::npos) {
+ throw ZooKeeperException( string("Node path must not contain "
+ "'//' but it was '") +
+ path +
+ "'" );
+ }
+ }
+}
+
+void
+ZooKeeperAdapter::disconnect()
+{
+ TRACE( LOG, "disconnect" );
+ LOG_TRACE( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state );
+
+ m_stateLock.lock();
+ if (mp_zkHandle != NULL) {
+ zookeeper_close( mp_zkHandle );
+ mp_zkHandle = NULL;
+ setState( AS_DISCONNECTED );
+ }
+ m_stateLock.unlock();
+}
+
+void
+ZooKeeperAdapter::reconnect() throw(ZooKeeperException)
+{
+ TRACE( LOG, "reconnect" );
+
+ m_stateLock.lock();
+ //clear the connection state
+ disconnect();
+
+ //establish a new connection to ZooKeeper
+ mp_zkHandle = zookeeper_init( m_zkConfig.getHosts().c_str(),
+ zkWatcher,
+ m_zkConfig.getLeaseTimeout(),
+ NULL, this, 0);
+ resetRemainingConnectTimeout();
+ if (mp_zkHandle != NULL) {
+ setState( AS_CONNECTING );
+ m_stateLock.unlock();
+ } else {
+ m_stateLock.unlock();
+ throw ZooKeeperException(
+ string("Unable to connect to ZK running at '") +
+ m_zkConfig.getHosts() + "'" );
+ }
+
+ LOG_DEBUG( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state );
+}
+
+void
+ZooKeeperAdapter::handleEvent(int type, int state, const string &path)
+{
+ TRACE( LOG, "handleEvent" );
+ LOG_TRACE( LOG,
+ "type: %d, state %d, path: %s",
+ type, state, path.c_str() );
+ Listener2Context context, context2;
+ //ignore internal ZK events
+ if (type != SESSION_EVENT && type != NOTWATCHING_EVENT) {
+ m_zkContextsMutex.Acquire();
+ //check if the user context is available
+ if (type == CHANGED_EVENT || type == DELETED_EVENT) {
+ //we may have two types of interest here,
+ //in this case lets try to notify twice
+ context = findAndRemoveListenerContext( GET_NODE_DATA, path );
+ context2 = findAndRemoveListenerContext( NODE_EXISTS, path );
+ if (context.empty()) {
+ //make sure that the 2nd context is NULL and
+ // assign it to the 1st one
+ context = context2;
+ context2.clear();
+ }
+ } else if (type == CHILD_EVENT) {
+ context = findAndRemoveListenerContext( GET_NODE_CHILDREN, path );
+ } else if (type == CREATED_EVENT) {
+ context = findAndRemoveListenerContext( NODE_EXISTS, path );
+ }
+ m_zkContextsMutex.Release();
+ }
+
+ handleEvent( type, state, path, context );
+ if (!context2.empty()) {
+ handleEvent( type, state, path, context2 );
+ }
+}
+
+void
+ZooKeeperAdapter::handleEvent(int type,
+ int state,
+ const string &path,
+ const Listener2Context &listeners)
+{
+ TRACE( LOG, "handleEvents" );
+
+ if (listeners.empty()) {
+ //propagate with empty context
+ ZKWatcherEvent event(type, state, path);
+ fireEvent( event );
+ } else {
+ for (Listener2Context::const_iterator i = listeners.begin();
+ i != listeners.end();
+ ++i) {
+ ZKWatcherEvent event(type, state, path, i->second);
+ if (i->first != NULL) {
+ fireEvent( i->first, event );
+ } else {
+ fireEvent( event );
+ }
+ }
+ }
+}
+
+void
+ZooKeeperAdapter::enqueueEvent(int type, int state, const string &path)
+{
+ TRACE( LOG, "enqueueEvents" );
+
+ m_events.put( ZKWatcherEvent( type, state, path ) );
+}
+
+void
+ZooKeeperAdapter::processEvents()
+{
+ TRACE( LOG, "processEvents" );
+
+ while (!m_terminating) {
+ bool timedOut = false;
+ ZKWatcherEvent source = m_events.take( 100, &timedOut );
+ if (!timedOut) {
+ if (source.getType() == SESSION_EVENT) {
+ LOG_INFO( LOG,
+ "Received SESSION event, state: %d. Adapter state: %d",
+ source.getState(), m_state );
+ m_stateLock.lock();
+ if (source.getState() == CONNECTED_STATE) {
+ m_connected = true;
+ resetRemainingConnectTimeout();
+ setState( AS_CONNECTED );
+ } else if (source.getState() == CONNECTING_STATE) {
+ m_connected = false;
+ setState( AS_CONNECTING );
+ } else if (source.getState() == EXPIRED_SESSION_STATE) {
+ LOG_INFO( LOG, "Received EXPIRED_SESSION event" );
+ setState( AS_SESSION_EXPIRED );
+ }
+ m_stateLock.unlock();
+ }
+ m_userEvents.put( source );
+ }
+ }
+}
+
+void
+ZooKeeperAdapter::processUserEvents()
+{
+ TRACE( LOG, "processUserEvents" );
+
+ while (!m_terminating) {
+ bool timedOut = false;
+ ZKWatcherEvent source = m_userEvents.take( 100, &timedOut );
+ if (!timedOut) {
+ try {
+ handleEvent( source.getType(),
+ source.getState(),
+ source.getPath() );
+ } catch (std::exception &e) {
+ LOG_ERROR( LOG,
+ "Unable to process event (type: %d, state: %d, "
+ "path: %s), because of exception: %s",
+ source.getType(),
+ source.getState(),
+ source.getPath().c_str(),
+ e.what() );
+ }
+ }
+ }
+}
+
+void
+ZooKeeperAdapter::registerContext(WatchableMethod method,
+ const string &path,
+ ZKEventListener *listener,
+ ContextType context)
+{
+ TRACE( LOG, "registerContext" );
+
+ m_zkContexts[method][path][listener] = context;
+}
+
+ZooKeeperAdapter::Listener2Context
+ZooKeeperAdapter::findAndRemoveListenerContext(WatchableMethod method,
+ const string &path)
+{
+ TRACE( LOG, "findAndRemoveListenerContext" );
+
+ Listener2Context listeners;
+ Path2Listener2Context::iterator elem = m_zkContexts[method].find( path );
+ if (elem != m_zkContexts[method].end()) {
+ listeners = elem->second;
+ m_zkContexts[method].erase( elem );
+ }
+ return listeners;
+}
+
+void
+ZooKeeperAdapter::setState(AdapterState newState)
+{
+ TRACE( LOG, "setState" );
+ if (newState != m_state) {
+ LOG_INFO( LOG, "Adapter state transition: %d -> %d", m_state, newState );
+ m_state = newState;
+ m_stateLock.notify();
+ } else {
+ LOG_TRACE( LOG, "New state same as the current: %d", newState );
+ }
+}
+
+
+//TODO move this code to verifyConnection so reconnect()
+//is called from one place only
+void
+ZooKeeperAdapter::waitUntilConnected()
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "waitUntilConnected" );
+ long long int timeout = getRemainingConnectTimeout();
+ LOG_INFO( LOG,
+ "Waiting up to %lld ms until a connection to ZK is established",
+ timeout );
+ bool connected;
+ if (timeout > 0) {
+ long long int toWait = timeout;
+ while (m_state != AS_CONNECTED && toWait > 0) {
+ //check if session expired and reconnect if so
+ if (m_state == AS_SESSION_EXPIRED) {
+ LOG_INFO( LOG,
+ "Reconnecting because the current session has expired" );
+ reconnect();
+ }
+ struct timeval now;
+ gettimeofday( &now, NULL );
+ int64_t milliSecs = -(now.tv_sec * 1000LL + now.tv_usec / 1000);
+ LOG_TRACE( LOG, "About to wait %lld ms", toWait );
+ m_stateLock.wait( toWait );
+ gettimeofday( &now, NULL );
+ milliSecs += now.tv_sec * 1000LL + now.tv_usec / 1000;
+ toWait -= milliSecs;
+ }
+ waitedForConnect( timeout - toWait );
+ LOG_INFO( LOG, "Waited %lld ms", timeout - toWait );
+ }
+ connected = (m_state == AS_CONNECTED);
+ if (!connected) {
+ if (timeout > 0) {
+ LOG_WARN( LOG, "Timed out while waiting for connection to ZK" );
+ throw ZooKeeperException("Timed out while waiting for "
+ "connection to ZK");
+ } else {
+ LOG_ERROR( LOG, "Global timeout expired and still not connected to ZK" );
+ throw ZooKeeperException("Global timeout expired and still not "
+ "connected to ZK");
+ }
+ }
+ LOG_INFO( LOG, "Connected!" );
+}
+
+void
+ZooKeeperAdapter::verifyConnection() throw(ZooKeeperException)
+{
+ TRACE( LOG, "verifyConnection" );
+
+ m_stateLock.lock();
+ try {
+ if (m_state == AS_DISCONNECTED) {
+ throw ZooKeeperException("Disconnected from ZK. " \
+ "Please use reconnect() before attempting to use any ZK API");
+ } else if (m_state != AS_CONNECTED) {
+ LOG_TRACE( LOG, "Checking if need to reconnect..." );
+ //we are not connected, so check if connection in progress...
+ if (m_state != AS_CONNECTING) {
+ LOG_TRACE( LOG,
+ "yes. Checking if allowed to auto-reconnect..." );
+ //...not in progres, so check if we can reconnect
+ if (!m_zkConfig.getAutoReconnect()) {
+ //...too bad, disallowed :(
+ LOG_TRACE( LOG, "no. Sorry." );
+ throw ZooKeeperException("ZK connection is down and "
+ "auto-reconnect is not allowed");
+ } else {
+ LOG_TRACE( LOG, "...yes. About to reconnect" );
+ }
+ //...we are good to retry the connection
+ reconnect();
+ } else {
+ LOG_TRACE( LOG, "...no, already in CONNECTING state" );
+ }
+ //wait until the connection is established
+ waitUntilConnected();
+ }
+ } catch (ZooKeeperException &e) {
+ m_stateLock.unlock();
+ throw;
+ }
+ m_stateLock.unlock();
+}
+
+bool
+ZooKeeperAdapter::createNode(const string &path,
+ const string &value,
+ int flags,
+ bool createAncestors,
+ string &returnPath)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "createNode (internal)" );
+ validatePath( path );
+
+ const int MAX_PATH_LENGTH = 1024;
+ char realPath[MAX_PATH_LENGTH];
+ realPath[0] = 0;
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ rc = zoo_create( mp_zkHandle,
+ path.c_str(),
+ value.c_str(),
+ value.length(),
+ &OPEN_ACL_UNSAFE,
+ flags,
+ realPath,
+ MAX_PATH_LENGTH );
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ if (rc == ZNODEEXISTS) {
+ //the node already exists
+ LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+ return false;
+ } else if (rc == ZNONODE && createAncestors) {
+ LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+ //one of the ancestors doesn't exist so lets start from the root
+ //and make sure the whole path exists, creating missing nodes if
+ //necessary
+ for (string::size_type pos = 1; pos != string::npos; ) {
+ pos = path.find( "/", pos );
+ if (pos != string::npos) {
+ try {
+ createNode( path.substr( 0, pos ), "", 0, true );
+ } catch (ZooKeeperException &e) {
+ throw ZooKeeperException( string("Unable to create "
+ "node ") +
+ path,
+ rc );
+ }
+ pos++;
+ } else {
+ //no more path components
+ return createNode( path, value, flags, false, returnPath );
+ }
+ }
+ }
+ LOG_ERROR( LOG,"Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException( string("Unable to create node ") +
+ path,
+ rc );
+ } else {
+ LOG_INFO( LOG, "%s has been created", realPath );
+ returnPath = string( realPath );
+ return true;
+ }
+}
+
+bool
+ZooKeeperAdapter::createNode(const string &path,
+ const string &value,
+ int flags,
+ bool createAncestors)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "createNode" );
+
+ string createdPath;
+ return createNode( path, value, flags, createAncestors, createdPath );
+}
+
+int64_t
+ZooKeeperAdapter::createSequence(const string &path,
+ const string &value,
+ int flags,
+ bool createAncestors)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "createSequence" );
+
+ string createdPath;
+ bool result = createNode( path,
+ value,
+ flags | SEQUENCE,
+ createAncestors,
+ createdPath );
+ if (!result) {
+ return -1;
+ } else {
+ //extract sequence number from the returned path
+ if (createdPath.find( path ) != 0) {
+ throw ZooKeeperException( string("Expecting returned path '") +
+ createdPath +
+ "' to start with '" +
+ path +
+ "'" );
+ }
+ string seqSuffix =
+ createdPath.substr( path.length(),
+ createdPath.length() - path.length() );
+ char *ptr = NULL;
+ int64_t seq = strtol( seqSuffix.c_str(), &ptr, 10 );
+ if (ptr != NULL && *ptr != '\0') {
+ throw ZooKeeperException( string("Expecting a number but got ") +
+ seqSuffix );
+ }
+ return seq;
+ }
+}
+
+bool
+ZooKeeperAdapter::deleteNode(const string &path,
+ bool recursive,
+ int version)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "deleteNode" );
+
+ validatePath( path );
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ rc = zoo_delete( mp_zkHandle, path.c_str(), version );
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ if (rc == ZNONODE) {
+ LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+ return false;
+ }
+ if (rc == ZNOTEMPTY && recursive) {
+ LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() );
+ //get all children and delete them recursively...
+ vector<string> nodeList;
+ getNodeChildren( nodeList, path, false );
+ for (vector<string>::const_iterator i = nodeList.begin();
+ i != nodeList.end();
+ ++i) {
+ deleteNode( *i, true );
+ }
+ //...and finally attempt to delete the node again
+ return deleteNode( path, false );
+ }
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException( string("Unable to delete node ") + path,
+ rc );
+ } else {
+ LOG_INFO( LOG, "%s has been deleted", path.c_str() );
+ return true;
+ }
+}
+
+bool
+ZooKeeperAdapter::nodeExists(const string &path,
+ ZKEventListener *listener,
+ void *context, Stat *stat)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "nodeExists" );
+
+ validatePath( path );
+
+ struct Stat tmpStat;
+ if (stat == NULL) {
+ stat = &tmpStat;
+ }
+ memset( stat, 0, sizeof(Stat) );
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ if (context != NULL) {
+ m_zkContextsMutex.Acquire();
+ rc = zoo_exists( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ stat );
+ if (rc == ZOK || rc == ZNONODE) {
+ registerContext( NODE_EXISTS, path, listener, context );
+ }
+ m_zkContextsMutex.Release();
+ } else {
+ rc = zoo_exists( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ stat );
+ }
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ if (rc == ZNONODE) {
+ LOG_TRACE( LOG, "Node %s does not exist", path.c_str() );
+ return false;
+ }
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException(
+ string("Unable to check existence of node ") + path,
+ rc );
+ } else {
+ return true;
+ }
+}
+
+void
+ZooKeeperAdapter::getNodeChildren(vector<string> &nodeList,
+ const string &path,
+ ZKEventListener *listener,
+ void *context)
+ throw (ZooKeeperException)
+{
+ TRACE( LOG, "getNodeChildren" );
+
+ validatePath( path );
+
+ String_vector children;
+ memset( &children, 0, sizeof(children) );
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ if (context != NULL) {
+ m_zkContextsMutex.Acquire();
+ rc = zoo_get_children( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ &children );
+ if (rc == ZOK) {
+ registerContext( GET_NODE_CHILDREN, path, listener, context );
+ }
+ m_zkContextsMutex.Release();
+ } else {
+ rc = zoo_get_children( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ &children );
+ }
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException( string("Unable to get children of node ") +
+ path,
+ rc );
+ } else {
+ for (int i = 0; i < children.count; ++i) {
+ //convert each child's path from relative to absolute
+ string absPath(path);
+ if (path != "/") {
+ absPath.append( "/" );
+ }
+ absPath.append( children.data[i] );
+ nodeList.push_back( absPath );
+ }
+ //make sure the order is always deterministic
+ sort( nodeList.begin(), nodeList.end() );
+ }
+}
+
+string
+ZooKeeperAdapter::getNodeData(const string &path,
+ ZKEventListener *listener,
+ void *context, Stat *stat)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "getNodeData" );
+
+ validatePath( path );
+
+ const int MAX_DATA_LENGTH = 128 * 1024;
+ char buffer[MAX_DATA_LENGTH];
+ memset( buffer, 0, MAX_DATA_LENGTH );
+ struct Stat tmpStat;
+ if (stat == NULL) {
+ stat = &tmpStat;
+ }
+ memset( stat, 0, sizeof(Stat) );
+
+ int rc;
+ int len;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ len = MAX_DATA_LENGTH - 1;
+ if (context != NULL) {
+ m_zkContextsMutex.Acquire();
+ rc = zoo_get( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ buffer, &len, stat );
+ if (rc == ZOK) {
+ registerContext( GET_NODE_DATA, path, listener, context );
+ }
+ m_zkContextsMutex.Release();
+ } else {
+ rc = zoo_get( mp_zkHandle,
+ path.c_str(),
+ (listener != NULL ? 1 : 0),
+ buffer, &len, stat );
+ }
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException(
+ string("Unable to get data of node ") + path, rc
+ );
+ } else {
+ return string( buffer, buffer + len );
+ }
+}
+
+void
+ZooKeeperAdapter::setNodeData(const string &path,
+ const string &value,
+ int version)
+ throw(ZooKeeperException)
+{
+ TRACE( LOG, "setNodeData" );
+
+ validatePath( path );
+
+ int rc;
+ RetryHandler rh(m_zkConfig);
+ do {
+ verifyConnection();
+ rc = zoo_set( mp_zkHandle,
+ path.c_str(),
+ value.c_str(),
+ value.length(), version );
+ } while (rc != ZOK && rh.handleRC(rc));
+ if (rc != ZOK) {
+ LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() );
+ throw ZooKeeperException( string("Unable to set data for node ") +
+ path,
+ rc );
+ }
+}
+
+} /* end of 'namespace zk' */
+
Added: hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.h?rev=685624&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.h (added)
+++ hadoop/zookeeper/trunk/src/contrib/zkfuse/src/zkadapter.h Wed Aug 13 10:58:59 2008
@@ -0,0 +1,718 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __ZKADAPTER_H__
+#define __ZKADAPTER_H__
+
+#include <string>
+#include <vector>
+#include <map>
+
+extern "C" {
+#include "zookeeper.h"
+}
+
+#include "log.h"
+#include "mutex.h"
+#include "thread.h"
+#include "blockingqueue.h"
+#include "event.h"
+
+using namespace std;
+using namespace zkfuse;
+
+namespace zk {
+
+/**
+ * \brief A cluster related exception.
+ */
+class ZooKeeperException :
+ public std::exception
+{
+ public:
+
+ /**
+ * \brief Constructor.
+ *
+ * @param msg the detailed message associated with this exception
+ */
+ ZooKeeperException(const string &msg) :
+ m_message(msg), m_zkErrorCode(0)
+ {}
+
+ /**
+ * \brief Constructor.
+ *
+ * @param msg the detailed message associated with this exception
+ * @param errorCode the ZK error code associated with this exception
+ */
+ ZooKeeperException(const string &msg, int errorCode) :
+ m_zkErrorCode(errorCode)
+ {
+ char tmp[100];
+ sprintf( tmp, " (ZK error code: %d)", errorCode );
+ m_message = msg + tmp;
+ }
+
+ /**
+ * \brief Destructor.
+ */
+ ~ZooKeeperException() throw() {}
+
+ /**
+ * \brief Returns detailed description of the exception.
+ */
+ const char *what() const throw() {
+ return m_message.c_str();
+ }
+
+ /**
+ * \brief Returns the ZK error code.
+ */
+ int getZKErrorCode() const {
+ return m_zkErrorCode;
+ }
+
+ private:
+
+ /**
+ * The detailed message associated with this exception.
+ */
+ string m_message;
+
+ /**
+ * The optional error code received from ZK.
+ */
+ int m_zkErrorCode;
+
+};
+
+/**
+ * \brief This class encapsulates configuration of a ZK client.
+ */
+class ZooKeeperConfig
+{
+ public:
+
+ /**
+ * \brief Constructor.
+ *
+ * @param hosts the comma separated list of host and port pairs of ZK nodes
+ * @param leaseTimeout the lease timeout (heartbeat)
+ * @param autoReconnect whether to allow for auto-reconnect
+ * @param connectTimeout the connect timeout, in milliseconds;
+ */
+ ZooKeeperConfig(const string &hosts,
+ int leaseTimeout,
+ bool autoReconnect = true,
+ long long int connectTimeout = 15000) :
+ m_hosts(hosts), m_leaseTimeout(leaseTimeout),
+ m_autoReconnect(autoReconnect), m_connectTimeout(connectTimeout) {}
+
+ /**
+ * \brief Returns the list of ZK hosts to connect to.
+ */
+ string getHosts() const { return m_hosts; }
+
+ /**
+ * \brief Returns the lease timeout.
+ */
+ int getLeaseTimeout() const { return m_leaseTimeout; }
+
+ /**
+ * \brief Returns whether {@link ZooKeeperAdapter} should attempt
+ * \brief to automatically reconnect in case of a connection failure.
+ */
+ bool getAutoReconnect() const { return m_autoReconnect; }
+
+ /**
+ * \brief Gets the connect timeout.
+ *
+ * @return the connect timeout
+ */
+ long long int getConnectTimeout() const { return m_connectTimeout; }
+
+ private:
+
+ /**
+ * The host addresses of ZK nodes.
+ */
+ const string m_hosts;
+
+ /**
+ * The ZK lease timeout.
+ */
+ const int m_leaseTimeout;
+
+ /**
+ * True if this adapater should attempt to autoreconnect in case
+ * the current session has been dropped.
+ */
+ const bool m_autoReconnect;
+
+ /**
+ * How long to wait, in milliseconds, before a connection
+ * is established to ZK.
+ */
+ const long long int m_connectTimeout;
+
+};
+
+/**
+ * \brief A data value object representing a watcher event received from the ZK.
+ */
+class ZKWatcherEvent
+{
+ public:
+
+ /**
+ * \brief The type representing the user's context.
+ */
+ typedef void *ContextType;
+
+ /**
+ * \brief Constructor.
+ *
+ * @param type the type of this event
+ * @param state the state of this event
+ * @param path the corresponding path, may be empty for some event types
+ * @param context the user specified context; possibly NULL
+ */
+ ZKWatcherEvent() :
+ m_type(-1), m_state(-1), m_path(""), mp_context(NULL) {}
+
+ /**
+ * \brief Constructor.
+ *
+ * @param type the type of this event
+ * @param state the state of this event
+ * @param path the corresponding path, may be empty for some event types
+ * @param context the user specified context; possibly NULL
+ */
+ ZKWatcherEvent(int type, int state, const string &path,
+ ContextType context = NULL) :
+ m_type(type), m_state(state), m_path(path), mp_context(context) {}
+
+ int getType() const { return m_type; }
+ int getState() const { return m_state; }
+ string const &getPath() const { return m_path; }
+ ContextType getContext() const { return mp_context; }
+
+ bool operator==(const ZKWatcherEvent &we) const {
+ return m_type == we.m_type && m_state == we.m_state
+ && m_path == we.m_path && mp_context == we.mp_context;
+ }
+
+ private:
+
+ /**
+ * The type of this event. It can be either CREATED_EVENT, DELETED_EVENT,
+ * CHANGED_EVENT, CHILD_EVENT, SESSION_EVENT or NOTWATCHING_EVENT.
+ * See zookeeper.h for more details.
+ */
+ const int m_type;
+
+ /**
+ * The state of ZK at the time of sending this event.
+ * It can be either CONNECTING_STATE, ASSOCIATING_STATE,
+ * CONNECTED_STATE, EXPIRED_SESSION_STATE or AUTH_FAILED_STATE.
+ * See {@file zookeeper.h} for more details.
+ */
+ const int m_state;
+
+ /**
+ * The corresponding path of the node in subject. It may be empty
+ * for some event types.
+ */
+ const string m_path;
+
+ /**
+ * The pointer to the user specified context, possibly NULL.
+ */
+ ContextType mp_context;
+
+};
+
+/**
+ * \brief The type definition of ZK event source.
+ */
+typedef EventSource<ZKWatcherEvent> ZKEventSource;
+
+/**
+ * \brief The type definition of ZK event listener.
+ */
+typedef EventListener<ZKWatcherEvent> ZKEventListener;
+
+/**
+ * \brief This is a wrapper around ZK C synchrounous API.
+ */
+class ZooKeeperAdapter
+ : public ZKEventSource
+{
+ public:
+ /**
+ * \brief The global function that handles all ZK asynchronous notifications.
+ */
+ friend void zkWatcher(zhandle_t *, int, int, const char *);
+
+ /**
+ * \brief The type representing the user's context.
+ */
+ typedef void *ContextType;
+
+ /**
+ * \brief The map type of ZK event listener to user specified context mapping.
+ */
+ typedef map<ZKEventListener *, ContextType> Listener2Context;
+
+ /**
+ * \brief The map type of ZK path's to listener's contexts.
+ */
+ typedef map<string, Listener2Context> Path2Listener2Context;
+
+ /**
+ * \brief All possible states of this client, in respect to
+ * \brief connection to the ZK server.
+ */
+ enum AdapterState {
+ //mp_zkHandle is NULL
+ AS_DISCONNECTED = 0,
+ //mp_zkHandle is valid but this client is reconnecting
+ AS_CONNECTING,
+ //mp_zkHandle is valid and this client is connected
+ AS_CONNECTED,
+ //mp_zkHandle is valid, however no more calls can be made to ZK API
+ AS_SESSION_EXPIRED
+ };
+
+ /**
+ * \brief Constructor.
+ * Attempts to create a ZK adapter, optionally connecting
+ * to the ZK. Note, that if the connection is to be established
+ * and the given listener is NULL, some events may be lost,
+ * as they may arrive asynchronously before this method finishes.
+ *
+ * @param config the ZK configuration
+ * @param listener the event listener to be used for listening
+ * on incoming ZK events;
+ * if <code>NULL</code> not used
+ * @param establishConnection whether to establish connection to the ZK
+ *
+ * @throw ZooKeeperException if cannot establish connection to the given ZK
+ */
+ ZooKeeperAdapter(ZooKeeperConfig config,
+ ZKEventListener *listener = NULL,
+ bool establishConnection = false)
+ throw(ZooKeeperException);
+
+ /**
+ * \brief Destructor.
+ */
+ ~ZooKeeperAdapter();
+
+ /**
+ * \brief Returns the current config.
+ */
+ const ZooKeeperConfig &getZooKeeperConfig() const {
+ return m_zkConfig;
+ }
+
+ /**
+ * \brief Restablishes connection to the ZK.
+ * If this adapter is already connected, the current connection
+ * will be dropped and a new connection will be established.
+ *
+ * @throw ZooKeeperException if cannot establish connection to the ZK
+ */
+ void reconnect() throw(ZooKeeperException);
+
+ /**
+ * \brief Disconnects from the ZK and unregisters {@link #mp_zkHandle}.
+ */
+ void disconnect();
+
+ /**
+ * \brief Creates a new node identified by the given path.
+ * This method will optionally attempt to create all missing ancestors.
+ *
+ * @param path the absolute path name of the node to be created
+ * @param value the initial value to be associated with the node
+ * @param flags the ZK flags of the node to be created
+ * @param createAncestors if true and there are some missing ancestor nodes,
+ * this method will attempt to create them
+ *
+ * @return true if the node has been successfully created; false otherwise
+ * @throw ZooKeeperException if the operation has failed
+ */
+ bool createNode(const string &path,
+ const string &value = "",
+ int flags = 0,
+ bool createAncestors = true)
+ throw(ZooKeeperException);
+
+ /**
+ * \brief Creates a new sequence node using the give path as the prefix.
+ * This method will optionally attempt to create all missing ancestors.
+ *
+ * @param path the absolute path name of the node to be created;
+ * @param value the initial value to be associated with the node
+ * @param flags the ZK flags of the sequence node to be created
+ * (in addition to SEQUENCE)
+ * @param createAncestors if true and there are some missing ancestor
+ * nodes, this method will attempt to create them
+ *
+ * @return the sequence number associate with newly created node,
+ * or -1 if it couldn't be created
+ * @throw ZooKeeperException if the operation has failed
+ */
+ int64_t createSequence(const string &path,
+ const string &value = "",
+ int flags = 0,
+ bool createAncestors = true)
+ throw(ZooKeeperException);
+
+ /**
+ * \brief Deletes a node identified by the given path.
+ *
+ * @param path the absolute path name of the node to be deleted
+ * @param recursive if true this method will attempt to remove
+ * all children of the given node if any exist
+ * @param version the expected version of the node. The function will
+ * fail if the actual version of the node does not match
+ * the expected version
+ *
+ * @return true if the node has been deleted; false otherwise
+ * @throw ZooKeeperException if the operation has failed
+ */
+ bool deleteNode(const string &path, bool recursive = false, int version = -1)
+ throw(ZooKeeperException);
+
+ /**
+ * \brief Checks whether the given node exists or not.
+ *
+ * @param path the absolute path name of the node to be checked
+ * @param listener the listener for ZK watcher events;
+ * passing non <code>NULL</code> effectively establishes
+ * a ZK watch on the given node
+ * @param context the user specified context that is to be passed
+ * in a corresponding {@link ZKWatcherEvent} at later time;
+ * not used if <code>listener</code> is <code>NULL</code>
+ * @param stat the optional node statistics to be filled in by ZK
+ *
+ * @return true if the given node exists; false otherwise
+ * @throw ZooKeeperException if the operation has failed
+ */
+ bool nodeExists(const string &path,
+ ZKEventListener *listener = NULL,
+ void *context = NULL,
+ Stat *stat = NULL)
+ throw(ZooKeeperException);
+
+ /**
+ * \brief Retrieves list of all children of the given node.
+ *
+ * @param path the absolute path name of the node for which to get children
+ * @param listener the listener for ZK watcher events;
+ * passing non <code>NULL</code> effectively establishes
+ * a ZK watch on the given node
+ * @param context the user specified context that is to be passed
+ * in a corresponding {@link ZKWatcherEvent} at later time;
+ * not used if <code>listener</code> is <code>NULL</code>
+ *
+ * @return the list of absolute paths of child nodes, possibly empty
+ * @throw ZooKeeperException if the operation has failed
+ */
+ void getNodeChildren(vector<string> &children,
+ const string &path,
+ ZKEventListener *listener = NULL,
+ void *context = NULL)
+ throw(ZooKeeperException);
+
+ /**
+ * \brief Gets the given node's data.
+ *
+ * @param path the absolute path name of the node to get data from
+ * @param listener the listener for ZK watcher events;
+ * passing non <code>NULL</code> effectively establishes
+ * a ZK watch on the given node
+ * @param context the user specified context that is to be passed
+ * in a corresponding {@link ZKWatcherEvent} at later time;
+ * not used if <code>listener</code> is <code>NULL</code>
+ * @param stat the optional node statistics to be filled in by ZK
+ *
+ * @return the node's data
+ * @throw ZooKeeperException if the operation has failed
+ */
+ string getNodeData(const string &path,
+ ZKEventListener *listener = NULL,
+ void *context = NULL,
+ Stat *stat = NULL)
+ throw(ZooKeeperException);
+
+ /**
+ * \brief Sets the given node's data.
+ *
+ * @param path the absolute path name of the node to get data from
+ * @param value the node's data to be set
+ * @param version the expected version of the node. The function will
+ * fail if the actual version of the node does not match
+ * the expected version
+ *
+ * @throw ZooKeeperException if the operation has failed
+ */
+ void setNodeData(const string &path, const string &value, int version = -1)
+ throw(ZooKeeperException);
+
+ /**
+ * \brief Validates the given path to a node in ZK.
+ *
+ * @param the path to be validated
+ *
+ * @throw ZooKeeperException if the given path is not valid
+ * (for instance it doesn't start with "/")
+ */
+ static void validatePath(const string &path) throw(ZooKeeperException);
+
+ /**
+ * Returns the current state of this adapter.
+ *
+ * @return the current state of this adapter
+ * @see AdapterState
+ */
+ AdapterState getState() const {
+ return m_state;
+ }
+
+ private:
+
+ /**
+ * This enum defines methods from this class than can trigger an event.
+ */
+ enum WatchableMethod {
+ NODE_EXISTS = 0,
+ GET_NODE_CHILDREN,
+ GET_NODE_DATA
+ };
+
+ /**
+ * \brief Creates a new node identified by the given path.
+ * This method is used internally to implement {@link createNode(...)}
+ * and {@link createSequence(...)}. On success, this method will set
+ * <code>createdPath</code>.
+ *
+ * @param path the absolute path name of the node to be created
+ * @param value the initial value to be associated with the node
+ * @param flags the ZK flags of the node to be created
+ * @param createAncestors if true and there are some missing ancestor nodes,
+ * this method will attempt to create them
+ * @param createdPath the actual path of the node that has been created;
+ * useful for sequences
+ *
+ * @return true if the node has been successfully created; false otherwise
+ * @throw ZooKeeperException if the operation has failed
+ */
+ bool createNode(const string &path,
+ const string &value,
+ int flags,
+ bool createAncestors,
+ string &createdPath)
+ throw(ZooKeeperException);
+
+ /**
+ * Handles an asynchronous event received from the ZK.
+ */
+ void handleEvent(int type, int state, const string &path);
+
+ /**
+ * Handles an asynchronous event received from the ZK.
+ * This method iterates over all listeners and passes the event
+ * to each of them.
+ */
+ void handleEvent(int type, int state, const string &path,
+ const Listener2Context &listeners);
+
+ /**
+ * \brief Enqueues the given event in {@link #m_events} queue.
+ */
+ void enqueueEvent(int type, int state, const string &path);
+
+ /**
+ * \brief Processes all ZK adapter events in a loop.
+ */
+ void processEvents();
+
+ /**
+ * \brief Processes all user events in a loop.
+ */
+ void processUserEvents();
+
+ /**
+ * \brief Registers the given context in the {@link #m_zkContexts}
+ * \brief contexts map.
+ *
+ * @param method the method where the given path is being used
+ * @param path the path of interest
+ * @param listener the event listener to call back later on
+ * @param context the user specified context to be passed back to user
+ */
+ void registerContext(WatchableMethod method, const string &path,
+ ZKEventListener *listener, ContextType context);
+
+ /**
+ * \brief Attempts to find a listener to context map in the contexts'
+ * \brief map, based on the specified criteria.
+ * If the context is found, it will be removed the udnerlying map.
+ *
+ * @param method the method type identify Listener2Context map
+ * @param path the path to be used to search in the Listener2Context map
+ *
+ * @return the context map associated with the given method and path,
+ * or empty map if not found
+ */
+ Listener2Context findAndRemoveListenerContext(WatchableMethod method,
+ const string &path);
+
+ /**
+ * Sets the new state in case it's different then the current one.
+ * This method assumes that {@link #m_stateLock} has been already locked.
+ *
+ * @param newState the new state to be set
+ */
+ void setState(AdapterState newState);
+
+ /**
+ * Waits until this client gets connected. The total wait time
+ * is given by {@link getRemainingConnectTimeout()}.
+ * If a timeout elapses, this method will throw an exception.
+ *
+ * @throw ZooKeeperException if unable to connect within the given timeout
+ */
+ void waitUntilConnected()
+ throw(ZooKeeperException);
+
+ /**
+ * Verifies whether the connection is established,
+ * optionally auto reconnecting.
+ *
+ * @throw ZooKeeperConnection if this client is disconnected
+ * and auto-reconnect failed or was not allowed
+ */
+ void verifyConnection() throw(ZooKeeperException);
+
+ /**
+ * Returns the remaining connect timeout. The timeout resets
+ * to {@link #m_connectTimeout} on a successfull connection to the ZK.
+ *
+ * @return the remaining connect timeout, in milliseconds
+ */
+ long long int getRemainingConnectTimeout() {
+ return m_remainingConnectTimeout;
+ }
+
+ /**
+ * Resets the remaining connect timeout to {@link #m_connectTimeout}.
+ */
+ void resetRemainingConnectTimeout() {
+ m_remainingConnectTimeout = m_zkConfig.getConnectTimeout();
+ }
+
+ /**
+ * Updates the remaining connect timeout to reflect the given wait time.
+ *
+ * @param time the time for how long waited so far on connect to succeed
+ */
+ void waitedForConnect(long long time) {
+ m_remainingConnectTimeout -= time;
+ }
+
+ private:
+
+ /**
+ * The mutex use to protect {@link #m_zkContexts}.
+ */
+ zkfuse::Mutex m_zkContextsMutex;
+
+ /**
+ * The map of registered ZK paths that are being watched.
+ * Each entry maps a function type to another map of registered contexts.
+ *
+ * @see WatchableMethod
+ */
+ map<int, Path2Listener2Context> m_zkContexts;
+
+ /**
+ * The current ZK configuration.
+ */
+ const ZooKeeperConfig m_zkConfig;
+
+ /**
+ * The current ZK session.
+ */
+ zhandle_t *mp_zkHandle;
+
+ /**
+ * The blocking queue of all events waiting to be processed by ZK adapter.
+ */
+ BlockingQueue<ZKWatcherEvent> m_events;
+
+ /**
+ * The blocking queue of all events waiting to be processed by users
+ * of ZK adapter.
+ */
+ BlockingQueue<ZKWatcherEvent> m_userEvents;
+
+ /**
+ * The thread that dispatches all events from {@link #m_events} queue.
+ */
+ CXXThread<ZooKeeperAdapter> m_eventDispatcher;
+
+ /**
+ * The thread that dispatches all events from {@link #m_userEvents} queue.
+ */
+ CXXThread<ZooKeeperAdapter> m_userEventDispatcher;
+
+ /**
+ * Whether {@link #m_eventDispatcher} is terminating.
+ */
+ volatile bool m_terminating;
+
+ /**
+ * Whether this adapter is connected to the ZK.
+ */
+ volatile bool m_connected;
+
+ /**
+ * The state of this adapter.
+ */
+ AdapterState m_state;
+
+ /**
+ * The lock used to synchronize access to {@link #m_state}.
+ */
+ Lock m_stateLock;
+
+ /**
+ * How much time left for the connect to succeed, in milliseconds.
+ */
+ long long int m_remainingConnectTimeout;
+
+};
+
+} /* end of 'namespace zk' */
+
+#endif /* __ZKADAPTER_H__ */