You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:43 UTC
[35/53] [abbrv] [HELIX-268] Atomic API - Add a ZK lock and a skeleton
AtomicClusterAccessor
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
new file mode 100644
index 0000000..a92f12a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
@@ -0,0 +1,162 @@
+package org.apache.helix.api.accessor;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class TestAccessorRecreate extends ZkUnitTestBase {
+ private static final Logger LOG = Logger.getLogger(TestAccessorRecreate.class);
+
+ /**
+ * This test just makes sure that a cluster is only recreated if it is incomplete. This is not
+ * directly testing atomicity, but rather a use case where a machine died while creating the
+ * cluster.
+ */
+ @Test
+ public void testRecreateCluster() {
+ final String MODIFIER = "modifier";
+ final ClusterId clusterId = ClusterId.from("testCluster");
+
+ // connect
+ boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+ if (!connected) {
+ LOG.warn("Connection not established");
+ return;
+ }
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor helixAccessor = new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+ ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+
+ // create a cluster
+ boolean created = createCluster(clusterId, accessor, MODIFIER, 1);
+ Assert.assertTrue(created);
+
+ // read the cluster
+ Cluster clusterSnapshot = accessor.readCluster();
+ Assert.assertEquals(clusterSnapshot.getUserConfig().getIntField(MODIFIER, -1), 1);
+
+ // create a cluster with the same id
+ boolean created2 = createCluster(clusterId, accessor, MODIFIER, 2);
+ Assert.assertFalse(created2); // should fail since cluster exists
+
+ // remove a required property
+ helixAccessor.removeProperty(helixAccessor.keyBuilder().liveInstances());
+
+ // try again, should work this time
+ created2 = createCluster(clusterId, accessor, MODIFIER, 2);
+ Assert.assertTrue(created2);
+
+ // read the cluster again
+ clusterSnapshot = accessor.readCluster();
+ Assert.assertEquals(clusterSnapshot.getUserConfig().getIntField(MODIFIER, -1), 2);
+
+ accessor.dropCluster();
+ }
+
+ /**
+ * This test just makes sure that a participant is only recreated if it is incomplete. This is not
+ * directly testing atomicity, but rather a use case where a machine died while creating the
+ * participant.
+ */
+ @Test
+ public void testRecreateParticipant() {
+ final String MODIFIER = "modifier";
+ final ClusterId clusterId = ClusterId.from("testCluster");
+ final ParticipantId participantId = ParticipantId.from("testParticipant");
+
+ // connect
+ boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+ if (!connected) {
+ LOG.warn("Connection not established");
+ return;
+ }
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+ HelixDataAccessor helixAccessor = new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+ ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+
+ // create the cluster
+ boolean clusterCreated = createCluster(clusterId, accessor, MODIFIER, 0);
+ Assert.assertTrue(clusterCreated);
+
+ // create the participant
+ boolean created = createParticipant(participantId, accessor, MODIFIER, 1);
+ Assert.assertTrue(created);
+
+ // read the participant
+ ParticipantAccessor participantAccessor = new ParticipantAccessor(helixAccessor);
+ Participant participantSnapshot = participantAccessor.readParticipant(participantId);
+ Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 1);
+
+ // create a participant with the same id
+ boolean created2 = createParticipant(participantId, accessor, MODIFIER, 2);
+ Assert.assertFalse(created2); // should fail since participant exists
+
+ // remove a required property
+ helixAccessor.removeProperty(helixAccessor.keyBuilder().messages(participantId.stringify()));
+
+ // try again, should work this time
+ created2 = createParticipant(participantId, accessor, MODIFIER, 2);
+ Assert.assertTrue(created2);
+
+ // read the cluster again
+ participantSnapshot = participantAccessor.readParticipant(participantId);
+ Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 2);
+
+ accessor.dropCluster();
+ }
+
+ private boolean createCluster(ClusterId clusterId, ClusterAccessor accessor, String modifierName,
+ int modifierValue) {
+ // create a cluster
+ UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+ userConfig.setIntField(modifierName, modifierValue);
+ ClusterConfig cluster = new ClusterConfig.Builder(clusterId).userConfig(userConfig).build();
+ return accessor.createCluster(cluster);
+ }
+
+ private boolean createParticipant(ParticipantId participantId, ClusterAccessor accessor,
+ String modifierName, int modifierValue) {
+ // create a participant
+ UserConfig userConfig = new UserConfig(Scope.participant(participantId));
+ userConfig.setIntField(modifierName, modifierValue);
+ ParticipantConfig participant =
+ new ParticipantConfig.Builder(participantId).hostName("host").port(0)
+ .userConfig(userConfig).build();
+ return accessor.addParticipantToCluster(participant);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
new file mode 100644
index 0000000..4087796
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
@@ -0,0 +1,202 @@
+package org.apache.helix.api.accessor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.helix.lock.zk.ZKHelixLock;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that the atomic accessors behave atomically in response to interwoven updates.
+ */
+public class TestAtomicAccessors extends ZkUnitTestBase {
+  private static final long TIMEOUT = 30000L;
+  private static final long EXTRA_WAIT = 10000L;
+
+  /**
+   * Start two threads that each update the same cluster through their own AtomicClusterAccessor,
+   * both backed by one instrumented lock provider, and check that a lock reported being blocked
+   * while the updates ran.
+   */
+  @Test
+  public void testClusterUpdates() {
+    final ClusterId clusterId = ClusterId.from("testCluster");
+    final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor helixAccessor =
+        new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+    final LockProvider lockProvider = new LockProvider();
+    final StateModelDefId stateModelDefId = StateModelDefId.from("FakeModel");
+    final State state = State.from("fake");
+    final int constraint1 = 10;
+    final int constraint2 = 11;
+    final String key1 = "key1";
+    final String key2 = "key2";
+
+    // set up the cluster (non-atomically since this concurrency comes later)
+    ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+    ClusterConfig config = new ClusterConfig.Builder(clusterId).build();
+    boolean created = accessor.createCluster(config);
+    Assert.assertTrue(created);
+
+    try {
+      // thread that will update the cluster in one way
+      Thread t1 = new Thread() {
+        @Override
+        public void run() {
+          UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+          userConfig.setBooleanField(key1, true);
+          ClusterConfig.Delta delta =
+              new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
+                  Scope.cluster(clusterId), stateModelDefId, state, constraint1).setUserConfig(
+                  userConfig);
+          ClusterAccessor atomicAccessor =
+              new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
+          atomicAccessor.updateCluster(delta);
+        }
+      };
+
+      // thread that will update the cluster in another way
+      Thread t2 = new Thread() {
+        @Override
+        public void run() {
+          UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+          userConfig.setBooleanField(key2, true);
+          ClusterConfig.Delta delta =
+              new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
+                  Scope.cluster(clusterId), stateModelDefId, state, constraint2).setUserConfig(
+                  userConfig);
+          ClusterAccessor atomicAccessor =
+              new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
+          atomicAccessor.updateCluster(delta);
+        }
+      };
+
+      // start the threads
+      t1.start();
+      t2.start();
+
+      // make sure the threads are done
+      long startTime = System.currentTimeMillis();
+      try {
+        t1.join(TIMEOUT);
+        t2.join(TIMEOUT);
+      } catch (InterruptedException e) {
+        // Assert.fail throws, so the workers have to be interrupted before it is called
+        t1.interrupt();
+        t2.interrupt();
+        Assert.fail(e.getMessage());
+      }
+      long endTime = System.currentTimeMillis();
+      if (endTime - startTime > TIMEOUT - EXTRA_WAIT) {
+        // Assert.fail throws, so the workers have to be interrupted before it is called
+        t1.interrupt();
+        t2.interrupt();
+        Assert.fail("Test timed out");
+      }
+
+      Assert.assertTrue(lockProvider.hasLockBlocked());
+    } finally {
+      // remove the cluster even when the test fails so later tests can recreate it
+      accessor.dropCluster();
+    }
+  }
+
+  /**
+   * A HelixLockable that returns an instrumented ZKHelixLock
+   */
+  private class LockProvider implements HelixLockable {
+    private HelixLock _firstLock = null;
+    // final because it doubles as the monitor that unlock() waits on and setBlocked() notifies
+    private final AtomicBoolean _hasSecondBlocked = new AtomicBoolean(false);
+
+    @Override
+    public synchronized HelixLock getLock(ClusterId clusterId, Scope<?> scope) {
+      return new MyLock(clusterId, scope, _gZkClient);
+    }
+
+    /**
+     * Check if a lock object has blocked
+     * @return true if a block happened, false otherwise
+     */
+    public synchronized boolean hasLockBlocked() {
+      return _hasSecondBlocked.get();
+    }
+
+    /**
+     * An instrumented ZKHelixLock
+     */
+    private class MyLock extends ZKHelixLock {
+      /**
+       * Instantiate a lock that instruments a ZKHelixLock
+       * @param clusterId the cluster to lock
+       * @param scope the scope to lock on
+       * @param zkClient an active ZooKeeper client
+       */
+      public MyLock(ClusterId clusterId, Scope<?> scope, ZkClient zkClient) {
+        super(clusterId, scope, zkClient);
+      }
+
+      @Override
+      public boolean lock() {
+        // synchronize here to ensure atomic set and so that the first lock is the first one who
+        // gets to lock
+        synchronized (LockProvider.this) {
+          if (_firstLock == null) {
+            _firstLock = this;
+          }
+          return super.lock();
+        }
+      }
+
+      @Override
+      public boolean unlock() {
+        boolean interrupted = false;
+        if (_firstLock == this) {
+          // wait to unlock until another thread has blocked, giving up after TIMEOUT; the flag
+          // is re-checked in a loop because Object.wait may return without a notification
+          long deadline = System.currentTimeMillis() + TIMEOUT;
+          synchronized (_hasSecondBlocked) {
+            long remaining = deadline - System.currentTimeMillis();
+            while (!interrupted && !_hasSecondBlocked.get() && remaining > 0) {
+              try {
+                _hasSecondBlocked.wait(remaining);
+              } catch (InterruptedException e) {
+                // stop waiting and release the lock
+                interrupted = true;
+              }
+              remaining = deadline - System.currentTimeMillis();
+            }
+          }
+        }
+        try {
+          return super.unlock();
+        } finally {
+          if (interrupted) {
+            // restore the interrupt status only after the lock has been released
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+
+      @Override
+      protected void setBlocked(boolean isBlocked) {
+        if (isBlocked) {
+          synchronized (_hasSecondBlocked) {
+            _hasSecondBlocked.set(true);
+            _hasSecondBlocked.notifyAll();
+          }
+        }
+        super.setBlocked(isBlocked);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
new file mode 100644
index 0000000..4e023ba
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
@@ -0,0 +1,117 @@
+package org.apache.helix.lock.zk;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.lock.HelixLock;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Tests that the Zookeeper-based Helix lock can acquire, block, and release as appropriate
+ */
+public class TestZKHelixLock extends ZkUnitTestBase {
+  /** Upper bound, in milliseconds, for waiting on the second thread to block or to finish */
+  private static final long WAIT_MS = 10000L;
+
+  /**
+   * Thread 1 takes lock1 and sets both fields to 1; thread 2 then tries to take lock2 on the same
+   * scope and sets both fields to 2. The test checks that the fields stay at 1 and that lock2
+   * reports being blocked while lock1 is held, and that the fields become 2 after lock1 is
+   * released.
+   */
+  @Test
+  public void basicTest() {
+    _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+    final AtomicBoolean t1Locked = new AtomicBoolean(false);
+    final AtomicBoolean t1Done = new AtomicBoolean(false);
+    final AtomicInteger field1 = new AtomicInteger(0);
+    final AtomicInteger field2 = new AtomicInteger(1);
+    final ClusterId clusterId = ClusterId.from("testCluster");
+    final HelixLock lock1 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), _gZkClient);
+    final HelixLock lock2 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), _gZkClient);
+
+    // thread 1: get a lock, set fields to 1
+    Thread t1 = new Thread() {
+      @Override
+      public void run() {
+        lock1.lock();
+        synchronized (t1Locked) {
+          t1Locked.set(true);
+          t1Locked.notify();
+        }
+        // if locking doesn't work, t2 will set the fields first; the call is qualified because
+        // an unqualified yield() no longer compiles on Java 14 and later
+        Thread.yield();
+        field1.set(1);
+        field2.set(1);
+        synchronized (t1Done) {
+          t1Done.set(true);
+          t1Done.notify();
+        }
+      }
+    };
+
+    // thread 2: wait for t1 to acquire the lock, get a lock, set fields to 2
+    Thread t2 = new Thread() {
+      @Override
+      public void run() {
+        synchronized (t1Locked) {
+          while (!t1Locked.get()) {
+            try {
+              t1Locked.wait();
+            } catch (InterruptedException e) {
+              // keep waiting; the loop re-checks the flag
+            }
+          }
+        }
+        lock2.lock();
+        try {
+          field1.set(2);
+          field2.set(2);
+        } finally {
+          // release the second lock so that it does not outlive the test
+          lock2.unlock();
+        }
+      }
+    };
+
+    // start the threads
+    t1.setPriority(Thread.MIN_PRIORITY);
+    t2.setPriority(Thread.MAX_PRIORITY);
+    t1.start();
+    t2.start();
+
+    // wait for t1 to finish setting fields
+    synchronized (t1Done) {
+      while (!t1Done.get()) {
+        try {
+          t1Done.wait();
+        } catch (InterruptedException e) {
+          // keep waiting; the loop re-checks the flag
+        }
+      }
+    }
+
+    try {
+      // make sure both fields are 1
+      Assert.assertEquals(field1.get(), 1);
+      Assert.assertEquals(field2.get(), 1);
+
+      // t2 only starts acquiring lock2 once t1 holds lock1, so it may not have reached the
+      // blocked state yet; poll for a bounded time instead of checking exactly once
+      long deadline = System.currentTimeMillis() + WAIT_MS;
+      while (!lock2.isBlocked() && System.currentTimeMillis() < deadline) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          Assert.fail("Interrupted while waiting for t2 to block");
+        }
+      }
+      Assert.assertTrue(lock2.isBlocked());
+    } finally {
+      // unlock t1's lock after checking that t2 is blocked; this also runs when a check above
+      // fails so that t2 is never left waiting for a lock that is not released
+      lock1.unlock();
+    }
+
+    // wait for t2, make sure both fields are 2
+    try {
+      t2.join(WAIT_MS);
+    } catch (InterruptedException e) {
+      Assert.fail("Interrupted while waiting for t2 to finish");
+    }
+    Assert.assertFalse(t2.isAlive(), "t2 did not finish after t1's lock was released");
+    Assert.assertEquals(field1.get(), 2);
+    Assert.assertEquals(field2.get(), 2);
+  }
+}