You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/08 03:05:59 UTC

git commit: [HELIX-268] Atomic API - Add a ZK lock and a skeleton AtomicClusterAccessor

Updated Branches:
  refs/heads/helix-logical-model 48a5f48f3 -> cb3051241


[HELIX-268] Atomic API - Add a ZK lock and a skeleton AtomicClusterAccessor


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/cb305124
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/cb305124
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/cb305124

Branch: refs/heads/helix-logical-model
Commit: cb3051241d99ccb68674e5b37feba133fe2d5286
Parents: 48a5f48
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Oct 7 18:04:51 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Oct 7 18:04:51 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/api/Scope.java   |   8 +
 .../api/accessor/AtomicClusterAccessor.java     | 193 ++++++++++++
 .../helix/api/accessor/ClusterAccessor.java     |  54 ++--
 .../helix/api/accessor/ParticipantAccessor.java |   2 -
 .../java/org/apache/helix/lock/HelixLock.java   |  37 +++
 .../org/apache/helix/lock/HelixLockable.java    |  36 +++
 .../org/apache/helix/lock/zk/LockListener.java  |  39 +++
 .../apache/helix/lock/zk/ProtocolSupport.java   | 191 ++++++++++++
 .../org/apache/helix/lock/zk/WriteLock.java     | 294 +++++++++++++++++++
 .../org/apache/helix/lock/zk/ZKHelixLock.java   | 167 +++++++++++
 .../org/apache/helix/lock/zk/ZNodeName.java     | 113 +++++++
 .../helix/lock/zk/ZooKeeperOperation.java       |  38 +++
 12 files changed, 1150 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/api/Scope.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Scope.java b/helix-core/src/main/java/org/apache/helix/api/Scope.java
index 7dc217c..26e09a9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Scope.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Scope.java
@@ -60,6 +60,14 @@ public class Scope<T extends Id> {
     return getType() + "{" + getScopedId() + "}";
   }
 
