You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/08 23:17:20 UTC
git commit: [HELIX-268] Atomic API for cluster, resource, participant
Updated Branches:
refs/heads/helix-logical-model cb3051241 -> b40608916
[HELIX-268] Atomic API for cluster, resource, participant
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/b4060891
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/b4060891
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/b4060891
Branch: refs/heads/helix-logical-model
Commit: b40608916362be5a72698f15cb00ba3e1bbb800b
Parents: cb30512
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 8 14:12:59 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 8 14:12:59 2013 -0700
----------------------------------------------------------------------
.../api/accessor/AtomicClusterAccessor.java | 63 +++--
.../api/accessor/AtomicParticipantAccessor.java | 232 +++++++++++++++++++
.../api/accessor/AtomicResourceAccessor.java | 133 +++++++++++
.../helix/api/accessor/ClusterAccessor.java | 7 +-
.../helix/api/accessor/ParticipantAccessor.java | 17 +-
.../org/apache/helix/lock/zk/ZKHelixLock.java | 15 +-
6 files changed, 439 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index ff8ab6e..a2af79b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -1,19 +1,5 @@
package org.apache.helix.api.accessor;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.api.Cluster;
-import org.apache.helix.api.Scope;
-import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.config.ParticipantConfig;
-import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.lock.HelixLock;
-import org.apache.helix.lock.HelixLockable;
-import org.apache.log4j.Logger;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -33,6 +19,27 @@ import org.apache.log4j.Logger;
* under the License.
*/
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
/**
* An atomic version of the ClusterAccessor. If atomic operations are required, use instances of
* this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside
@@ -43,9 +50,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class);
private final HelixLockable _lockProvider;
- @SuppressWarnings("unused")
private final HelixDataAccessor _accessor;
- @SuppressWarnings("unused")
private final PropertyKey.Builder _keyBuilder;
private final ClusterId _clusterId;
@@ -182,7 +187,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
return null;
}
ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig());
- boolean status = setBasicClusterConfig(config);
+ boolean status = super.setBasicClusterConfig(config);
return status ? config : null;
} finally {
lock.unlock();
@@ -190,4 +195,28 @@ public class AtomicClusterAccessor extends ClusterAccessor {
}
return null;
}
+
+ /**
+ * Read resources atomically. This is resource-atomic, not cluster-atomic
+ */
+ @Override
+ public Map<ResourceId, Resource> readResources() {
+ // read resources individually instead of together to maintain the equality link between ideal
+ // state and resource config
+ Map<ResourceId, Resource> resources = Maps.newHashMap();
+ Set<String> idealStateNames =
+ Sets.newHashSet(_accessor.getChildNames(_keyBuilder.idealStates()));
+ Set<String> resourceConfigNames =
+ Sets.newHashSet(_accessor.getChildNames(_keyBuilder.resourceConfigs()));
+ resourceConfigNames.addAll(idealStateNames);
+ ResourceAccessor accessor = new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider);
+ for (String resourceName : resourceConfigNames) {
+ ResourceId resourceId = ResourceId.from(resourceName);
+ Resource resource = accessor.readResource(resourceId);
+ if (resource != null) {
+ resources.put(resourceId, resource);
+ }
+ }
+ return resources;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
new file mode 100644
index 0000000..8482208
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
@@ -0,0 +1,232 @@
+package org.apache.helix.api.accessor;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.MessageId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import org.apache.log4j.Logger;
+
+/**
+ * An atomic version of the ParticipantAccessor. If atomic operations are required, use instances of
+ * this class. Atomicity is not guaranteed when using instances of ParticipantAccessor alongside
+ * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
+ * may fail, in which case users should handle the return value of each function if necessary.
+ */
+public class AtomicParticipantAccessor extends ParticipantAccessor {
+ private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class);
+
+ private final ClusterId _clusterId;
+ private final HelixDataAccessor _accessor;
+ private final PropertyKey.Builder _keyBuilder;
+ private final HelixLockable _lockProvider;
+
+ public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor,
+ HelixLockable lockProvider) {
+ super(accessor);
+ _clusterId = clusterId;
+ _accessor = accessor;
+ _keyBuilder = accessor.keyBuilder();
+ _lockProvider = lockProvider;
+ }
+
+ @Override
+ public Participant readParticipant(ParticipantId participantId) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ return super.readParticipant(participantId);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean setParticipant(ParticipantConfig participantConfig) {
+ if (participantConfig == null) {
+ LOG.error("participant config cannot be null");
+ return false;
+ }
+ HelixLock lock =
+ _lockProvider.getLock(_clusterId, Scope.participant(participantConfig.getId()));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ return super.setParticipant(participantConfig);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public ParticipantConfig updateParticipant(ParticipantId participantId,
+ ParticipantConfig.Delta participantDelta) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ Participant participant = super.readParticipant(participantId);
+ if (participant == null) {
+ LOG.error("Participant " + participantId + " does not exist, cannot be updated");
+ return null;
+ }
+ ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
+ super.setParticipant(config);
+ return config;
+ } finally {
+ lock.unlock();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Swap a new participant in to serve the replicas of an old (dead) one. The atomicity scope is
+ * participant-local and resource-local.
+ */
+ @Override
+ public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) {
+ Participant oldParticipant = readParticipant(oldParticipantId);
+ if (oldParticipant == null) {
+ LOG.error("Could not swap participants because the old participant does not exist");
+ return false;
+ }
+ if (oldParticipant.isEnabled()) {
+ LOG.error("Could not swap participants because the old participant is still enabled");
+ return false;
+ }
+ if (oldParticipant.isAlive()) {
+ LOG.error("Could not swap participants because the old participant is still live");
+ return false;
+ }
+ Participant newParticipant = readParticipant(newParticipantId);
+ if (newParticipant == null) {
+ LOG.error("Could not swap participants because the new participant does not exist");
+ return false;
+ }
+ dropParticipant(oldParticipantId);
+ ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+ List<String> idealStates = _accessor.getChildNames(_keyBuilder.idealStates());
+ for (String resourceName : idealStates) {
+ HelixLock lock =
+ _lockProvider.getLock(_clusterId, Scope.resource(ResourceId.from(resourceName)));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ // lock the resource for all ideal state reads and updates
+ IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName));
+ if (idealState != null) {
+ swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
+ PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
+ resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
+ _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState);
+ }
+ } finally {
+ lock.unlock();
+ }
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ boolean dropParticipant(ParticipantId participantId) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ return super.dropParticipant(participantId);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void insertMessagesToParticipant(ParticipantId participantId,
+ Map<MessageId, Message> msgMap) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ super.insertMessagesToParticipant(participantId, msgMap);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return;
+ }
+
+ @Override
+ public void updateMessageStatus(ParticipantId participantId, Map<MessageId, Message> msgMap) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ super.updateMessageStatus(participantId, msgMap);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return;
+ }
+
+ @Override
+ public void deleteMessagesFromParticipant(ParticipantId participantId, Set<MessageId> msgIdSet) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ super.deleteMessagesFromParticipant(participantId, msgIdSet);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return;
+ }
+
+ @Override
+ protected ResourceAccessor resourceAccessor() {
+ return new AtomicResourceAccessor(_clusterId, _accessor, _lockProvider);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
new file mode 100644
index 0000000..4ff50d7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -0,0 +1,133 @@
+package org.apache.helix.api.accessor;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.context.RebalancerContext;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * An atomic version of the ResourceAccessor. If atomic operations are required, use instances of
+ * this class. Atomicity is not guaranteed when using instances of ResourceAccessor alongside
+ * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
+ * may fail, in which case users should handle the return value of each function if necessary.
+ */
+public class AtomicResourceAccessor extends ResourceAccessor {
+ private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class);
+
+ private final ClusterId _clusterId;
+ private final HelixLockable _lockProvider;
+
+ public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor,
+ HelixLockable lockProvider) {
+ super(accessor);
+ _clusterId = clusterId;
+ _lockProvider = lockProvider;
+ }
+
+ @Override
+ public Resource readResource(ResourceId resourceId) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ return super.readResource(resourceId);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public ResourceConfig updateResource(ResourceId resourceId, ResourceConfig.Delta resourceDelta) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ Resource resource = super.readResource(resourceId);
+ if (resource == null) {
+ LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
+ return null;
+ }
+ ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
+ super.setResource(config);
+ return config;
+ } finally {
+ lock.unlock();
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean setRebalancerContext(ResourceId resourceId, RebalancerContext context) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ return super.setRebalancerContext(resourceId, context);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean setResource(ResourceConfig resourceConfig) {
+ if (resourceConfig == null) {
+ LOG.error("resource config cannot be null");
+ return false;
+ }
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.resource(resourceConfig.getId()));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ return super.setResource(resourceConfig);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean generateDefaultAssignment(ResourceId resourceId, int replicaCount,
+ String participantGroupTag) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ return super.generateDefaultAssignment(resourceId, replicaCount, participantGroupTag);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 3548c82..bec7308 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -81,6 +81,11 @@ public class ClusterAccessor {
private final PropertyKey.Builder _keyBuilder;
private final ClusterId _clusterId;
+ /**
+ * Instantiate a cluster accessor
+ * @param clusterId the cluster to access
+ * @param accessor HelixDataAccessor for the physical store
+ */
public ClusterAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
_accessor = accessor;
_keyBuilder = accessor.keyBuilder();
@@ -281,7 +286,7 @@ public class ClusterAccessor {
}
/**
- * Read all resource in the cluster
+ * Read all resources in the cluster
* @return map of resource id to resource
*/
public Map<ResourceId, Resource> readResources() {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 7952761..50945aa 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -205,9 +205,6 @@ public class ParticipantAccessor {
}
}
- // TODO merge list logic should go to znrecord updater
- // update participantConfig
- // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
final List<String> partitionNames = new ArrayList<String>();
for (PartitionId partitionId : partitionIdSet) {
@@ -306,7 +303,7 @@ public class ParticipantAccessor {
RunningInstance runningInstance = participant.getRunningInstance();
// check that the resource exists
- ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+ ResourceAccessor resourceAccessor = resourceAccessor();
Resource resource = resourceAccessor.readResource(resourceId);
if (resource == null || resource.getRebalancerConfig() == null) {
LOG.error("Cannot reset partitions because the resource is not present");
@@ -676,8 +673,8 @@ public class ParticipantAccessor {
* @param oldParticipantId the participant to drop
* @param newParticipantId the participant that replaces it
*/
- private void swapParticipantsInIdealState(IdealState idealState, ParticipantId oldParticipantId,
- ParticipantId newParticipantId) {
+ protected void swapParticipantsInIdealState(IdealState idealState,
+ ParticipantId oldParticipantId, ParticipantId newParticipantId) {
for (PartitionId partitionId : idealState.getPartitionSet()) {
List<ParticipantId> oldPreferenceList = idealState.getPreferenceList(partitionId);
if (oldPreferenceList != null) {
@@ -702,4 +699,12 @@ public class ParticipantAccessor {
}
}
}
+
+ /**
+ * Get a ResourceAccessor instance
+ * @return ResourceAccessor
+ */
+ protected ResourceAccessor resourceAccessor() {
+ return new ResourceAccessor(_accessor);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/b4060891/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
index 6f09c5e..7ddbce1 100644
--- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
@@ -34,6 +34,7 @@ import org.apache.zookeeper.ZooKeeper;
/**
* Locking scheme for Helix that uses the ZooKeeper exclusive lock implementation
+ * Please use the following lock order convention: Cluster, Participant, Resource, Partition
*/
public class ZKHelixLock implements HelixLock {
private static final Logger LOG = Logger.getLogger(ZKHelixLock.class);
@@ -42,8 +43,8 @@ public class ZKHelixLock implements HelixLock {
private final String _rootPath;
private final WriteLock _writeLock;
private final ZkClient _zkClient;
- private boolean _locked;
- private boolean _canceled;
+ private volatile boolean _locked;
+ private volatile boolean _canceled;
private final LockListener _listener = new LockListener() {
@Override
@@ -55,10 +56,10 @@ public class ZKHelixLock implements HelixLock {
synchronized (ZKHelixLock.this) {
if (!_canceled) {
_locked = true;
- ZKHelixLock.this.notify();
} else {
unlock();
}
+ ZKHelixLock.this.notify();
}
}
};
@@ -115,7 +116,13 @@ public class ZKHelixLock implements HelixLock {
* @return true if unlock executed, false otherwise
*/
public synchronized boolean unlock() {
- _writeLock.unlock();
+ try {
+ _writeLock.unlock();
+ } catch (IllegalArgumentException e) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Unlock skipped because lock node was not present");
+ }
+ }
_locked = false;
return true;
}