You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/01/25 21:49:23 UTC

[32/50] [abbrv] helix git commit: Move zkclient from I0ITec to Helix codebase.

Move zkclient from I0ITec to Helix codebase.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7fc03f4c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7fc03f4c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7fc03f4c

Branch: refs/heads/master
Commit: 7fc03f4c3f66625ed347a130be4205ce9d9419b4
Parents: 9b9da19
Author: Lei Xia <lx...@linkedin.com>
Authored: Wed Dec 13 15:57:28 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Wed Jan 24 18:32:32 2018 -0800

----------------------------------------------------------------------
 .../org/apache/helix/manager/zk/ZkClient.java   |    3 +-
 .../helix/manager/zk/zookeeper/ZkClient.java    | 1226 ++++++++++++++++++
 .../manager/zk/zookeeper/ZkEventThread.java     |   85 ++
 3 files changed, 1313 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7fc03f4c/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index c9f7ccf..182c77e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -48,8 +48,9 @@ import java.util.concurrent.Callable;
  * ZkClient jar Ideally we should commit the changes we do here to ZKClient.
  */
 
-public class ZkClient extends org.I0Itec.zkclient.ZkClient {
+public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+
   public static final int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
   public static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7fc03f4c/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
new file mode 100644
index 0000000..d26a274
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -0,0 +1,1226 @@
+/**
+ * Copyright 2010 the original author or authors.
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.helix.manager.zk.zookeeper;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.ZkLock;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.I0Itec.zkclient.ExceptionUtil;
+import org.I0Itec.zkclient.serialize.SerializableSerializer;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.helix.manager.zk.zookeeper.ZkEventThread.ZkEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper
+ */
+public class ZkClient implements Watcher {
+  private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+
+  protected final IZkConnection _connection; // non-null connection supplied to the constructor
+  protected final long operationRetryTimeoutInMillis; // constructor's operationRetryTimeout, in ms
+  private final Map<String, Set<IZkChildListener>> _childListener = // znode path -> child listeners
+      new ConcurrentHashMap<String, Set<IZkChildListener>>();
+  private final ConcurrentHashMap<String, Set<IZkDataListener>> _dataListener = // path -> data listeners
+      new ConcurrentHashMap<String, Set<IZkDataListener>>();
+  private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<IZkStateListener>();
+  private KeeperState _currentState;
+  private final ZkLock _zkEventLock = new ZkLock();
+  private boolean _shutdownTriggered;
+  private ZkEventThread _eventThread; // listener callbacks are queued on it via send(ZkEvent)
+  // TODO PVo remove this later
+  private Thread _zookeeperEventThread; // thread that last ran process(WatchedEvent)
+  private ZkSerializer _zkSerializer; // set by the constructor, replaceable via setZkSerializer
+  private volatile boolean _closed;
+
+  public ZkClient(String serverstring) { // same as ZkClient(serverstring, Integer.MAX_VALUE)
+    this(serverstring, Integer.MAX_VALUE); // second argument is the connection timeout
+  }
+
+  public ZkClient(String zkServers, int connectionTimeout) { // timeout in milliseconds
+    this(new ZkConnection(zkServers), connectionTimeout); // wraps the server string in a new ZkConnection
+  }
+
+  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) { // both in milliseconds
+    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout); // session timeout goes to the connection
+  }
+
+  public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
+      ZkSerializer zkSerializer) { // zkSerializer: the Zookeeper data serializer
+    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer); // new connection per client
+  }
+
+  /**
+   * Creates a client over a new {@link ZkConnection} built from the server string and session timeout.
+   * @param zkServers
+   *            The Zookeeper servers
+   * @param sessionTimeout
+   *            The session timeout in milliseconds
+   * @param connectionTimeout
+   *            The connection timeout in milliseconds
+   * @param zkSerializer
+   *            The Zookeeper data serializer
+   * @param operationRetryTimeout
+   *            Most operations done through this {@link ZkClient} are retried in cases like
+   *            connection loss with the Zookeeper servers. During such failures, this
+   *            <code>operationRetryTimeout</code> decides the maximum amount of time, in milliseconds, each
+   *            operation is retried. A value less than 0 is considered as
+   *            "retry forever until a connection has been reestablished".
+   */
+  public ZkClient(final String zkServers, final int sessionTimeout, final int connectionTimeout,
+      final ZkSerializer zkSerializer, final long operationRetryTimeout) {
+    this(new ZkConnection(zkServers, sessionTimeout), connectionTimeout, zkSerializer,
+        operationRetryTimeout);
+  }
+
+  public ZkClient(IZkConnection connection) { // same as ZkClient(connection, Integer.MAX_VALUE)
+    this(connection, Integer.MAX_VALUE); // second argument is the connection timeout
+  }
+
+  public ZkClient(IZkConnection connection, int connectionTimeout) { // timeout in milliseconds
+    this(connection, connectionTimeout, new SerializableSerializer()); // default serializer when none is given
+  }
+
+  public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer) {
+    this(zkConnection, connectionTimeout, zkSerializer, -1); // -1: operationRetryTimeout below 0, i.e. retry forever
+  }
+
+  /**
+   * Stores the connection, serializer and retry timeout, then calls connect(connectionTimeout, this).
+   * @param zkConnection
+   *            The Zookeeper connection to use; a null value is rejected with a NullPointerException
+   * @param connectionTimeout
+   *            The connection timeout in milliseconds
+   * @param zkSerializer
+   *            The Zookeeper data serializer
+   * @param operationRetryTimeout
+   *            Most operations done through this {@link ZkClient} are retried in cases like
+   *            connection loss with the Zookeeper servers. During such failures, this
+   *            <code>operationRetryTimeout</code> decides the maximum amount of time, in milliseconds, each
+   *            operation is retried. A value less than 0 is considered as
+   *            "retry forever until a connection has been reestablished".
+   */
+  public ZkClient(final IZkConnection zkConnection, final int connectionTimeout,
+      final ZkSerializer zkSerializer, final long operationRetryTimeout) {
+    if (zkConnection == null) { // fail before any field is assigned
+      throw new NullPointerException("Zookeeper connection is null!");
+    }
+    _connection = zkConnection;
+    _zkSerializer = zkSerializer;
+    this.operationRetryTimeoutInMillis = operationRetryTimeout;
+    connect(connectionTimeout, this); // this client is passed along as the Watcher
+  }
+
+  public void setZkSerializer(ZkSerializer zkSerializer) { // replaces the serializer given at construction
+    _zkSerializer = zkSerializer;
+  }
+
+  public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
+    synchronized (_childListener) { // guards the get-then-put below
+      Set<IZkChildListener> listeners = _childListener.get(path);
+      if (listeners == null) { // first listener for this path: create its set
+        listeners = new CopyOnWriteArraySet<IZkChildListener>();
+        _childListener.put(path, listeners);
+      }
+      listeners.add(listener);
+    }
+    return watchForChilds(path); // called after the monitor is released; its result is returned as-is
+  }
+
+  public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
+    synchronized (_childListener) {
+      final Set<IZkChildListener> listeners = _childListener.get(path);
+      if (listeners != null) { // nothing to do when no set is registered for path
+        listeners.remove(childListener); // an emptied set stays in the map (unsubscribeDataChanges removes it)
+      }
+    }
+  }
+
+  public void subscribeDataChanges(String path, IZkDataListener listener) {
+    Set<IZkDataListener> listeners;
+    synchronized (_dataListener) { // guards the get-then-put below
+      listeners = _dataListener.get(path);
+      if (listeners == null) { // first listener for this path: create its set
+        listeners = new CopyOnWriteArraySet<IZkDataListener>();
+        _dataListener.put(path, listeners);
+      }
+      listeners.add(listener);
+    }
+    watchForData(path); // called after the monitor is released
+    LOG.debug("Subscribed data changes for " + path);
+  }
+
+  public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
+    synchronized (_dataListener) {
+      final Set<IZkDataListener> listeners = _dataListener.get(path);
+      if (listeners != null) {
+        listeners.remove(dataListener);
+      }
+      if (listeners == null || listeners.isEmpty()) { // drop the map entry once no listener is left
+        _dataListener.remove(path);
+      }
+    }
+  }
+
+  public void subscribeStateChanges(final IZkStateListener listener) {
+    synchronized (_stateListener) {
+      _stateListener.add(listener); // backed by a set, so adding the same listener twice has no further effect
+    }
+  }
+
+  public void unsubscribeStateChanges(IZkStateListener stateListener) {
+    synchronized (_stateListener) {
+      _stateListener.remove(stateListener); // no effect if the listener was not registered
+    }
+  }
+
+  public void unsubscribeAll() { // clears child, data and state listeners, one collection at a time
+    synchronized (_childListener) {
+      _childListener.clear();
+    }
+    synchronized (_dataListener) {
+      _dataListener.clear();
+    }
+    synchronized (_stateListener) {
+      _stateListener.clear();
+    }
+  }
+
+  // </listeners>
+
+  /**
+   * Create a persistent node without data; same as createPersistent(path, false).
+   *
+   * @param path
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public void createPersistent(String path)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    createPersistent(path, false);
+  }
+
+  /**
+   * Create a persistent node without data, using the ZooDefs.Ids.OPEN_ACL_UNSAFE ACL.
+   *
+   * @param path
+   * @param createParents
+   *            if true all parent dirs are created as well and no {@link ZkNodeExistsException} is thrown in case the
+   *            path already exists
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public void createPersistent(String path, boolean createParents)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  /**
+   * Create a persistent node without data and set its ACLs, optionally creating missing parents.
+   *
+   * @param path
+   * @param createParents
+   *            if true all parent dirs are created as well and no {@link ZkNodeExistsException} is thrown in case the
+   *            path already exists
+   * @param acl
+   *            List of ACL permissions to assign to the node (and to any parent created on the way)
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public void createPersistent(String path, boolean createParents, List<ACL> acl)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    try {
+      create(path, null, acl, CreateMode.PERSISTENT); // null: the node is created without data
+    } catch (ZkNodeExistsException e) {
+      if (!createParents) { // with createParents an already existing node is not an error
+        throw e;
+      }
+    } catch (ZkNoNodeException e) { // presumably raised because a parent node is missing
+      if (!createParents) {
+        throw e;
+      }
+      String parentDir = path.substring(0, path.lastIndexOf('/')); // path up to, excluding, its last '/'
+      createPersistent(parentDir, createParents, acl); // create the parent first (recursively) ...
+      createPersistent(path, createParents, acl); // ... then retry the node itself
+    }
+  }
+
+  /**
+   * Create a persistent node holding the given data, using the ZooDefs.Ids.OPEN_ACL_UNSAFE ACL.
+   *
+   * @param path
+   * @param data
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public void createPersistent(String path, Object data)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    create(path, data, CreateMode.PERSISTENT);
+  }
+
+  /**
+   * Create a persistent node holding the given data with the given ACL.
+   *
+   * @param path
+   * @param data
+   * @param acl
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public void createPersistent(String path, Object data, List<ACL> acl) {
+    create(path, data, acl, CreateMode.PERSISTENT);
+  }
+
+  /**
+   * Create a persistent, sequential node, using the ZooDefs.Ids.OPEN_ACL_UNSAFE ACL.
+   *
+   * @param path
+   * @param data
+   * @return created node's path
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public String createPersistentSequential(String path, Object data)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+
+  /**
+   * Create a persistent, sequential node and set its ACL.
+   *
+   * @param path
+   * @param data
+   * @param acl
+   * @return created node's path
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public String createPersistentSequential(String path, Object data, List<ACL> acl)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+
+  /**
+   * Create an ephemeral node without data, using the ZooDefs.Ids.OPEN_ACL_UNSAFE ACL.
+   *
+   * @param path
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public void createEphemeral(final String path)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    create(path, null, CreateMode.EPHEMERAL);
+  }
+
+  /**
+   * Create an ephemeral node without data and set its ACL.
+   *
+   * @param path
+   * @param acl
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public void createEphemeral(final String path, final List<ACL> acl)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    create(path, null, acl, CreateMode.EPHEMERAL);
+  }
+
+  /**
+   * Create a node of the given mode, using the ZooDefs.Ids.OPEN_ACL_UNSAFE ACL.
+   *
+   * @param path
+   * @param data
+   * @param mode
+   * @return created node's path
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public String create(final String path, Object data, final CreateMode mode)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+  }
+
+  /**
+   * Create a node with ACL; the call to the connection is run through retryUntilConnected.
+   *
+   * @param path must not be null, otherwise a NullPointerException is thrown
+   * @param data payload, passed through serialize(data) unless it is null
+   * @param acl must be neither null nor empty, otherwise a NullPointerException is thrown
+   * @param mode
+   * @return created node's path
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode) {
+    if (path == null) {
+      throw new NullPointerException("Missing value for path");
+    }
+    if (acl == null || acl.size() == 0) { // an empty ACL list is rejected the same way as a null one
+      throw new NullPointerException("Missing value for ACL");
+    }
+    final byte[] bytes = data == null ? null : serialize(data); // serialized once, before any retry
+
+    return retryUntilConnected(new Callable<String>() {
+      @Override public String call() throws Exception {
+        return _connection.create(path, bytes, acl, mode);
+      }
+    });
+
+  }
+
+  /**
+   * Create an ephemeral node holding the given data, using the ZooDefs.Ids.OPEN_ACL_UNSAFE ACL.
+   *
+   * @param path
+   * @param data
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public void createEphemeral(final String path, final Object data)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    create(path, data, CreateMode.EPHEMERAL);
+  }
+
+  /**
+   * Create an ephemeral node holding the given data with the given ACL.
+   *
+   * @param path
+   * @param data
+   * @param acl
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public void createEphemeral(final String path, final Object data, final List<ACL> acl)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    create(path, data, acl, CreateMode.EPHEMERAL);
+  }
+
+  /**
+   * Create an ephemeral, sequential node, using the ZooDefs.Ids.OPEN_ACL_UNSAFE ACL.
+   *
+   * @param path
+   * @param data
+   * @return created path
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public String createEphemeralSequential(final String path, final Object data)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+
+  /**
+   * Create an ephemeral, sequential node holding the given data with the given ACL.
+   *
+   * @param path
+   * @param data
+   * @param acl
+   * @return created path
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread (TODO confirm against retryUntilConnected)
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs
+   */
+  public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+
+  @Override public void process(WatchedEvent event) { // Watcher callback: dispatches one event under the event lock
+    LOG.debug("Received event: " + event);
+    _zookeeperEventThread = Thread.currentThread(); // waitForKeeperState refuses to run on this thread
+
+    boolean stateChanged = event.getPath() == null; // events without a path are treated as state changes
+    boolean znodeChanged = event.getPath() != null; // events with a path concern a znode
+    boolean dataChanged =
+        event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted
+            || event.getType() == EventType.NodeCreated
+            || event.getType() == EventType.NodeChildrenChanged; // any of these four node event types
+
+    getEventLock().lock();
+    try {
+
+      // We might have to install child change event listener if a new node was created
+      if (getShutdownTrigger()) { // listeners are skipped, but the finally block still signals and unlocks
+        LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath()
+            + "}' since shutdown triggered");
+        return;
+      }
+      if (stateChanged) {
+        processStateChanged(event);
+      }
+      if (dataChanged) {
+        processDataOrChildChange(event);
+      }
+    } finally { // wake up threads waiting on the conditions that match this event
+      if (stateChanged) {
+        getEventLock().getStateChangedCondition().signalAll();
+
+        // If the session expired we have to signal all conditions, because watches might have been removed and
+        // there is no guarantee that those
+        // conditions will be signaled at all after an Expired event
+        // TODO PVo write a test for this
+        if (event.getState() == KeeperState.Expired) {
+          getEventLock().getZNodeEventCondition().signalAll();
+          getEventLock().getDataChangedCondition().signalAll();
+          // We also have to notify all listeners that something might have changed
+          fireAllEvents();
+        }
+      }
+      if (znodeChanged) {
+        getEventLock().getZNodeEventCondition().signalAll();
+      }
+      if (dataChanged) {
+        getEventLock().getDataChangedCondition().signalAll();
+      }
+      getEventLock().unlock(); // NOTE(review): not reached if a statement above in this finally block throws
+      LOG.debug("Leaving process event");
+    }
+  }
+
+  private void fireAllEvents() { // called from process() after a session-expired event
+    for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) { // every path with child listeners
+      fireChildChangedEvents(entry.getKey(), entry.getValue());
+    }
+    for (Entry<String, Set<IZkDataListener>> entry : _dataListener.entrySet()) { // every path with data listeners
+      fireDataChangedEvents(entry.getKey(), entry.getValue());
+    }
+  }
+
+  public List<String> getChildren(String path) {
+    return getChildren(path, hasListeners(path)); // a watch is requested only if listeners are registered for path
+  }
+
+  protected List<String> getChildren(final String path, final boolean watch) {
+    return retryUntilConnected(new Callable<List<String>>() { // the lookup is run through retryUntilConnected
+      @Override public List<String> call() throws Exception {
+        return _connection.getChildren(path, watch); // watch flag is handed to the connection unchanged
+      }
+    });
+  }
+
+  /**
+   * Counts the children of the given path, using {@link #getChildren(String)}.
+   *
+   * @param path
+   * @return number of children or 0 if path does not exist.
+   */
+  public int countChildren(String path) {
+    try {
+      return getChildren(path).size();
+    } catch (ZkNoNodeException e) { // a missing node counts as having zero children
+      return 0;
+    }
+  }
+
+  protected boolean exists(final String path, final boolean watch) {
+    return retryUntilConnected(new Callable<Boolean>() { // the check is run through retryUntilConnected
+      @Override public Boolean call() throws Exception {
+        return _connection.exists(path, watch); // watch flag is handed to the connection unchanged
+      }
+    });
+  }
+
+  public boolean exists(final String path) {
+    return exists(path, hasListeners(path)); // a watch is requested only if listeners are registered for path
+  }
+
+  private void processStateChanged(WatchedEvent event) {
+    LOG.info("zookeeper state changed (" + event.getState() + ")");
+    setCurrentState(event.getState());
+    if (getShutdownTrigger()) { // the state is still recorded above, but no listener is notified
+      return;
+    }
+    fireStateChangedEvent(event.getState());
+    if (event.getState() == KeeperState.Expired) { // expired session: reconnect, then announce the new session
+      try {
+        reconnect();
+        fireNewSessionEvents();
+      } catch (final Exception e) { // the failure is reported to the state listeners instead of being rethrown
+        LOG.info(
+            "Unable to re-establish connection. Notifying consumer of the following exception: ",
+            e);
+        fireSessionEstablishmentError(e);
+      }
+    }
+  }
+
+  private void fireNewSessionEvents() {
+    for (final IZkStateListener stateListener : _stateListener) { // one queued event per registered state listener
+      _eventThread.send(new ZkEvent("New session event sent to " + stateListener) {
+
+        @Override public void run() throws Exception { // the callback is made from the event's run(), not here
+          stateListener.handleNewSession();
+        }
+      });
+    }
+  }
+
+  private void fireStateChangedEvent(final KeeperState state) {
+    for (final IZkStateListener stateListener : _stateListener) { // one queued event per registered state listener
+      _eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener) {
+
+        @Override public void run() throws Exception { // the callback is made from the event's run(), not here
+          stateListener.handleStateChanged(state);
+        }
+      });
+    }
+  }
+
+  private void fireSessionEstablishmentError(final Throwable error) {
+    for (final IZkStateListener stateListener : _stateListener) { // one queued event per registered state listener
+      _eventThread
+          .send(new ZkEvent("Session establishment error(" + error + ") sent to " + stateListener) {
+
+            @Override public void run() throws Exception { // the error is handed to the listener unchanged
+              stateListener.handleSessionEstablishmentError(error);
+            }
+          });
+    }
+  }
+
+  private boolean hasListeners(String path) { // true if path has at least one data or child listener
+    Set<IZkDataListener> dataListeners = _dataListener.get(path);
+    if (dataListeners != null && dataListeners.size() > 0) {
+      return true;
+    }
+    Set<IZkChildListener> childListeners = _childListener.get(path);
+    if (childListeners != null && childListeners.size() > 0) { // an empty leftover set does not count
+      return true;
+    }
+    return false;
+  }
+
+  public boolean deleteRecursive(String path) { // deletes path and everything below it, depth-first
+    List<String> children;
+    try {
+      children = getChildren(path, false); // false: no watch is requested
+    } catch (ZkNoNodeException e) {
+      return true; // the node is already absent: treated as success
+    }
+
+    for (String subPath : children) {
+      if (!deleteRecursive(path + "/" + subPath)) { // children are removed before the node itself
+        return false; // stop at the first subtree that could not be deleted
+      }
+    }
+
+    return delete(path); // result of deleting the node itself
+  }
+
+  private void processDataOrChildChange(WatchedEvent event) {
+    final String path = event.getPath();
+
+    if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated
+        || event.getType() == EventType.NodeDeleted) { // event types forwarded to child listeners
+      Set<IZkChildListener> childListeners = _childListener.get(path);
+      if (childListeners != null && !childListeners.isEmpty()) {
+        fireChildChangedEvents(path, childListeners);
+      }
+    }
+
+    if (event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted
+        || event.getType() == EventType.NodeCreated) { // event types forwarded to data listeners
+      Set<IZkDataListener> listeners = _dataListener.get(path);
+      if (listeners != null && !listeners.isEmpty()) {
+        fireDataChangedEvents(event.getPath(), listeners);
+      }
+    }
+  }
+
+  private void fireDataChangedEvents(final String path, Set<IZkDataListener> listeners) {
+    for (final IZkDataListener listener : listeners) { // one queued event per listener
+      _eventThread.send(new ZkEvent("Data of " + path + " changed sent to " + listener) {
+
+        @Override public void run() throws Exception {
+          // reinstall watch
+          exists(path, true);
+          try {
+            Object data = readData(path, null, true); // data is read when the event runs, not when it is queued
+            listener.handleDataChange(path, data);
+          } catch (ZkNoNodeException e) { // a node missing at read time is reported as a deletion
+            listener.handleDataDeleted(path);
+          }
+        }
+      });
+    }
+  }
+
+  private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners) {
+    try { // covers the queuing loop only; run() executes later, from the queued event
+      // reinstall the watch
+      for (final IZkChildListener listener : childListeners) {
+        _eventThread.send(new ZkEvent("Children of " + path + " changed sent to " + listener) {
+
+          @Override public void run() throws Exception {
+            try {
+              // if the node doesn't exist we should listen for the root node to reappear
+              exists(path);
+              List<String> children = getChildren(path); // children are read when the event runs
+              listener.handleChildChange(path, children);
+            } catch (ZkNoNodeException e) { // a missing node is reported as a null child list
+              listener.handleChildChange(path, null);
+            }
+          }
+        });
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to fire child changed event. Unable to getChildren.  ", e);
+    }
+  }
+
+  /**
+   * Waits until the znode at {@code path} exists or the timeout expires.
+   *
+   * @param path the znode path to wait for
+   * @param timeUnit unit of {@code time}
+   * @param time maximum time to wait
+   * @return {@code true} if the node exists, {@code false} if the wait timed out first
+   * @throws ZkInterruptedException if the thread is interrupted while locking or waiting
+   */
+  public boolean waitUntilExists(String path, TimeUnit timeUnit, long time)
+      throws ZkInterruptedException {
+    final Date deadline = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
+    LOG.debug("Waiting until znode '" + path + "' becomes available.");
+    if (exists(path)) {
+      return true;
+    }
+    acquireEventLock();
+    try {
+      for (;;) {
+        // re-check (with watch=true) before every wait on the znode-event condition
+        if (exists(path, true)) {
+          return true;
+        }
+        final boolean signalled = getEventLock().getZNodeEventCondition().awaitUntil(deadline);
+        if (!signalled) {
+          return false;
+        }
+      }
+    } catch (InterruptedException e) {
+      throw new ZkInterruptedException(e);
+    } finally {
+      getEventLock().unlock();
+    }
+  }
+
+  /**
+   * Looks up the data listeners registered for a path.
+   *
+   * @param path the znode path
+   * @return whatever the data-listener map holds for {@code path} (as returned by its
+   *         {@code get})
+   */
+  protected Set<IZkDataListener> getDataListener(String path) {
+    final Set<IZkDataListener> registered = _dataListener.get(path);
+    return registered;
+  }
+
+  /**
+   * Waits for the {@code SyncConnected} state using a wait of {@link Integer#MAX_VALUE}
+   * milliseconds; the outcome of the wait is not reported to the caller.
+   *
+   * @throws ZkInterruptedException if the waiting thread is interrupted
+   */
+  public void waitUntilConnected() throws ZkInterruptedException {
+    final long maxWaitMillis = Integer.MAX_VALUE;
+    waitUntilConnected(maxWaitMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Waits up to the given time for the {@code SyncConnected} state.
+   *
+   * @param time maximum time to wait
+   * @param timeUnit unit of {@code time}
+   * @return the result of {@link #waitForKeeperState} for {@code KeeperState.SyncConnected}
+   * @throws ZkInterruptedException if the waiting thread is interrupted
+   */
+  public boolean waitUntilConnected(long time, TimeUnit timeUnit) throws ZkInterruptedException {
+    final boolean connected = waitForKeeperState(KeeperState.SyncConnected, time, timeUnit);
+    return connected;
+  }
+
+  /**
+   * Blocks the calling thread until the client's current state equals {@code keeperState} or the
+   * timeout elapses.
+   *
+   * @param keeperState the state to wait for
+   * @param time maximum time to wait
+   * @param timeUnit unit of {@code time}
+   * @return {@code true} once the current state equals {@code keeperState}; {@code false} if the
+   *         wait reported a timeout and the state still did not match
+   * @throws ZkInterruptedException if the thread is interrupted while locking or waiting
+   * @throws IllegalArgumentException if called from the ZooKeeper event thread
+   */
+  public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit)
+      throws ZkInterruptedException {
+    // waiting here from the ZooKeeper event thread is rejected outright
+    if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
+      throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
+    }
+    // absolute deadline, computed once so repeated waits share the same cut-off
+    Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
+
+    LOG.debug("Waiting for keeper state " + keeperState);
+    acquireEventLock();
+    try {
+      boolean stillWaiting = true;
+      while (_currentState != keeperState) {
+        // the previous wait reported a timeout and the state still does not match: give up
+        if (!stillWaiting) {
+          return false;
+        }
+        // presumably a java.util.concurrent.locks.Condition: awaitUntil returns false once the
+        // deadline has passed; the loop condition then re-checks the state one final time
+        stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeout);
+      }
+      LOG.debug("State is " + _currentState);
+      return true;
+    } catch (InterruptedException e) {
+      throw new ZkInterruptedException(e);
+    } finally {
+      getEventLock().unlock();
+    }
+  }
+
+  /**
+   * Acquires the event lock interruptibly, translating an interruption into a
+   * {@link ZkInterruptedException}.
+   */
+  private void acquireEventLock() {
+    try {
+      getEventLock().lockInterruptibly();
+    } catch (InterruptedException interrupted) {
+      throw new ZkInterruptedException(interrupted);
+    }
+  }
+
+  /**
+   * Runs the given callable, retrying it after connection loss or session expiry until it
+   * completes, fails with a non-retryable error, or the operation retry timeout elapses.
+   *
+   * @param <T> result type of the callable
+   * @param callable the operation to execute
+   * @return result of Callable
+   * @throws ZkInterruptedException
+   *             if operation was interrupted, or a required reconnection got interrupted
+   * @throws IllegalArgumentException
+   *             if called from the ZooKeeper event thread
+   * @throws IllegalStateException
+   *             if this client has already been closed
+   * @throws ZkTimeoutException
+   *             if a retry would be needed but the operation retry timeout has elapsed
+   * @throws ZkException
+   *             if any ZooKeeper exception occurred
+   * @throws RuntimeException
+   *             if any other exception occurs from invoking the Callable
+   */
+  public <T> T retryUntilConnected(Callable<T> callable)
+      throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
+    if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
+      throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
+    }
+    final long operationStartTime = System.currentTimeMillis();
+    while (true) {
+      // checked on every attempt, so a close() during retries ends the loop
+      if (_closed) {
+        throw new IllegalStateException("ZkClient already closed!");
+      }
+      try {
+        return callable.call();
+      } catch (ConnectionLossException e) {
+        // we give the event thread some time to update the status to 'Disconnected'
+        Thread.yield();
+        waitForRetry();
+      } catch (SessionExpiredException e) {
+        // we give the event thread some time to update the status to 'Expired'
+        Thread.yield();
+        waitForRetry();
+      } catch (KeeperException e) {
+        // any other ZooKeeper error is not retried
+        throw ZkException.create(e);
+      } catch (InterruptedException e) {
+        throw new ZkInterruptedException(e);
+      } catch (Exception e) {
+        throw ExceptionUtil.convertToRuntimeException(e);
+      }
+      // before attempting a retry, check whether retry timeout has elapsed
+      // (a timeout value of -1 or lower disables this check)
+      if (this.operationRetryTimeoutInMillis > -1
+          && (System.currentTimeMillis() - operationStartTime)
+          >= this.operationRetryTimeoutInMillis) {
+        throw new ZkTimeoutException("Operation cannot be retried because of retry timeout ("
+            + this.operationRetryTimeoutInMillis + " milli seconds)");
+      }
+    }
+  }
+
+  /**
+   * Waits for the connection before a retry: bounded by the operation retry timeout when that
+   * timeout is non-negative, otherwise via the no-argument {@code waitUntilConnected()}.
+   */
+  private void waitForRetry() {
+    if (this.operationRetryTimeoutInMillis >= 0) {
+      this.waitUntilConnected(this.operationRetryTimeoutInMillis, TimeUnit.MILLISECONDS);
+    } else {
+      this.waitUntilConnected();
+    }
+  }
+
+  /**
+   * Records the given keeper state as the current state while holding the event lock.
+   *
+   * @param currentState the new current state
+   */
+  public void setCurrentState(KeeperState currentState) {
+    final ZkLock eventLock = getEventLock();
+    eventLock.lock();
+    try {
+      _currentState = currentState;
+    } finally {
+      eventLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the mutex that all zookeeper events are synchronized against. Hold this lock to do
+   * work that must not be interleaved with zookeeper event processing. Threads waiting on this
+   * mutex object are also notified when an event arrives.
+   *
+   * @return the event mutex.
+   */
+  public ZkLock getEventLock() {
+    return _zkEventLock;
+  }
+
+  /**
+   * Deletes the znode at the given path.
+   *
+   * @param path the znode path to delete
+   * @return {@code true} if the delete call completed, {@code false} if it failed with
+   *         {@link ZkNoNodeException}
+   */
+  public boolean delete(final String path) {
+    final Callable<Object> deleteOp = new Callable<Object>() {
+
+      @Override public Object call() throws Exception {
+        _connection.delete(path);
+        return null;
+      }
+    };
+
+    try {
+      retryUntilConnected(deleteOp);
+    } catch (ZkNoNodeException e) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Converts an object to bytes using the configured serializer.
+   *
+   * @param data the object to serialize
+   * @return the serializer's output for {@code data}
+   */
+  private byte[] serialize(Object data) {
+    final byte[] bytes = _zkSerializer.serialize(data);
+    return bytes;
+  }
+
+  /**
+   * Converts bytes back to an object using the configured serializer.
+   *
+   * @param data the raw bytes, may be {@code null}
+   * @return {@code null} for {@code null} input, otherwise the deserialized object
+   */
+  @SuppressWarnings("unchecked") private <T extends Object> T derializable(byte[] data) {
+    return data == null ? null : (T) _zkSerializer.deserialize(data);
+  }
+
+  /**
+   * Reads the data of the znode at {@code path}; a missing node is reported by exception
+   * (delegates with {@code returnNullIfPathNotExists = false}).
+   *
+   * @param path the znode path
+   * @return the deserialized node data
+   */
+  @SuppressWarnings("unchecked") public <T extends Object> T readData(String path) {
+    final Object value = readData(path, false);
+    return (T) value;
+  }
+
+  /**
+   * Reads the data of the znode at {@code path}, optionally tolerating a missing node.
+   *
+   * @param path the znode path
+   * @param returnNullIfPathNotExists when {@code true}, a {@link ZkNoNodeException} is turned
+   *            into a {@code null} result; when {@code false} it is rethrown
+   * @return the deserialized node data, or {@code null} as described above
+   */
+  @SuppressWarnings("unchecked") public <T extends Object> T readData(String path,
+      boolean returnNullIfPathNotExists) {
+    try {
+      return (T) readData(path, null);
+    } catch (ZkNoNodeException e) {
+      if (returnNullIfPathNotExists) {
+        return null;
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the data of the znode at {@code path}, passing {@code hasListeners(path)} as the
+   * watch flag.
+   *
+   * @param path the znode path
+   * @param stat passed through to the underlying read
+   * @return the deserialized node data
+   */
+  @SuppressWarnings("unchecked") public <T extends Object> T readData(String path, Stat stat) {
+    final boolean watch = hasListeners(path);
+    return (T) readData(path, stat, watch);
+  }
+
+  /**
+   * Reads the raw bytes of a znode through the connection (with retry) and deserializes them.
+   *
+   * @param path the znode path
+   * @param stat passed through to the connection's read
+   * @param watch passed through to the connection's read
+   * @return the deserialized node data, or {@code null} if the node holds no bytes
+   */
+  @SuppressWarnings("unchecked") protected <T extends Object> T readData(final String path,
+      final Stat stat, final boolean watch) {
+    final Callable<byte[]> readOp = new Callable<byte[]>() {
+
+      @Override public byte[] call() throws Exception {
+        return _connection.readData(path, stat, watch);
+      }
+    };
+    final byte[] raw = retryUntilConnected(readOp);
+    return (T) derializable(raw);
+  }
+
+  /**
+   * Writes {@code object} to the znode at {@code path}, passing {@code -1} as the expected
+   * version (presumably ZooKeeper's "match any version" convention).
+   *
+   * @param path the znode path
+   * @param object the data to write
+   */
+  public void writeData(String path, Object object) {
+    final int expectedVersion = -1;
+    writeData(path, object, expectedVersion);
+  }
+
+  /**
+   * Read-modify-write of an existing znode. The znode's current content is handed to the given
+   * {@link DataUpdater}, and the value it returns is written back using the version that was
+   * read. If the write fails with a {@link ZkBadVersionException}, the whole cycle (read, update,
+   * write) is repeated until a write succeeds.
+   *
+   * @param <T>
+   * @param path
+   *            The path of the znode.
+   * @param updater
+   *            Updater that creates the new contents.
+   */
+  @SuppressWarnings("unchecked") public <T extends Object> void updateDataSerialized(String path,
+      DataUpdater<T> updater) {
+    final Stat stat = new Stat();
+    while (true) {
+      try {
+        final T current = (T) readData(path, stat);
+        final T updated = updater.update(current);
+        writeData(path, updated, stat.getVersion());
+        return;
+      } catch (ZkBadVersionException e) {
+        // version mismatch on write: loop and try again with freshly read data
+      }
+    }
+  }
+
+  /**
+   * Writes data to a znode with an expected version, discarding the resulting {@code Stat}.
+   *
+   * @param path the znode path
+   * @param datat the data to write
+   * @param expectedVersion the version passed through to the write
+   */
+  public void writeData(final String path, Object datat, final int expectedVersion) {
+    // same as writeDataReturnStat, but the returned Stat is not needed here
+    writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  /**
+   * Serializes the given object and writes it to a znode through the connection (with retry).
+   *
+   * @param path the znode path
+   * @param datat the data to write
+   * @param expectedVersion the version passed through to the connection's write
+   * @return the {@code Stat} returned by the connection
+   */
+  public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion) {
+    final byte[] serialized = serialize(datat);
+    final Callable<Stat> writeOp = new Callable<Stat>() {
+
+      @Override public Stat call() throws Exception {
+        return _connection.writeDataReturnStat(path, serialized, expectedVersion);
+      }
+    };
+    return retryUntilConnected(writeOp);
+  }
+
+  /**
+   * Calls the connection's {@code exists(path, true)} (with retry); the result is discarded.
+   *
+   * @param path the znode path
+   */
+  public void watchForData(final String path) {
+    final Callable<Object> watchOp = new Callable<Object>() {
+      @Override public Object call() throws Exception {
+        _connection.exists(path, true);
+        return null;
+      }
+    };
+    retryUntilConnected(watchOp);
+  }
+
+  /**
+   * Installs a child watch for the given path.
+   *
+   * @param path the znode path
+   * @return the current children of the path or null if the zk node with the given path doesn't exist.
+   * @throws IllegalArgumentException if called from the ZooKeeper event thread
+   */
+  public List<String> watchForChilds(final String path) {
+    if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) {
+      throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
+    }
+    final Callable<List<String>> watchOp = new Callable<List<String>>() {
+      @Override public List<String> call() throws Exception {
+        exists(path, true);
+        try {
+          return getChildren(path, true);
+        } catch (ZkNoNodeException e) {
+          // ignore, the "exists" watch will listen for the parent node to appear
+          return null;
+        }
+      }
+    };
+    return retryUntilConnected(watchOp);
+  }
+
+  /**
+   * Adds authentication information to the connection. It is used to identify the user and to
+   * check access to nodes protected by ACLs.
+   *
+   * @param scheme the authentication scheme
+   * @param auth the authentication data
+   */
+  public void addAuthInfo(final String scheme, final byte[] auth) {
+    final Callable<Object> authOp = new Callable<Object>() {
+      @Override public Object call() throws Exception {
+        _connection.addAuthInfo(scheme, auth);
+        return null;
+      }
+    };
+    retryUntilConnected(authOp);
+  }
+
+  /**
+   * Connect to ZooKeeper: starts a new event thread, opens the connection with the given watcher
+   * and waits for the connected state. If the connection is not established (timeout or any
+   * exception), the client is closed before this method exits.
+   *
+   * @param maxMsToWaitUntilConnected
+   *             maximum time in milliseconds to wait for the connected state
+   * @param watcher
+   *             the watcher handed to the underlying connection
+   * @throws ZkInterruptedException
+   *             if the thread was interrupted while acquiring the event lock or while waiting
+   *             for the connection
+   * @throws ZkTimeoutException
+   *             if the connection was not established within {@code maxMsToWaitUntilConnected}
+   * @throws IllegalStateException
+   *             declared by the signature; NOTE(review): no direct throw is visible in this
+   *             method, presumably raised by the underlying connection. Confirm.
+   */
+  public void connect(final long maxMsToWaitUntilConnected, Watcher watcher)
+      throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
+    boolean started = false;
+    acquireEventLock();
+    try {
+      setShutdownTrigger(false);
+      // the event thread is started before the connection is opened
+      _eventThread = new ZkEventThread(_connection.getServers());
+      _eventThread.start();
+      _connection.connect(watcher);
+
+      LOG.debug("Awaiting connection to Zookeeper server");
+      if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) {
+        throw new ZkTimeoutException(
+            "Unable to connect to zookeeper server within timeout: " + maxMsToWaitUntilConnected);
+      }
+      started = true;
+    } finally {
+      // the event lock is released first; close() then takes it again on its own
+      getEventLock().unlock();
+
+      // we should close the zookeeper instance, otherwise it would keep
+      // on trying to connect
+      if (!started) {
+        close();
+      }
+    }
+  }
+
+  /**
+   * Asks the connection for the creation time of the znode at {@code path} while holding the
+   * event lock. Unlike most operations in this class, the call is not retried.
+   *
+   * @param path the znode path
+   * @return the value returned by the connection's {@code getCreateTime}
+   * @throws ZkException if the connection reports a {@code KeeperException}
+   * @throws ZkInterruptedException if the thread is interrupted
+   */
+  public long getCreationTime(String path) {
+    acquireEventLock();
+    try {
+      final long createTime = _connection.getCreateTime(path);
+      return createTime;
+    } catch (KeeperException keeperError) {
+      throw ZkException.create(keeperError);
+    } catch (InterruptedException interrupted) {
+      throw new ZkInterruptedException(interrupted);
+    } finally {
+      getEventLock().unlock();
+    }
+  }
+
+  /**
+   * Close the client: sets the shutdown trigger, interrupts the event thread and waits up to
+   * 2000 ms for it to finish, then closes the connection and marks the client as closed.
+   * Returns immediately if the client is already marked closed.
+   *
+   * @throws ZkInterruptedException
+   *             if the calling thread is interrupted during shutdown
+   */
+  public void close() throws ZkInterruptedException {
+    // this check happens before the event lock is taken
+    if (_closed) {
+      return;
+    }
+    LOG.debug("Closing ZkClient...");
+    getEventLock().lock();
+    try {
+      setShutdownTrigger(true);
+      _eventThread.interrupt();
+      // bounded wait: shutdown continues even if the event thread has not finished
+      _eventThread.join(2000);
+      _connection.close();
+      _closed = true;
+    } catch (InterruptedException e) {
+      // NOTE(review): if the interruption occurs during join(), _connection.close() is skipped
+      // and _closed stays false
+      throw new ZkInterruptedException(e);
+    } finally {
+      getEventLock().unlock();
+    }
+    LOG.debug("Closing ZkClient...done");
+  }
+
+  /**
+   * Closes the current connection and opens it again with this client as the watcher, all while
+   * holding the event lock.
+   */
+  private void reconnect() {
+    final ZkLock eventLock = getEventLock();
+    eventLock.lock();
+    try {
+      _connection.close();
+      _connection.connect(this);
+    } catch (InterruptedException interrupted) {
+      throw new ZkInterruptedException(interrupted);
+    } finally {
+      eventLock.unlock();
+    }
+  }
+
+  /**
+   * Sets the shutdown-trigger flag.
+   *
+   * @param triggerState the new value of the flag
+   */
+  public void setShutdownTrigger(boolean triggerState) {
+    this._shutdownTriggered = triggerState;
+  }
+
+  /**
+   * Returns the shutdown-trigger flag.
+   *
+   * @return the current value of the flag
+   */
+  public boolean getShutdownTrigger() {
+    return this._shutdownTriggered;
+  }
+
+  /**
+   * Counts every registered listener: child listeners and data listeners across all paths, plus
+   * the state listeners.
+   *
+   * @return the total number of listeners
+   */
+  public int numberOfListeners() {
+    int childCount = 0;
+    for (Set<IZkChildListener> perPath : _childListener.values()) {
+      childCount += perPath.size();
+    }
+
+    int dataCount = 0;
+    for (Set<IZkDataListener> perPath : _dataListener.values()) {
+      dataCount += perPath.size();
+    }
+
+    return childCount + dataCount + _stateListener.size();
+  }
+
+  /**
+   * Submits a batch of operations through the connection's {@code multi} call (with retry).
+   *
+   * @param ops the operations to execute; must not be {@code null}
+   * @return the results returned by the connection
+   * @throws NullPointerException if {@code ops} is {@code null}
+   * @throws ZkException if a ZooKeeper error occurs
+   */
+  public List<OpResult> multi(final Iterable<Op> ops) throws ZkException {
+    if (ops == null) {
+      throw new NullPointerException("ops must not be null.");
+    }
+
+    final Callable<List<OpResult>> multiOp = new Callable<List<OpResult>>() {
+
+      @Override public List<OpResult> call() throws Exception {
+        return _connection.multi(ops);
+      }
+    };
+    return retryUntilConnected(multiOp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7fc03f4c/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
new file mode 100644
index 0000000..dcf7019
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2010 the original author or authors.
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.helix.manager.zk.zookeeper;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * All listeners registered at the {@link ZkClient} will be notified from this event thread. This is to prevent
+ * dead-lock situations. The {@link ZkClient} pulls some information out of the {@code ZooKeeper} events to signal
+ * {@code ZkLock} conditions. Re-using the {@code ZooKeeper} event thread to also notify {@link ZkClient} listeners,
+ * would stop the ZkClient from receiving events from {@code ZooKeeper} as soon as one of the listeners blocks (because
+ * it is waiting for something). {@link ZkClient} would then for instance not be able to maintain its connection state
+ * anymore.
+ *
+ * <p>Events are queued with {@link #send(ZkEvent)} and run one at a time, in queue order, by {@link #run()}.
+ */
+class ZkEventThread extends Thread {
+  // NOTE(review): the logger is registered under ZkClient, not ZkEventThread. Left unchanged in
+  // case existing log configuration relies on that category; confirm whether it is intentional.
+  private static final Logger LOG = LoggerFactory.getLogger(ZkClient.class);
+
+  /** Pending events, consumed only by this thread. */
+  private final BlockingQueue<ZkEvent> _events = new LinkedBlockingQueue<ZkEvent>();
+
+  /** Static counter used to number delivered events in log output; shared by all instances. */
+  private static final AtomicInteger _eventId = new AtomicInteger(0);
+
+  /**
+   * A unit of work delivered on the event thread.
+   */
+  public static abstract class ZkEvent {
+
+    private final String _description;
+
+    /**
+     * @param description text included in {@link #toString()}, used in log messages
+     */
+    public ZkEvent(String description) {
+      _description = description;
+    }
+
+    /**
+     * Performs the event's work; invoked on the event thread.
+     *
+     * @throws Exception any failure; the event thread logs it and moves on to the next event,
+     *             unless it signals an interruption
+     */
+    public abstract void run() throws Exception;
+
+    @Override public String toString() {
+      return "ZkEvent[" + _description + "]";
+    }
+  }
+
+  /**
+   * Creates a daemon thread named {@code ZkClient-EventThread-<thread id>-<name>}.
+   *
+   * @param name suffix appended to the thread name
+   */
+  ZkEventThread(String name) {
+    setDaemon(true);
+    setName("ZkClient-EventThread-" + getId() + "-" + name);
+  }
+
+  /**
+   * Takes events off the queue and runs them until this thread is interrupted. An interruption
+   * signalled by an event (as {@link InterruptedException} or {@link ZkInterruptedException})
+   * re-sets the interrupt flag so the loop ends; any other failure is logged and delivery
+   * continues with the next event.
+   */
+  @Override public void run() {
+    LOG.info("Starting ZkClient event thread.");
+    try {
+      while (!isInterrupted()) {
+        ZkEvent zkEvent = _events.take();
+        int eventId = _eventId.incrementAndGet();
+        // parameterized logging: the event is only rendered to a string when debug is enabled
+        LOG.debug("Delivering event #{} {}", eventId, zkEvent);
+        try {
+          zkEvent.run();
+        } catch (InterruptedException e) {
+          interrupt();
+        } catch (ZkInterruptedException e) {
+          interrupt();
+        } catch (Throwable e) {
+          LOG.error("Error handling event " + zkEvent, e);
+        }
+        LOG.debug("Delivering event #{} done", eventId);
+      }
+    } catch (InterruptedException e) {
+      LOG.info("Terminate ZkClient event thread.");
+    }
+  }
+
+  /**
+   * Queues an event for delivery. The event is silently dropped if this thread's interrupt flag
+   * is set.
+   *
+   * @param event the event to deliver
+   */
+  public void send(ZkEvent event) {
+    if (!isInterrupted()) {
+      LOG.debug("New event: {}", event);
+      _events.add(event);
+    }
+  }
+}