+  @Override
+  public boolean equals(Object that) {
+    if (that instanceof Scope) {
+      return this.toString().equals(that.toString());
+    }
+    return false;
+  }
+
   /**
    * Get the Helix entity type that this scope covers
    * @return scope type

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
new file mode 100644
index 0000000..ff8ab6e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -0,0 +1,193 @@
+package org.apache.helix.api.accessor;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * An atomic version of the ClusterAccessor. If atomic operations are required, use instances of
+ * this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside
+ * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
+ * may fail, in which case users should handle the return value of each function if necessary.
+ */
+public class AtomicClusterAccessor extends ClusterAccessor {
+  private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class);
+
+  private final HelixLockable _lockProvider;
+  @SuppressWarnings("unused")
+  private final HelixDataAccessor _accessor;
+  @SuppressWarnings("unused")
+  private final PropertyKey.Builder _keyBuilder;
+  private final ClusterId _clusterId;
+
+  /**
+   * Instantiate the accessor
+   * @param clusterId the cluster to access
+   * @param accessor a HelixDataAccessor for the physical properties
+   * @param lockProvider a lock provider
+   */
+  public AtomicClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor,
+      HelixLockable lockProvider) {
+    super(clusterId, accessor);
+    _lockProvider = lockProvider;
+    _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
+    _clusterId = clusterId;
+  }
+
+  @Override
+  public boolean createCluster(ClusterConfig cluster) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.createCluster(cluster);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean dropCluster() {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.dropCluster();
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Cluster readCluster() {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.readCluster();
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean addParticipantToCluster(ParticipantConfig participant) {
+    if (participant == null) {
+      LOG.error("Participant config cannot be null");
+      return false;
+    }
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participant.getId()));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.addParticipantToCluster(participant);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean dropParticipantFromCluster(ParticipantId participantId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.dropParticipantFromCluster(participantId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean addResourceToCluster(ResourceConfig resource) {
+    if (resource == null) {
+      LOG.error("Resource config cannot be null");
+      return false;
+    }
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resource.getId()));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.addResourceToCluster(resource);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean dropResourceFromCluster(ResourceId resourceId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.dropResourceFromCluster(resourceId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public ClusterConfig updateCluster(ClusterConfig.Delta clusterDelta) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        Cluster cluster = super.readCluster();
+        if (cluster == null) {
+          LOG.error("Cluster does not exist, cannot be updated");
+          return null;
+        }
+        ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig());
+        boolean status = setBasicClusterConfig(config);
+        return status ? config : null;
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 8780115..3548c82 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -20,6 +20,7 @@ package org.apache.helix.api.accessor;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -91,11 +92,12 @@ public class ClusterAccessor {
    * @return true if created, false if creation failed
    */
   public boolean createCluster(ClusterConfig cluster) {
-    boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
-    if (!created) {
+    ClusterConfiguration configuration = _accessor.getProperty(_keyBuilder.clusterConfig());
+    if (configuration != null && isClusterStructureValid()) {
       LOG.error("Cluster already created. Aborting.");
       return false;
     }
+    clearClusterStructure();
     initClusterStructure();
     Map<StateModelDefId, StateModelDefinition> stateModelDefs = cluster.getStateModelMap();
     for (StateModelDefinition stateModelDef : stateModelDefs.values()) {
@@ -121,10 +123,10 @@ public class ClusterAccessor {
     if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
       _accessor.createProperty(_keyBuilder.persistantStat(), cluster.getStats());
     }
-    _accessor.createProperty(_keyBuilder.clusterConfig(), clusterConfig);
     if (cluster.isPaused()) {
       pauseCluster();
     }
+    _accessor.createProperty(_keyBuilder.clusterConfig(), clusterConfig);
 
     return true;
   }
@@ -150,7 +152,7 @@ public class ClusterAccessor {
    * @param config ClusterConfig
    * @return true if correctly set, false otherwise
    */
-  private boolean setBasicClusterConfig(ClusterConfig config) {
+  protected boolean setBasicClusterConfig(ClusterConfig config) {
     if (config == null) {
       return false;
     }
@@ -199,9 +201,13 @@ public class ClusterAccessor {
 
   /**
    * read entire cluster data
-   * @return cluster snapshot
+   * @return cluster snapshot or null
    */
   public Cluster readCluster() {
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster is not fully set up");
+      return null;
+    }
     LiveInstance leader = _accessor.getProperty(_keyBuilder.controllerLeader());
 
     /**
@@ -279,6 +285,11 @@ public class ClusterAccessor {
    * @return map of resource id to resource
    */
   public Map<ResourceId, Resource> readResources() {
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster is not fully set up yet!");
+      return Collections.emptyMap();
+    }
+
     /**
      * map of resource-id to ideal-state
      */
@@ -319,9 +330,14 @@ public class ClusterAccessor {
 
   /**
    * Read all participants in the cluster
-   * @return map of participant id to participant
+   * @return map of participant id to participant, or empty map
    */
   public Map<ParticipantId, Participant> readParticipants() {
+    if (!isClusterStructureValid()) {
+      LOG.error("Cluster is not fully set up yet!");
+      return Collections.emptyMap();
+    }
+
     /**
      * map of instance-id to instance-config
      */
@@ -667,18 +683,8 @@ public class ClusterAccessor {
    * @return true if valid or false otherwise
    */
   public boolean isClusterStructureValid() {
-    return isClusterStructureValid(_clusterId, _accessor.getBaseDataAccessor());
-  }
-
-  /**
-   * check if cluster structure is valid
-   * @param clusterId the cluster to check
-   * @param baseAccessor a base data accessor
-   * @return true if valid or false otherwise
-   */
-  private static boolean isClusterStructureValid(ClusterId clusterId,
-      BaseDataAccessor<?> baseAccessor) {
-    List<String> paths = getRequiredPaths(clusterId);
+    List<String> paths = getRequiredPaths(_clusterId);
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     if (baseAccessor != null) {
       boolean[] existsResults = baseAccessor.exists(paths, 0);
       for (boolean exists : existsResults) {
@@ -693,7 +699,7 @@ public class ClusterAccessor {
   /**
    * Create empty persistent properties to ensure that there is a valid cluster structure
    */
-  private void initClusterStructure() {
+  public void initClusterStructure() {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     List<String> paths = getRequiredPaths(_clusterId);
     for (String path : paths) {
@@ -705,6 +711,15 @@ public class ClusterAccessor {
   }
 
   /**
+   * Remove all but the top level cluster node; intended for reconstructing the cluster
+   */
+  private void clearClusterStructure() {
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    List<String> paths = getRequiredPaths(_clusterId);
+    baseAccessor.remove(paths, 0);
+  }
+
+  /**
    * Get all property paths that must be set for a cluster structure to be valid
    * @param the cluster that the paths will be relative to
    * @return list of paths as strings
@@ -712,7 +727,6 @@ public class ClusterAccessor {
   private static List<String> getRequiredPaths(ClusterId clusterId) {
     PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterId.stringify());
     List<String> paths = new ArrayList<String>();
-    paths.add(keyBuilder.cluster().getPath());
     paths.add(keyBuilder.clusterConfigs().getPath());
     paths.add(keyBuilder.instanceConfigs().getPath());
     paths.add(keyBuilder.propertyStore().getPath());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index c53bcd8..7952761 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -618,12 +618,10 @@ public class ParticipantAccessor {
   boolean dropParticipant(ParticipantId participantId) {
     if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) == null) {
       LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
-      return false;
     }
 
     if (_accessor.getProperty(_keyBuilder.instance(participantId.stringify())) == null) {
       LOG.error("Participant: " + participantId + " structure does NOT exist in cluster");
-      return false;
     }
 
     // delete participant config path

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
new file mode 100644
index 0000000..79c15d0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
@@ -0,0 +1,37 @@
+package org.apache.helix.lock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Generic (distributed) lock for Helix-related persisted updates
+ */
+public interface HelixLock {
+  /**
+   * Synchronously acquire a lock
+   * @return true if the lock was acquired, false if could not be acquired
+   */
+  public boolean lock();
+
+  /**
+   * Release a lock
+   * @return true if the lock was released, false if it could not be released
+   */
+  public boolean unlock();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java b/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java
new file mode 100644
index 0000000..fdb2ca5
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/HelixLockable.java
@@ -0,0 +1,36 @@
+package org.apache.helix.lock;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ClusterId;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Implemented by any Helix construct that is lockable and is able to return a HelixLock instance
+ */
+public interface HelixLockable {
+  /**
+   * Get a lock object on a scope
+   * @param clusterId cluster to lock
+   * @param scope scope relative to the cluster that the lock protects
+   * @return HelixLock instance
+   */
+  HelixLock getLock(ClusterId clusterId, Scope<?> scope);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java b/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java
new file mode 100644
index 0000000..bb2118c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/LockListener.java
@@ -0,0 +1,39 @@
+package org.apache.helix.lock.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * This class has two methods which are call
+ * back methods when a lock is acquired and
+ * when the lock is released.
+ */
+interface LockListener {
+  /**
+   * call back called when the lock
+   * is acquired
+   */
+  public void lockAcquired();
+
+  /**
+   * call back called when the lock is
+   * released.
+   */
+  public void lockReleased();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java
new file mode 100644
index 0000000..23bef6a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ProtocolSupport.java
@@ -0,0 +1,191 @@
+package org.apache.helix.lock.zk;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A base class for protocol implementations which provides a number of higher
+ * level helper methods for working with ZooKeeper along with retrying synchronous
+ * operations if the connection to ZooKeeper closes such as
+ * {@link #retryOperation(ZooKeeperOperation)}
+ */
+class ProtocolSupport {
+  private static final Logger LOG = Logger.getLogger(ProtocolSupport.class);
+
+  protected final ZooKeeper zookeeper;
+  private AtomicBoolean closed = new AtomicBoolean(false);
+  private long retryDelay = 500L;
+  private int retryCount = 10;
+  private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+  public ProtocolSupport(ZooKeeper zookeeper) {
+    this.zookeeper = zookeeper;
+  }
+
+  /**
+   * Closes this strategy and releases any ZooKeeper resources; but keeps the
+   * ZooKeeper instance open
+   */
+  public void close() {
+    if (closed.compareAndSet(false, true)) {
+      doClose();
+    }
+  }
+
+  /**
+   * return zookeeper client instance
+   * @return zookeeper client instance
+   */
+  public ZooKeeper getZookeeper() {
+    return zookeeper;
+  }
+
+  /**
+   * return the acl its using
+   * @return the acl.
+   */
+  public List<ACL> getAcl() {
+    return acl;
+  }
+
+  /**
+   * set the acl
+   * @param acl the acl to set to
+   */
+  public void setAcl(List<ACL> acl) {
+    this.acl = acl;
+  }
+
+  /**
+   * get the retry delay in milliseconds
+   * @return the retry delay
+   */
+  public long getRetryDelay() {
+    return retryDelay;
+  }
+
+  /**
+   * Sets the time waited between retry delays
+   * @param retryDelay the retry delay
+   */
+  public void setRetryDelay(long retryDelay) {
+    this.retryDelay = retryDelay;
+  }
+
+  /**
+   * Allow derived classes to perform
+   * some custom closing operations to release resources
+   */
+  protected void doClose() {
+  }
+
+  /**
+   * Perform the given operation, retrying if the connection fails
+   * @return object. it needs to be cast to the callee's expected
+   *         return type.
+   */
+  protected Object retryOperation(ZooKeeperOperation operation) throws KeeperException,
+      InterruptedException {
+    KeeperException exception = null;
+    for (int i = 0; i < retryCount; i++) {
+      try {
+        return operation.execute();
+      } catch (KeeperException.SessionExpiredException e) {
+        LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);
+        throw e;
+      } catch (KeeperException.ConnectionLossException e) {
+        if (exception == null) {
+          exception = e;
+        }
+        LOG.debug("Attempt " + i + " failed with connection loss so " + "attempting to reconnect: "
+            + e, e);
+        retryDelay(i);
+      }
+    }
+    throw exception;
+  }
+
+  /**
+   * Ensures that the given path exists with no data, the current
+   * ACL and no flags
+   * @param path
+   */
+  protected void ensurePathExists(String path) {
+    ensureExists(path, null, acl, CreateMode.PERSISTENT);
+  }
+
+  /**
+   * Ensures that the given path exists with the given data, ACL and flags
+   * @param path
+   * @param acl
+   * @param flags
+   */
+  protected void ensureExists(final String path, final byte[] data, final List<ACL> acl,
+      final CreateMode flags) {
+    try {
+      retryOperation(new ZooKeeperOperation() {
+        public boolean execute() throws KeeperException, InterruptedException {
+          Stat stat = zookeeper.exists(path, false);
+          if (stat != null) {
+            return true;
+          }
+          zookeeper.create(path, data, acl, flags);
+          return true;
+        }
+      });
+    } catch (KeeperException e) {
+      LOG.warn("Caught: " + e, e);
+    } catch (InterruptedException e) {
+      LOG.warn("Caught: " + e, e);
+    }
+  }
+
+  /**
+   * Returns true if this protocol has been closed
+   * @return true if this protocol is closed
+   */
+  protected boolean isClosed() {
+    return closed.get();
+  }
+
+  /**
+   * Performs a retry delay if this is not the first attempt
+   * @param attemptCount the number of the attempts performed so far
+   */
+  protected void retryDelay(int attemptCount) {
+    if (attemptCount > 0) {
+      try {
+        Thread.sleep(attemptCount * retryDelay);
+      } catch (InterruptedException e) {
+        LOG.debug("Failed to sleep: " + e, e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java
new file mode 100644
index 0000000..aef7618
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/WriteLock.java
@@ -0,0 +1,294 @@
+package org.apache.helix.lock.zk;
+
+import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A protocol to implement an exclusive
+ * write lock or to elect a leader.
+ * <p/>
+ * You invoke {@link #lock()} to start the process of grabbing the lock; you may get the lock then
+ * or it may be some time later.
+ * <p/>
+ * You can register a listener so that you are invoked when you get the lock; otherwise you can ask
+ * if you have the lock by calling {@link #isOwner()}
+ */
+class WriteLock extends ProtocolSupport {
+  private static final Logger LOG = Logger.getLogger(WriteLock.class);
+
+  private final String dir;
+  private String id;
+  private ZNodeName idName;
+  private String ownerId;
+  private String lastChildId;
+  private byte[] data = {
+      0x12, 0x34
+  };
+  private LockListener callback;
+  private LockZooKeeperOperation zop;
+
+  /**
+   * zookeeper contructor for writelock
+   * @param zookeeper zookeeper client instance
+   * @param dir the parent path you want to use for locking
+   * @param acls the acls that you want to use for all the paths,
+   *          if null world read/write is used.
+   */
+  public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {
+    super(zookeeper);
+    this.dir = dir;
+    if (acl != null) {
+      setAcl(acl);
+    }
+    this.zop = new LockZooKeeperOperation();
+  }
+
+  /**
+   * zookeeper contructor for writelock with callback
+   * @param zookeeper the zookeeper client instance
+   * @param dir the parent path you want to use for locking
+   * @param acl the acls that you want to use for all the paths
+   * @param callback the call back instance
+   */
+  public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl, LockListener callback) {
+    this(zookeeper, dir, acl);
+    this.callback = callback;
+  }
+
+  /**
+   * return the current locklistener
+   * @return the locklistener
+   */
+  public LockListener getLockListener() {
+    return this.callback;
+  }
+
+  /**
+   * register a different call back listener
+   * @param callback the call back instance
+   */
+  public void setLockListener(LockListener callback) {
+    this.callback = callback;
+  }
+
+  /**
+   * Removes the lock or associated znode if
+   * you no longer require the lock. this also
+   * removes your request in the queue for locking
+   * in case you do not already hold the lock.
+   * @throws RuntimeException throws a runtime exception
+   *           if it cannot connect to zookeeper.
+   */
+  public synchronized void unlock() throws RuntimeException {
+
+    if (!isClosed() && id != null) {
+      // we don't need to retry this operation in the case of failure
+      // as ZK will remove ephemeral files and we don't wanna hang
+      // this process when closing if we cannot reconnect to ZK
+      try {
+
+        ZooKeeperOperation zopdel = new ZooKeeperOperation() {
+          public boolean execute() throws KeeperException, InterruptedException {
+            zookeeper.delete(id, -1);
+            return Boolean.TRUE;
+          }
+        };
+        zopdel.execute();
+      } catch (InterruptedException e) {
+        LOG.warn("Caught: " + e, e);
+        // set that we have been interrupted.
+        Thread.currentThread().interrupt();
+      } catch (KeeperException.NoNodeException e) {
+        // do nothing
+      } catch (KeeperException e) {
+        LOG.warn("Caught: " + e, e);
+        throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e);
+      } finally {
+        if (callback != null) {
+          callback.lockReleased();
+        }
+        id = null;
+      }
+    }
+  }
+
+  /**
+   * the watcher called on
+   * getting watch while watching
+   * my predecessor
+   */
+  private class LockWatcher implements Watcher {
+    public void process(WatchedEvent event) {
+      // lets either become the leader or watch the new/updated node
+      LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + event.getState()
+          + " type " + event.getType());
+      try {
+        lock();
+      } catch (Exception e) {
+        LOG.warn("Failed to acquire lock: " + e, e);
+      }
+    }
+  }
+
+  /**
+   * a zoookeeper operation that is mainly responsible
+   * for all the magic required for locking.
+   */
+  private class LockZooKeeperOperation implements ZooKeeperOperation {
+
+    /**
+     * find if we have been created earler if not create our node
+     * @param prefix the prefix node
+     * @param zookeeper teh zookeeper client
+     * @param dir the dir paretn
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
+        throws KeeperException, InterruptedException {
+      List<String> names = zookeeper.getChildren(dir, false);
+      for (String name : names) {
+        if (name.startsWith(prefix)) {
+          id = name;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found id created last time: " + id);
+          }
+          break;
+        }
+      }
+      if (id == null) {
+        id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Created id: " + id);
+        }
+      }
+
+    }
+
+    /**
+     * the command that is run and retried for actually
+     * obtaining the lock
+     * @return if the command was successful or not
+     */
+    public boolean execute() throws KeeperException, InterruptedException {
+      do {
+        if (id == null) {
+          long sessionId = zookeeper.getSessionId();
+          String prefix = "x-" + sessionId + "-";
+          // lets try look up the current ID if we failed
+          // in the middle of creating the znode
+          findPrefixInChildren(prefix, zookeeper, dir);
+          idName = new ZNodeName(id);
+        }
+        if (id != null) {
+          List<String> names = zookeeper.getChildren(dir, false);
+          if (names.isEmpty()) {
+            LOG.warn("No children in: " + dir + " when we've just "
+                + "created one! Lets recreate it...");
+            // lets force the recreation of the id
+            id = null;
+          } else {
+            // lets sort them explicitly (though they do seem to come back in order ususally :)
+            SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
+            for (String name : names) {
+              sortedNames.add(new ZNodeName(dir + "/" + name));
+            }
+            ownerId = sortedNames.first().getName();
+            SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
+            if (!lessThanMe.isEmpty()) {
+              ZNodeName lastChildName = lessThanMe.last();
+              lastChildId = lastChildName.getName();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("watching less than me node: " + lastChildId);
+              }
+              Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
+              if (stat != null) {
+                return Boolean.FALSE;
+              } else {
+                LOG.warn("Could not find the" + " stats for less than me: "
+                    + lastChildName.getName());
+              }
+            } else {
+              if (isOwner()) {
+                if (callback != null) {
+                  callback.lockAcquired();
+                }
+                return Boolean.TRUE;
+              }
+            }
+          }
+        }
+      } while (id == null);
+      return Boolean.FALSE;
+    }
+  };
+
+  /**
+   * Attempts to acquire the exclusive write lock returning whether or not it was
+   * acquired. Note that the exclusive lock may be acquired some time later after
+   * this method has been invoked due to the current lock owner going away.
+   */
+  public synchronized boolean lock() throws KeeperException, InterruptedException {
+    if (isClosed()) {
+      return false;
+    }
+    ensurePathExists(dir);
+
+    return (Boolean) retryOperation(zop);
+  }
+
+  /**
+   * return the parent dir for lock
+   * @return the parent dir used for locks.
+   */
+  public String getDir() {
+    return dir;
+  }
+
+  /**
+   * Returns true if this node is the owner of the
+   * lock (or the leader)
+   */
+  public boolean isOwner() {
+    return id != null && ownerId != null && id.equals(ownerId);
+  }
+
+  /**
+   * return the id for this lock
+   * @return the id for this lock
+   */
+  public String getId() {
+    return this.id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
new file mode 100644
index 0000000..6f09c5e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
@@ -0,0 +1,167 @@
+package org.apache.helix.lock.zk;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Locking scheme for Helix that uses the ZooKeeper exclusive lock implementation
+ */
+public class ZKHelixLock implements HelixLock {
+  private static final Logger LOG = Logger.getLogger(ZKHelixLock.class);
+
+  private static final String LOCK_ROOT = "LOCKS";
+  private final String _rootPath;
+  private final WriteLock _writeLock;
+  private final ZkClient _zkClient;
+  private boolean _locked;
+  private boolean _canceled;
+
+  private final LockListener _listener = new LockListener() {
+    @Override
+    public void lockReleased() {
+    }
+
+    @Override
+    public void lockAcquired() {
+      synchronized (ZKHelixLock.this) {
+        if (!_canceled) {
+          _locked = true;
+          ZKHelixLock.this.notify();
+        } else {
+          unlock();
+        }
+      }
+    }
+  };
+
+  /**
+   * Initialize for a cluster and scope
+   * @param clusterId the cluster under which the lock will live
+   * @param scope the scope to lock
+   * @param zkClient an active ZK client
+   */
+  public ZKHelixLock(ClusterId clusterId, Scope<?> scope, ZkClient zkClient) {
+    _zkClient = zkClient;
+    _rootPath =
+        '/' + clusterId.stringify() + '/' + LOCK_ROOT + '/' + scope.getType() + '_'
+            + scope.getScopedId();
+    ZooKeeper zookeeper = ((ZkConnection) zkClient.getConnection()).getZookeeper();
+    _writeLock = new WriteLock(zookeeper, _rootPath, null, _listener);
+    _locked = false;
+    _canceled = false;
+  }
+
+  /**
+   * Try to synchronously lock the scope
+   * @return true if the lock succeeded, false if it failed, as is the case if the connection to ZK
+   *         is lost
+   */
+  public synchronized boolean lock() {
+    _canceled = false;
+    if (_locked) {
+      // no need to proceed if the lock is already acquired
+      return true;
+    }
+    try {
+      BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
+      baseAccessor.create(_rootPath, null, AccessOption.PERSISTENT);
+      boolean acquired = _writeLock.lock();
+      if (acquired) {
+        _locked = true;
+      } else {
+        wait();
+      }
+    } catch (KeeperException e) {
+      LOG.error("Error acquiring a lock on " + _rootPath, e);
+      _canceled = true;
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while acquiring a lock on " + _rootPath);
+      _canceled = true;
+    }
+    return _locked;
+  }
+
+  /**
+   * Unlock the scope
+   * @return true if unlock executed, false otherwise
+   */
+  public synchronized boolean unlock() {
+    _writeLock.unlock();
+    _locked = false;
+    return true;
+  }
+
+  public static void main(String[] args) {
+    ZkClient zkClient = new ZkClient("localhost:2199");
+    ClusterId clusterId = ClusterId.from("exampleCluster");
+    final ZKHelixLock lock1 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), zkClient);
+    final ZKHelixLock lock2 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), zkClient);
+    System.err.println("lock1 started");
+    boolean result = lock1.lock();
+    System.err.println("lock1 finished " + result);
+    new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(10000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+        System.err.println("unlock1 started");
+        lock1.unlock();
+        System.err.println("unlock1 finished");
+      }
+    }.start();
+    final Thread t1 = new Thread() {
+      @Override
+      public void run() {
+        System.err.println("lock2 started");
+        boolean locked = lock2.lock();
+        System.err.println("lock2 finished " + locked);
+      }
+    };
+    t1.start();
+    new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(5000);
+          System.err.println("interrupt2 start");
+          t1.interrupt();
+          System.err.println("interrupt2 finished");
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java
new file mode 100644
index 0000000..47253e6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZNodeName.java
@@ -0,0 +1,113 @@
+package org.apache.helix.lock.zk;
+
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Represents an ephemeral znode name which has an ordered sequence number
+ * and can be sorted in order
+ */
+class ZNodeName implements Comparable<ZNodeName> {
+  private final String name;
+  private String prefix;
+  private int sequence = -1;
+  private static final Logger LOG = Logger.getLogger(ZNodeName.class);
+
+  public ZNodeName(String name) {
+    if (name == null) {
+      throw new NullPointerException("id cannot be null");
+    }
+    this.name = name;
+    this.prefix = name;
+    int idx = name.lastIndexOf('-');
+    if (idx >= 0) {
+      this.prefix = name.substring(0, idx);
+      try {
+        this.sequence = Integer.parseInt(name.substring(idx + 1));
+        // If an exception occurred we misdetected a sequence suffix,
+        // so return -1.
+      } catch (NumberFormatException e) {
+        LOG.info("Number format exception for " + idx, e);
+      } catch (ArrayIndexOutOfBoundsException e) {
+        LOG.info("Array out of bounds for " + idx, e);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return name.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+
+    ZNodeName sequence = (ZNodeName) o;
+
+    if (!name.equals(sequence.name))
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return name.hashCode() + 37;
+  }
+
+  public int compareTo(ZNodeName that) {
+    int answer = this.prefix.compareTo(that.prefix);
+    if (answer == 0) {
+      int s1 = this.sequence;
+      int s2 = that.sequence;
+      if (s1 == -1 && s2 == -1) {
+        return this.name.compareTo(that.name);
+      }
+      answer = s1 == -1 ? 1 : s2 == -1 ? -1 : s1 - s2;
+    }
+    return answer;
+  }
+
+  /**
+   * Returns the name of the znode
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Returns the sequence number
+   */
+  public int getZNodeName() {
+    return sequence;
+  }
+
+  /**
+   * Returns the text prefix before the sequence number
+   */
+  public String getPrefix() {
+    return prefix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/cb305124/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java
new file mode 100644
index 0000000..58b9fe3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZooKeeperOperation.java
@@ -0,0 +1,38 @@
+package org.apache.helix.lock.zk;
+
+import org.apache.zookeeper.KeeperException;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A callback object which can be used for implementing retry-able operations in the
+ * {@link org.apache.helix.lock.zk.recipes.lock.ProtocolSupport} class
+ */
+interface ZooKeeperOperation {
+
+  /**
+   * Performs the operation - which may be involved multiple times if the connection
+   * to ZooKeeper closes during this operation
+   * @return the result of the operation or null
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  public boolean execute() throws KeeperException, InterruptedException;
+}