You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/10 02:43:08 UTC
git commit: [HELIX-268] Tests for the Atomic API
Updated Branches:
refs/heads/helix-logical-model 558b42c61 -> 101fe1e9a
[HELIX-268] Tests for the Atomic API
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/101fe1e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/101fe1e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/101fe1e9
Branch: refs/heads/helix-logical-model
Commit: 101fe1e9a4c7d102643445d533dc0c59a245b4cc
Parents: 558b42c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Oct 9 17:42:27 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Oct 9 17:42:27 2013 -0700
----------------------------------------------------------------------
.../helix/api/accessor/ClusterAccessor.java | 14 +-
.../helix/api/accessor/ParticipantAccessor.java | 3 +-
.../java/org/apache/helix/lock/HelixLock.java | 6 +
.../org/apache/helix/lock/zk/ZKHelixLock.java | 19 ++
.../api/accessor/TestAccessorRecreate.java | 162 +++++++++++++++
.../helix/api/accessor/TestAtomicAccessors.java | 200 +++++++++++++++++++
.../apache/helix/lock/zk/TestZKHelixLock.java | 117 +++++++++++
7 files changed, 512 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index f283f74..ea7dbb9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -118,20 +118,19 @@ public class ClusterAccessor {
}
_accessor.createProperty(_keyBuilder.constraints(), null);
for (ClusterConstraints constraints : cluster.getConstraintMap().values()) {
- _accessor.createProperty(_keyBuilder.constraint(constraints.getType().toString()),
- constraints);
+ _accessor.setProperty(_keyBuilder.constraint(constraints.getType().toString()), constraints);
}
ClusterConfiguration clusterConfig = ClusterConfiguration.from(cluster.getUserConfig());
if (cluster.autoJoinAllowed()) {
clusterConfig.setAutoJoinAllowed(cluster.autoJoinAllowed());
}
if (cluster.getStats() != null && !cluster.getStats().getMapFields().isEmpty()) {
- _accessor.createProperty(_keyBuilder.persistantStat(), cluster.getStats());
+ _accessor.setProperty(_keyBuilder.persistantStat(), cluster.getStats());
}
if (cluster.isPaused()) {
pauseCluster();
}
- _accessor.createProperty(_keyBuilder.clusterConfig(), clusterConfig);
+ _accessor.setProperty(_keyBuilder.clusterConfig(), clusterConfig);
return true;
}
@@ -653,7 +652,7 @@ public class ClusterAccessor {
configuration.addNamespacedConfig(resource.getRebalancerConfig().toNamespacedConfig());
configuration.setBucketSize(resource.getBucketSize());
configuration.setBatchMessageMode(resource.getBatchMessageMode());
- _accessor.createProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
}
// Create an IdealState from a RebalancerConfig (if the resource is partitioned)
@@ -662,7 +661,7 @@ public class ClusterAccessor {
ResourceAccessor.rebalancerConfigToIdealState(rebalancerConfig, resource.getBucketSize(),
resource.getBatchMessageMode());
if (idealState != null) {
- _accessor.createProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
+ _accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
}
return true;
}
@@ -790,8 +789,7 @@ public class ClusterAccessor {
for (PartitionId partitionId : disabledPartitions) {
instanceConfig.setInstanceEnabledForPartition(partitionId, false);
}
- _accessor.createProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
- _accessor.createProperty(_keyBuilder.messages(participantId.stringify()), null);
+ _accessor.setProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 90fd986..4e7d3c2 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -747,7 +747,8 @@ public class ParticipantAccessor {
* @param participantId the participant for which to generate paths
* @return list of required paths as strings
*/
- static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder, ParticipantId participantId) {
+ private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder,
+ ParticipantId participantId) {
List<String> paths = Lists.newArrayList();
paths.add(keyBuilder.instanceConfig(participantId.stringify()).getPath());
paths.add(keyBuilder.messages(participantId.stringify()).getPath());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
index 79c15d0..a567a5c 100644
--- a/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
+++ b/helix-core/src/main/java/org/apache/helix/lock/HelixLock.java
@@ -34,4 +34,10 @@ public interface HelixLock {
* @return true if the lock was released, false if it could not be released
*/
public boolean unlock();
+
+ /**
+ * Check if this object is blocked waiting on the lock
+ * @return true if blocked, false otherwise
+ */
+ public boolean isBlocked();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
index 7ddbce1..70614b3 100644
--- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
@@ -45,6 +45,7 @@ public class ZKHelixLock implements HelixLock {
private final ZkClient _zkClient;
private volatile boolean _locked;
private volatile boolean _canceled;
+ private volatile boolean _blocked;
private final LockListener _listener = new LockListener() {
@Override
@@ -79,6 +80,7 @@ public class ZKHelixLock implements HelixLock {
_writeLock = new WriteLock(zookeeper, _rootPath, null, _listener);
_locked = false;
_canceled = false;
+ _blocked = false;
}
/**
@@ -86,6 +88,7 @@ public class ZKHelixLock implements HelixLock {
* @return true if the lock succeeded, false if it failed, as is the case if the connection to ZK
* is lost
*/
+ @Override
public synchronized boolean lock() {
_canceled = false;
if (_locked) {
@@ -99,6 +102,7 @@ public class ZKHelixLock implements HelixLock {
if (acquired) {
_locked = true;
} else {
+ setBlocked(true);
wait();
}
} catch (KeeperException e) {
@@ -108,6 +112,7 @@ public class ZKHelixLock implements HelixLock {
LOG.error("Interrupted while acquiring a lock on " + _rootPath);
_canceled = true;
}
+ setBlocked(false);
return _locked;
}
@@ -115,6 +120,7 @@ public class ZKHelixLock implements HelixLock {
* Unlock the scope
* @return true if unlock executed, false otherwise
*/
+ @Override
public synchronized boolean unlock() {
try {
_writeLock.unlock();
@@ -127,6 +133,19 @@ public class ZKHelixLock implements HelixLock {
return true;
}
+ @Override
+ public synchronized boolean isBlocked() {
+ return _blocked;
+ }
+
+ /**
+ * Set whether the lock method is currently blocked
+ * @param isBlocked true if blocked, false otherwise
+ */
+ protected synchronized void setBlocked(boolean isBlocked) {
+ _blocked = isBlocked;
+ }
+
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("localhost:2199");
ClusterId clusterId = ClusterId.from("exampleCluster");
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
new file mode 100644
index 0000000..a92f12a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
@@ -0,0 +1,162 @@
+package org.apache.helix.api.accessor;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class TestAccessorRecreate extends ZkUnitTestBase {
+ private static final Logger LOG = Logger.getLogger(TestAccessorRecreate.class);
+
+ /**
+ * This test just makes sure that a cluster is only recreated if it is incomplete. This is not
+ * directly testing atomicity, but rather a use case where a machine died while creating the
+ * cluster.
+ */
+ @Test
+ public void testRecreateCluster() {
+ final String MODIFIER = "modifier";
+ final ClusterId clusterId = ClusterId.from("testCluster");
+
+ // connect
+ boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+ if (!connected) {
+ LOG.warn("Connection not established");
+ return;
+ }
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor helixAccessor = new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+ ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+
+ // create a cluster
+ boolean created = createCluster(clusterId, accessor, MODIFIER, 1);
+ Assert.assertTrue(created);
+
+ // read the cluster
+ Cluster clusterSnapshot = accessor.readCluster();
+ Assert.assertEquals(clusterSnapshot.getUserConfig().getIntField(MODIFIER, -1), 1);
+
+ // create a cluster with the same id
+ boolean created2 = createCluster(clusterId, accessor, MODIFIER, 2);
+ Assert.assertFalse(created2); // should fail since cluster exists
+
+ // remove a required property
+ helixAccessor.removeProperty(helixAccessor.keyBuilder().liveInstances());
+
+ // try again, should work this time
+ created2 = createCluster(clusterId, accessor, MODIFIER, 2);
+ Assert.assertTrue(created2);
+
+ // read the cluster again
+ clusterSnapshot = accessor.readCluster();
+ Assert.assertEquals(clusterSnapshot.getUserConfig().getIntField(MODIFIER, -1), 2);
+
+ accessor.dropCluster();
+ }
+
+ /**
+ * This test just makes sure that a participant is only recreated if it is incomplete. This is not
+ * directly testing atomicity, but rather a use case where a machine died while creating the
+ * participant.
+ */
+ @Test
+ public void testRecreateParticipant() {
+ final String MODIFIER = "modifier";
+ final ClusterId clusterId = ClusterId.from("testCluster");
+ final ParticipantId participantId = ParticipantId.from("testParticipant");
+
+ // connect
+ boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+ if (!connected) {
+ LOG.warn("Connection not established");
+ return;
+ }
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor helixAccessor = new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+ ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+
+ // create the cluster
+ boolean clusterCreated = createCluster(clusterId, accessor, MODIFIER, 0);
+ Assert.assertTrue(clusterCreated);
+
+ // create the participant
+ boolean created = createParticipant(participantId, accessor, MODIFIER, 1);
+ Assert.assertTrue(created);
+
+ // read the participant
+ ParticipantAccessor participantAccessor = new ParticipantAccessor(helixAccessor);
+ Participant participantSnapshot = participantAccessor.readParticipant(participantId);
+ Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 1);
+
+ // create a participant with the same id
+ boolean created2 = createParticipant(participantId, accessor, MODIFIER, 2);
+ Assert.assertFalse(created2); // should fail since participant exists
+
+ // remove a required property
+ helixAccessor.removeProperty(helixAccessor.keyBuilder().messages(participantId.stringify()));
+
+ // try again, should work this time
+ created2 = createParticipant(participantId, accessor, MODIFIER, 2);
+ Assert.assertTrue(created2);
+
+ // read the participant again
+ participantSnapshot = participantAccessor.readParticipant(participantId);
+ Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 2);
+
+ accessor.dropCluster();
+ }
+
+ private boolean createCluster(ClusterId clusterId, ClusterAccessor accessor, String modifierName,
+ int modifierValue) {
+ // create a cluster
+ UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+ userConfig.setIntField(modifierName, modifierValue);
+ ClusterConfig cluster = new ClusterConfig.Builder(clusterId).userConfig(userConfig).build();
+ return accessor.createCluster(cluster);
+ }
+
+ private boolean createParticipant(ParticipantId participantId, ClusterAccessor accessor,
+ String modifierName, int modifierValue) {
+ // create a participant
+ UserConfig userConfig = new UserConfig(Scope.participant(participantId));
+ userConfig.setIntField(modifierName, modifierValue);
+ ParticipantConfig participant =
+ new ParticipantConfig.Builder(participantId).hostName("host").port(0)
+ .userConfig(userConfig).build();
+ return accessor.addParticipantToCluster(participant);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
new file mode 100644
index 0000000..8dbb43f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
@@ -0,0 +1,200 @@
+package org.apache.helix.api.accessor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.helix.lock.zk.ZKHelixLock;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that the atomic accessors behave atomically in response to interwoven updates.
+ */
+public class TestAtomicAccessors extends ZkUnitTestBase {
+ private static final long TIMEOUT = 30000L;
+ private static final long EXTRA_WAIT = 10000L;
+
+ @Test
+ public void testClusterUpdates() {
+ final ClusterId clusterId = ClusterId.from("testCluster");
+ final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ final HelixDataAccessor helixAccessor =
+ new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+ final LockProvider lockProvider = new LockProvider();
+ final StateModelDefId stateModelDefId = StateModelDefId.from("FakeModel");
+ final State state = State.from("fake");
+ final int constraint1 = 10;
+ final int constraint2 = 11;
+ final String key1 = "key1";
+ final String key2 = "key2";
+
+ // set up the cluster (non-atomically, since the concurrent updates come later)
+ ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+ ClusterConfig config = new ClusterConfig.Builder(clusterId).build();
+ boolean created = accessor.createCluster(config);
+ Assert.assertTrue(created);
+
+ // thread that will update the cluster in one way
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+ userConfig.setBooleanField(key1, true);
+ ClusterConfig.Delta delta =
+ new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
+ Scope.cluster(clusterId), stateModelDefId, state, constraint1).setUserConfig(
+ userConfig);
+ ClusterAccessor accessor =
+ new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
+ accessor.updateCluster(delta);
+ }
+ };
+
+ // thread that will update the cluster in another way
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
+ UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+ userConfig.setBooleanField(key2, true);
+ ClusterConfig.Delta delta =
+ new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
+ Scope.cluster(clusterId), stateModelDefId, state, constraint2).setUserConfig(
+ userConfig);
+ ClusterAccessor accessor =
+ new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
+ accessor.updateCluster(delta);
+ }
+ };
+
+ // start the threads
+ t1.start();
+ t2.start();
+
+ // make sure the threads are done
+ long startTime = System.currentTimeMillis();
+ try {
+ t1.join(TIMEOUT);
+ t2.join(TIMEOUT);
+ } catch (InterruptedException e) {
+ Assert.fail(e.getMessage());
+ t1.interrupt();
+ t2.interrupt();
+ }
+ long endTime = System.currentTimeMillis();
+ if (endTime - startTime > TIMEOUT - EXTRA_WAIT) {
+ Assert.fail("Test timed out");
+ t1.interrupt();
+ t2.interrupt();
+ }
+
+ Assert.assertTrue(lockProvider.hasLockBlocked());
+
+ accessor.dropCluster();
+ }
+
+ /**
+ * A HelixLockable that returns an instrumented ZKHelixLock
+ */
+ private class LockProvider implements HelixLockable {
+ private HelixLock _firstLock = null;
+ private AtomicBoolean _hasSecondBlocked = new AtomicBoolean(false);
+
+ @Override
+ public synchronized HelixLock getLock(ClusterId clusterId, Scope<?> scope) {
+ return new MyLock(clusterId, scope, _gZkClient);
+ }
+
+ /**
+ * Check if a lock object has blocked
+ * @return true if a block happened, false otherwise
+ */
+ public synchronized boolean hasLockBlocked() {
+ return _hasSecondBlocked.get();
+ }
+
+ /**
+ * An instrumented ZKHelixLock
+ */
+ private class MyLock extends ZKHelixLock {
+ /**
+ * Instantiate a lock that instruments a ZKHelixLock
+ * @param clusterId the cluster to lock
+ * @param scope the scope to lock on
+ * @param zkClient an active ZooKeeper client
+ */
+ public MyLock(ClusterId clusterId, Scope<?> scope, ZkClient zkClient) {
+ super(clusterId, scope, zkClient);
+ }
+
+ @Override
+ public synchronized boolean lock() {
+ // meant to set _firstLock atomically so the first locker is recorded; NOTE(review): this
+ // synchronizes on each MyLock instance, not the provider, so the check-and-set can race
+ if (_firstLock == null) {
+ _firstLock = this;
+ }
+ return super.lock();
+ }
+
+ @Override
+ public boolean unlock() {
+ if (_firstLock == this) {
+ // wait to unlock until another thread has blocked
+ synchronized (_hasSecondBlocked) {
+ if (!_hasSecondBlocked.get()) {
+ try {
+ _hasSecondBlocked.wait(TIMEOUT);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+ return super.unlock();
+ }
+
+ @Override
+ protected synchronized void setBlocked(boolean isBlocked) {
+ if (isBlocked) {
+ synchronized (_hasSecondBlocked) {
+ _hasSecondBlocked.set(true);
+ _hasSecondBlocked.notify();
+ }
+ }
+ super.setBlocked(isBlocked);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/101fe1e9/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
new file mode 100644
index 0000000..4e023ba
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
@@ -0,0 +1,117 @@
+package org.apache.helix.lock.zk;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.lock.HelixLock;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Tests that the Zookeeper-based Helix lock can acquire, block, and release as appropriate
+ */
+public class TestZKHelixLock extends ZkUnitTestBase {
+ @Test
+ public void basicTest() {
+ _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+ final AtomicBoolean t1Locked = new AtomicBoolean(false);
+ final AtomicBoolean t1Done = new AtomicBoolean(false);
+ final AtomicInteger field1 = new AtomicInteger(0);
+ final AtomicInteger field2 = new AtomicInteger(1);
+ final ClusterId clusterId = ClusterId.from("testCluster");
+ final HelixLock lock1 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), _gZkClient);
+ final HelixLock lock2 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), _gZkClient);
+
+ // thread 1: get a lock, set fields to 1
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ lock1.lock();
+ synchronized (t1Locked) {
+ t1Locked.set(true);
+ t1Locked.notify();
+ }
+ yield(); // if locking doesn't work, t2 will set the fields first
+ field1.set(1);
+ field2.set(1);
+ synchronized (t1Done) {
+ t1Done.set(true);
+ t1Done.notify();
+ }
+ }
+ };
+
+ // thread 2: wait for t1 to acquire the lock, get a lock, set fields to 2
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
+ synchronized (t1Locked) {
+ while (!t1Locked.get()) {
+ try {
+ t1Locked.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ lock2.lock();
+ field1.set(2);
+ field2.set(2);
+ }
+ };
+
+ // start the threads
+ t1.setPriority(Thread.MIN_PRIORITY);
+ t2.setPriority(Thread.MAX_PRIORITY);
+ t1.start();
+ t2.start();
+
+ // wait for t1 to finish setting fields
+ synchronized (t1Done) {
+ while (!t1Done.get()) {
+ try {
+ t1Done.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ // make sure both fields are 1
+ Assert.assertEquals(field1.get(), 1);
+ Assert.assertEquals(field2.get(), 1);
+
+ // unlock t1's lock after checking that t2 is blocked
+ Assert.assertTrue(lock2.isBlocked());
+ lock1.unlock();
+
+ try {
+ // wait for t2, make sure both fields are 2
+ t2.join(10000);
+ Assert.assertEquals(field1.get(), 2);
+ Assert.assertEquals(field2.get(), 2);
+ } catch (InterruptedException e) {
+ }
+ }
+}