You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:43 UTC

[35/53] [abbrv] [HELIX-268] Atomic API - Add a ZK lock and a skeleton AtomicClusterAccessor

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
new file mode 100644
index 0000000..a92f12a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
@@ -0,0 +1,162 @@
+package org.apache.helix.api.accessor;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class TestAccessorRecreate extends ZkUnitTestBase {
+  private static final Logger LOG = Logger.getLogger(TestAccessorRecreate.class);
+
+  /**
+   * This test just makes sure that a cluster is only recreated if it is incomplete. This is not
+   * directly testing atomicity, but rather a use case where a machine died while creating the
+   * cluster.
+   */
+  @Test
+  public void testRecreateCluster() {
+    final String MODIFIER = "modifier";
+    final ClusterId clusterId = ClusterId.from("testCluster");
+
+    // connect
+    boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+    if (!connected) {
+      LOG.warn("Connection not established");
+      return;
+    }
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor helixAccessor = new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+    ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+
+    // create a cluster
+    boolean created = createCluster(clusterId, accessor, MODIFIER, 1);
+    Assert.assertTrue(created);
+
+    // read the cluster
+    Cluster clusterSnapshot = accessor.readCluster();
+    Assert.assertEquals(clusterSnapshot.getUserConfig().getIntField(MODIFIER, -1), 1);
+
+    // create a cluster with the same id
+    boolean created2 = createCluster(clusterId, accessor, MODIFIER, 2);
+    Assert.assertFalse(created2); // should fail since cluster exists
+
+    // remove a required property
+    helixAccessor.removeProperty(helixAccessor.keyBuilder().liveInstances());
+
+    // try again, should work this time
+    created2 = createCluster(clusterId, accessor, MODIFIER, 2);
+    Assert.assertTrue(created2);
+
+    // read the cluster again
+    clusterSnapshot = accessor.readCluster();
+    Assert.assertEquals(clusterSnapshot.getUserConfig().getIntField(MODIFIER, -1), 2);
+
+    accessor.dropCluster();
+  }
+
+  /**
+   * This test just makes sure that a participant is only recreated if it is incomplete. This is not
+   * directly testing atomicity, but rather a use case where a machine died while creating the
+   * participant.
+   */
+  @Test
+  public void testRecreateParticipant() {
+    final String MODIFIER = "modifier";
+    final ClusterId clusterId = ClusterId.from("testCluster");
+    final ParticipantId participantId = ParticipantId.from("testParticipant");
+
+    // connect
+    boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+    if (!connected) {
+      LOG.warn("Connection not established");
+      return;
+    }
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    HelixDataAccessor helixAccessor = new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+    ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+
+    // create the cluster
+    boolean clusterCreated = createCluster(clusterId, accessor, MODIFIER, 0);
+    Assert.assertTrue(clusterCreated);
+
+    // create the participant
+    boolean created = createParticipant(participantId, accessor, MODIFIER, 1);
+    Assert.assertTrue(created);
+
+    // read the participant
+    ParticipantAccessor participantAccessor = new ParticipantAccessor(helixAccessor);
+    Participant participantSnapshot = participantAccessor.readParticipant(participantId);
+    Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 1);
+
+    // create a participant with the same id
+    boolean created2 = createParticipant(participantId, accessor, MODIFIER, 2);
+    Assert.assertFalse(created2); // should fail since participant exists
+
+    // remove a required property
+    helixAccessor.removeProperty(helixAccessor.keyBuilder().messages(participantId.stringify()));
+
+    // try again, should work this time
+    created2 = createParticipant(participantId, accessor, MODIFIER, 2);
+    Assert.assertTrue(created2);
+
+    // read the cluster again
+    participantSnapshot = participantAccessor.readParticipant(participantId);
+    Assert.assertEquals(participantSnapshot.getUserConfig().getIntField(MODIFIER, -1), 2);
+
+    accessor.dropCluster();
+  }
+
+  private boolean createCluster(ClusterId clusterId, ClusterAccessor accessor, String modifierName,
+      int modifierValue) {
+    // create a cluster
+    UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+    userConfig.setIntField(modifierName, modifierValue);
+    ClusterConfig cluster = new ClusterConfig.Builder(clusterId).userConfig(userConfig).build();
+    return accessor.createCluster(cluster);
+  }
+
+  private boolean createParticipant(ParticipantId participantId, ClusterAccessor accessor,
+      String modifierName, int modifierValue) {
+    // create a participant
+    UserConfig userConfig = new UserConfig(Scope.participant(participantId));
+    userConfig.setIntField(modifierName, modifierValue);
+    ParticipantConfig participant =
+        new ParticipantConfig.Builder(participantId).hostName("host").port(0)
+            .userConfig(userConfig).build();
+    return accessor.addParticipantToCluster(participant);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
new file mode 100644
index 0000000..4087796
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
@@ -0,0 +1,202 @@
+package org.apache.helix.api.accessor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.State;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.lock.HelixLock;
+import org.apache.helix.lock.HelixLockable;
+import org.apache.helix.lock.zk.ZKHelixLock;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test that the atomic accessors behave atomically in response to interwoven updates.
+ */
+public class TestAtomicAccessors extends ZkUnitTestBase {
+  private static final long TIMEOUT = 30000L;
+  private static final long EXTRA_WAIT = 10000L;
+
+  @Test
+  public void testClusterUpdates() {
+    final ClusterId clusterId = ClusterId.from("testCluster");
+    final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    final HelixDataAccessor helixAccessor =
+        new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
+    final LockProvider lockProvider = new LockProvider();
+    final StateModelDefId stateModelDefId = StateModelDefId.from("FakeModel");
+    final State state = State.from("fake");
+    final int constraint1 = 10;
+    final int constraint2 = 11;
+    final String key1 = "key1";
+    final String key2 = "key2";
+
+    // set up the cluster (non-atomically since this concurrency comes later)
+    ClusterAccessor accessor = new ClusterAccessor(clusterId, helixAccessor);
+    ClusterConfig config = new ClusterConfig.Builder(clusterId).build();
+    boolean created = accessor.createCluster(config);
+    Assert.assertTrue(created);
+
+    // thread that will update the cluster in one way
+    Thread t1 = new Thread() {
+      @Override
+      public void run() {
+        UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+        userConfig.setBooleanField(key1, true);
+        ClusterConfig.Delta delta =
+            new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
+                Scope.cluster(clusterId), stateModelDefId, state, constraint1).setUserConfig(
+                userConfig);
+        ClusterAccessor accessor =
+            new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
+        accessor.updateCluster(delta);
+      }
+    };
+
+    // thread that will update the cluster in another way
+    Thread t2 = new Thread() {
+      @Override
+      public void run() {
+        UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
+        userConfig.setBooleanField(key2, true);
+        ClusterConfig.Delta delta =
+            new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
+                Scope.cluster(clusterId), stateModelDefId, state, constraint2).setUserConfig(
+                userConfig);
+        ClusterAccessor accessor =
+            new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
+        accessor.updateCluster(delta);
+      }
+    };
+
+    // start the threads
+    t1.start();
+    t2.start();
+
+    // make sure the threads are done
+    long startTime = System.currentTimeMillis();
+    try {
+      t1.join(TIMEOUT);
+      t2.join(TIMEOUT);
+    } catch (InterruptedException e) {
+      Assert.fail(e.getMessage());
+      t1.interrupt();
+      t2.interrupt();
+    }
+    long endTime = System.currentTimeMillis();
+    if (endTime - startTime > TIMEOUT - EXTRA_WAIT) {
+      Assert.fail("Test timed out");
+      t1.interrupt();
+      t2.interrupt();
+    }
+
+    Assert.assertTrue(lockProvider.hasLockBlocked());
+
+    accessor.dropCluster();
+  }
+
+  /**
+   * A HelixLockable that returns an instrumented ZKHelixLock
+   */
+  private class LockProvider implements HelixLockable {
+    private HelixLock _firstLock = null;
+    private AtomicBoolean _hasSecondBlocked = new AtomicBoolean(false);
+
+    @Override
+    public synchronized HelixLock getLock(ClusterId clusterId, Scope<?> scope) {
+      return new MyLock(clusterId, scope, _gZkClient);
+    }
+
+    /**
+     * Check if a lock object has blocked
+     * @return true if a block happened, false otherwise
+     */
+    public synchronized boolean hasLockBlocked() {
+      return _hasSecondBlocked.get();
+    }
+
+    /**
+     * An instrumented ZKHelixLock
+     */
+    private class MyLock extends ZKHelixLock {
+      /**
+       * Instantiate a lock that instruments a ZKHelixLock
+       * @param clusterId the cluster to lock
+       * @param scope the scope to lock on
+       * @param zkClient an active ZooKeeper client
+       */
+      public MyLock(ClusterId clusterId, Scope<?> scope, ZkClient zkClient) {
+        super(clusterId, scope, zkClient);
+      }
+
+      @Override
+      public boolean lock() {
+        // synchronize here to ensure atomic set and so that the first lock is the first one who
+        // gets to lock
+        synchronized (LockProvider.this) {
+          if (_firstLock == null) {
+            _firstLock = this;
+          }
+          return super.lock();
+        }
+      }
+
+      @Override
+      public boolean unlock() {
+        if (_firstLock == this) {
+          // wait to unlock until another thread has blocked
+          synchronized (_hasSecondBlocked) {
+            if (!_hasSecondBlocked.get()) {
+              try {
+                _hasSecondBlocked.wait(TIMEOUT);
+              } catch (InterruptedException e) {
+              }
+            }
+          }
+        }
+        return super.unlock();
+      }
+
+      @Override
+      protected void setBlocked(boolean isBlocked) {
+        if (isBlocked) {
+          synchronized (_hasSecondBlocked) {
+            _hasSecondBlocked.set(true);
+            _hasSecondBlocked.notify();
+          }
+        }
+        super.setBlocked(isBlocked);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/61643b1d/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
new file mode 100644
index 0000000..4e023ba
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
@@ -0,0 +1,117 @@
+package org.apache.helix.lock.zk;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.lock.HelixLock;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Tests that the Zookeeper-based Helix lock can acquire, block, and release as appropriate
+ */
+public class TestZKHelixLock extends ZkUnitTestBase {
+  @Test
+  public void basicTest() {
+    _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+    final AtomicBoolean t1Locked = new AtomicBoolean(false);
+    final AtomicBoolean t1Done = new AtomicBoolean(false);
+    final AtomicInteger field1 = new AtomicInteger(0);
+    final AtomicInteger field2 = new AtomicInteger(1);
+    final ClusterId clusterId = ClusterId.from("testCluster");
+    final HelixLock lock1 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), _gZkClient);
+    final HelixLock lock2 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), _gZkClient);
+
+    // thread 1: get a lock, set fields to 1
+    Thread t1 = new Thread() {
+      @Override
+      public void run() {
+        lock1.lock();
+        synchronized (t1Locked) {
+          t1Locked.set(true);
+          t1Locked.notify();
+        }
+        yield(); // if locking doesn't work, t2 will set the fields first
+        field1.set(1);
+        field2.set(1);
+        synchronized (t1Done) {
+          t1Done.set(true);
+          t1Done.notify();
+        }
+      }
+    };
+
+    // thread 2: wait for t1 to acquire the lock, get a lock, set fields to 2
+    Thread t2 = new Thread() {
+      @Override
+      public void run() {
+        synchronized (t1Locked) {
+          while (!t1Locked.get()) {
+            try {
+              t1Locked.wait();
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+        lock2.lock();
+        field1.set(2);
+        field2.set(2);
+      }
+    };
+
+    // start the threads
+    t1.setPriority(Thread.MIN_PRIORITY);
+    t2.setPriority(Thread.MAX_PRIORITY);
+    t1.start();
+    t2.start();
+
+    // wait for t1 to finish setting fields
+    synchronized (t1Done) {
+      while (!t1Done.get()) {
+        try {
+          t1Done.wait();
+        } catch (InterruptedException e) {
+        }
+      }
+    }
+
+    // make sure both fields are 1
+    Assert.assertEquals(field1.get(), 1);
+    Assert.assertEquals(field2.get(), 1);
+
+    // unlock t1's lock after checking that t2 is blocked
+    Assert.assertTrue(lock2.isBlocked());
+    lock1.unlock();
+
+    try {
+      // wait for t2, make sure both fields are 2
+      t2.join(10000);
+      Assert.assertEquals(field1.get(), 2);
+      Assert.assertEquals(field2.get(), 2);
+    } catch (InterruptedException e) {
+    }
+  }
+}