You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/08 23:17:20 UTC

git commit: [HELIX-268] Atomic API for cluster, resource, participant

Updated Branches:
  refs/heads/helix-logical-model cb3051241 -> b40608916


[HELIX-268] Atomic API for cluster, resource, participant


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/b4060891
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/b4060891
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/b4060891

Branch: refs/heads/helix-logical-model
Commit: b40608916362be5a72698f15cb00ba3e1bbb800b
Parents: cb30512
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 8 14:12:59 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 8 14:12:59 2013 -0700

----------------------------------------------------------------------
 .../api/accessor/AtomicClusterAccessor.java     |  63 +++--
 .../api/accessor/AtomicParticipantAccessor.java | 232 +++++++++++++++++++
 .../api/accessor/AtomicResourceAccessor.java    | 133 +++++++++++
 .../helix/api/accessor/ClusterAccessor.java     |   7 +-
 .../helix/api/accessor/ParticipantAccessor.java |  17 +-
 .../org/apache/helix/lock/zk/ZKHelixLock.java   |  15 +-
 6 files changed, 439 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index ff8ab6e..a2af79b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -1,19 +1,5 @@
 package org.apache.helix.api.accessor;
 
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.config.ParticipantConfig;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.lock.HelixLock;
-import org.apache.helix.lock.HelixLockable;
-import org.apache.log4j.Logger;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -33,6 +19,27 @@ import org.apache.log4j.Logger;
  * under the License.
  */
 
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
 /**
  * An atomic version of the ClusterAccessor. If atomic operations are required, use instances of
  * this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside
@@ -43,9 +50,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
   private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class);
 
   private final HelixLockable _lockProvider;
-  @SuppressWarnings("unused")
   private final HelixDataAccessor _accessor;
-  @SuppressWarnings("unused")
   private final PropertyKey.Builder _keyBuilder;
   private final ClusterId _clusterId;
 
@@ -182,7 +187,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
           return null;
         }
         ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig());
-        boolean status = setBasicClusterConfig(config);
+        boolean status = super.setBasicClusterConfig(config);
         return status ? config : null;
       } finally {
         lock.unlock();
@@ -190,4 +195,28 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     }
     return null;
   }
+
+  /**
+   * Read resources atomically. This is resource-atomic, not cluster-atomic
+   */
+  @Override
+  public Map<ResourceId, Resource> readResources() {
+    // read resources individually instead of together to maintain the equality link between ideal
+    // state and resource config
+    Map<ResourceId, Resource> resources = Maps.newHashMap();
+    Set<String> idealStateNames =
+        Sets.newHashSet(_accessor.getChildNames(_keyBuilder.idealStates()));
+    Set<String> resourceConfigNames =
+        Sets.newHashSet(_accessor.getChildNames(_keyBuilder.resourceConfigs()));
+    resourceConfigNames.addAll(idealStateNames);
+    ResourceAccessor accessor = new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider);
+    for (String resourceName : resourceConfigNames) {
+      ResourceId resourceId = ResourceId.from(resourceName);
+      Resource resource = accessor.readResource(resourceId);
+      if (resource != null) {
+        resources.put(resourceId, resource);
+      }
+    }
+    return resources;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
new file mode 100644
index 0000000..8482208
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
@@ -0,0 +1,232 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+/**
+ * An atomic version of the ParticipantAccessor. If atomic operations are required, use instances of
+ * this class. Atomicity is not guaranteed when using instances of ParticipantAccessor alongside
+ * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
+ * may fail, in which case users should handle the return value of each function if necessary.
+ */
+public class AtomicParticipantAccessor extends ParticipantAccessor {
+  private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class);
+
+  private final ClusterId _clusterId;
+  private final HelixDataAccessor _accessor;
+  private final PropertyKey.Builder _keyBuilder;
+  private final HelixLockable _lockProvider;
+
+  public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor,
+      HelixLockable lockProvider) {
+    super(accessor);
+    _clusterId = clusterId;
+    _accessor = accessor;
+    _keyBuilder = accessor.keyBuilder();
+    _lockProvider = lockProvider;
+  }
+
+  @Override
+  public Participant readParticipant(ParticipantId participantId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.readParticipant(participantId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean setParticipant(ParticipantConfig participantConfig) {
+    if (participantConfig == null) {
+      LOG.error("participant config cannot be null");
+      return false;
+    }
+    HelixLock lock =
+        _lockProvider.getLock(_clusterId, Scope.participant(participantConfig.getId()));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.setParticipant(participantConfig);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public ParticipantConfig updateParticipant(ParticipantId participantId,
+      ParticipantConfig.Delta participantDelta) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        Participant participant = super.readParticipant(participantId);
+        if (participant == null) {
+          LOG.error("Participant " + participantId + " does not exist, cannot be updated");
+          return null;
+        }
+        ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
+        super.setParticipant(config);
+        return config;
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Swap a new participant in to serve the replicas of an old (dead) one. The atomicity scope is
+   * participant-local and resource-local.
+   */
+  @Override
+  public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) {
+    Participant oldParticipant = readParticipant(oldParticipantId);
+    if (oldParticipant == null) {
+      LOG.error("Could not swap participants because the old participant does not exist");
+      return false;
+    }
+    if (oldParticipant.isEnabled()) {
+      LOG.error("Could not swap participants because the old participant is still enabled");
+      return false;
+    }
+    if (oldParticipant.isAlive()) {
+      LOG.error("Could not swap participants because the old participant is still live");
+      return false;
+    }
+    Participant newParticipant = readParticipant(newParticipantId);
+    if (newParticipant == null) {
+      LOG.error("Could not swap participants because the new participant does not exist");
+      return false;
+    }
+    dropParticipant(oldParticipantId);
+    ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+    List<String> idealStates = _accessor.getChildNames(_keyBuilder.idealStates());
+    for (String resourceName : idealStates) {
+      HelixLock lock =
+          _lockProvider.getLock(_clusterId, Scope.resource(ResourceId.from(resourceName)));
+      boolean locked = lock.lock();
+      if (locked) {
+        try {
+          // lock the resource for all ideal state reads and updates
+          IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName));
+          if (idealState != null) {
+            swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
+            PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
+            resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
+            _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState);
+          }
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  boolean dropParticipant(ParticipantId participantId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.dropParticipant(participantId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void insertMessagesToParticipant(ParticipantId participantId,
+      Map<MessageId, Message> msgMap) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        super.insertMessagesToParticipant(participantId, msgMap);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return;
+  }
+
+  @Override
+  public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        super.updateMessageStatus(participantId, msgMap);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return;
+  }
+
+  @Override
+  public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        super.deleteMessagesFromParticipant(participantId, msgIdSet);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return;
+  }
+
+  @Override
+  protected ResourceAccessor resourceAccessor() {
+    return new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
new file mode 100644
index 0000000..4ff50d7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -0,0 +1,133 @@
+package org.apache.helix.api.accessor;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * An atomic version of the ResourceAccessor. If atomic operations are required, use instances of
+ * this class. Atomicity is not guaranteed when using instances of ResourceAccessor alongside
+ * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
+ * may fail, in which case users should handle the return value of each function if necessary.
+ */
+public class AtomicResourceAccessor extends ResourceAccessor {
+  private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class);
+
+  private final ClusterId _clusterId;
+  private final HelixLockable _lockProvider;
+
+  public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor,
+      HelixLockable lockProvider) {
+    super(accessor);
+    _clusterId = clusterId;
+    _lockProvider = lockProvider;
+  }
+
+  @Override
+  public Resource readResource(ResourceId resourceId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.readResource(resourceId);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        Resource resource = super.readResource(resourceId);
+        if (resource == null) {
+          LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
+          return null;
+        }
+        ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
+        super.setResource(config);
+        return config;
+      } finally {
+        lock.unlock();
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.setRebalancerContext(resourceId, context);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean setResource(ResourceConfig resourceConfig) {
+    if (resourceConfig == null) {
+      LOG.error("resource config cannot be null");
+      return false;
+    }
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceConfig.getId()));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.setResource(resourceConfig);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
+      String participantGroupTag) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        return super.generateDefaultAssignment(resourceId, replicaCount, participantGroupTag);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 3548c82..bec7308 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -81,6 +81,11 @@ public class ClusterAccessor {
   private final PropertyKey.Builder _keyBuilder;
   private final ClusterId _clusterId;
 
+  /**
+   * Instantiate a cluster accessor
+   * @param clusterId the cluster to access
+   * @param accessor HelixDataAccessor for the physical store
+   */
   public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
     _accessor = accessor;
     _keyBuilder = accessor.keyBuilder();
@@ -281,7 +286,7 @@ public class ClusterAccessor {
   }
 
   /**
-   * Read all resource in the cluster
+   * Read all resources in the cluster
    * @return map of resource id to resource
    */
   public Map<ResourceId, Resource> readResources() {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 7952761..50945aa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -205,9 +205,6 @@ public class ParticipantAccessor {
       }
     }
 
-    // TODO merge list logic should go to znrecord updater
-    // update participantConfig
-    // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
     BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
     final List<String> partitionNames = new ArrayList<String>();
     for (PartitionId partitionId : partitionIdSet) {
@@ -306,7 +303,7 @@ public class ParticipantAccessor {
     RunningInstance runningInstance = participant.getRunningInstance();
 
     // check that the resource exists
-    ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+    ResourceAccessor resourceAccessor = resourceAccessor();
     Resource resource = resourceAccessor.readResource(resourceId);
     if (resource == null || resource.getRebalancerConfig() == null) {
       LOG.error("Cannot reset partitions because the resource is not present");
@@ -676,8 +673,8 @@ public class ParticipantAccessor {
    * @param oldParticipantId the participant to drop
    * @param newParticipantId the participant that replaces it
    */
-  private void swapParticipantsInIdealState(IdealState idealState, ParticipantId oldParticipantId,
-      ParticipantId newParticipantId) {
+  protected void swapParticipantsInIdealState(IdealState idealState,
+      ParticipantId oldParticipantId, ParticipantId newParticipantId) {
     for (PartitionId partitionId : idealState.getPartitionSet()) {
       List<ParticipantId> oldPreferenceList = idealState.getPreferenceList(partitionId);
       if (oldPreferenceList != null) {
@@ -702,4 +699,12 @@ public class ParticipantAccessor {
       }
     }
   }
+
+  /**
+   * Get a ResourceAccessor instance
+   * @return ResourceAccessor
+   */
+  protected ResourceAccessor resourceAccessor() {
+    return new ResourceAccessor(_accessor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
index 6f09c5e..7ddbce1 100644
--- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
@@ -34,6 +34,7 @@ import org.apache.zookeeper.ZooKeeper;
 
 /**
  * Locking scheme for Helix that uses the ZooKeeper exclusive lock implementation
+ * Please use the following lock order convention: Cluster, Participant, Resource, Partition
  */
 public class ZKHelixLock implements HelixLock {
   private static final Logger LOG = Logger.getLogger(ZKHelixLock.class);
@@ -42,8 +43,8 @@ public class ZKHelixLock implements HelixLock {
   private final String _rootPath;
   private final WriteLock _writeLock;
   private final ZkClient _zkClient;
-  private boolean _locked;
-  private boolean _canceled;
+  private volatile boolean _locked;
+  private volatile boolean _canceled;
 
   private final LockListener _listener = new LockListener() {
     @Override
@@ -55,10 +56,10 @@ public class ZKHelixLock implements HelixLock {
       synchronized (ZKHelixLock.this) {
         if (!_canceled) {
           _locked = true;
-          ZKHelixLock.this.notify();
         } else {
           unlock();
         }
+        ZKHelixLock.this.notify();
       }
     }
   };
@@ -115,7 +116,13 @@ public class ZKHelixLock implements HelixLock {
    * @return true if unlock executed, false otherwise
    */
   public synchronized boolean unlock() {
-    _writeLock.unlock();
+    try {
+      _writeLock.unlock();
+    } catch (IllegalArgumentException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Unlock skipped because lock node was not present");
+      }
+    }
     _locked = false;
     return true;
   }