You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:49 UTC

[41/53] [abbrv] git commit: [HELIX-268] Atomic API, rb=14578

[HELIX-268] Atomic API, rb=14578


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e39b924b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e39b924b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e39b924b

Branch: refs/heads/master
Commit: e39b924b5d3975b90e530c17be7bdf2239affe86
Parents: 61643b1
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Oct 10 17:59:31 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:37 2013 -0800

----------------------------------------------------------------------
 .../helix/api/accessor/AtomicClusterAccessor.java   |  6 +++++-
 .../api/accessor/AtomicParticipantAccessor.java     |  6 +++++-
 .../helix/api/accessor/AtomicResourceAccessor.java  |  6 +++++-
 .../java/org/apache/helix/lock/zk/ZKHelixLock.java  |  8 +++++---
 .../apache/helix/model/ClusterConfiguration.java    | 16 ----------------
 .../helix/api/accessor/TestAccessorRecreate.java    | 10 +++++++++-
 .../helix/api/accessor/TestAtomicAccessors.java     | 12 +++++-------
 .../controller/stages/TestMessageThrottleStage.java |  8 ++++++++
 .../controller/stages/TestRebalancePipeline.java    | 14 ++++++++++++++
 .../org/apache/helix/lock/zk/TestZKHelixLock.java   | 14 ++++++++++++--
 10 files changed, 68 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index 1196770..37cc47e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -46,7 +46,11 @@ import com.google.common.collect.Sets;
  * An atomic version of the ClusterAccessor. If atomic operations are required, use instances of
  * this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside
  * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
- * may fail, in which case users should handle the return value of each function if necessary.
+ * may fail, in which case users should handle the return value of each function if necessary. <br/>
+ * <br/>
+ * Using this class is quite expensive; it should thus be used sparingly and only in systems where
+ * contention on these operations is expected. For most systems running Helix, this is typically not
+ * the case.
  */
 public class AtomicClusterAccessor extends ClusterAccessor {
   private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
index 05fb0ec..1c734e3 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
@@ -38,7 +38,11 @@ import org.apache.log4j.Logger;
  * An atomic version of the ParticipantAccessor. If atomic operations are required, use instances of
  * this class. Atomicity is not guaranteed when using instances of ParticipantAccessor alongside
  * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
- * may fail, in which case users should handle the return value of each function if necessary.
+ * may fail, in which case users should handle the return value of each function if necessary. <br/>
+ * <br/>
+ * Using this class is quite expensive; it should thus be used sparingly and only in systems where
+ * contention on these operations is expected. For most systems running Helix, this is typically not
+ * the case.
  */
 public class AtomicParticipantAccessor extends ParticipantAccessor {
   private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 4bb2ebe..6d69981 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -34,7 +34,11 @@ import org.apache.log4j.Logger;
  * An atomic version of the ResourceAccessor. If atomic operations are required, use instances of
  * this class. Atomicity is not guaranteed when using instances of ResourceAccessor alongside
  * instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
- * may fail, in which case users should handle the return value of each function if necessary.
+ * may fail, in which case users should handle the return value of each function if necessary. <br/>
+ * <br/>
+ * Using this class is quite expensive; it should thus be used sparingly and only in systems where
+ * contention on these operations is expected. For most systems running Helix, this is typically not
+ * the case.
  */
 public class AtomicResourceAccessor extends ResourceAccessor {
   private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
index 4d3b459..1d56f13 100644
--- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
@@ -92,9 +92,9 @@ public class ZKHelixLock implements HelixLock {
   @Override
   public synchronized boolean lock() {
     _canceled = false;
-    if (_locked) {
-      // no need to proceed if the lock is already acquired
-      return true;
+    if (_locked || isBlocked()) {
+      // no need to proceed if the lock is already acquired or already waiting
+      return false;
     }
     try {
       // create the root path if it doesn't exist
@@ -132,6 +132,8 @@ public class ZKHelixLock implements HelixLock {
       if (LOG.isInfoEnabled()) {
         LOG.info("Unlock skipped because lock node was not present");
       }
+    } catch (RuntimeException e) {
+      LOG.error("Error connecting to release the lock");
     }
     _locked = false;
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index 1278ceb..6887f24 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -66,22 +66,6 @@ public class ClusterConfiguration extends HelixProperty {
   }
 
   /**
-   * Set the identifier of this configuration for the last write
-   * @param writeId positive random long identifier
-   */
-  public void setWriteId(long writeId) {
-    _record.setLongField(Fields.WRITE_ID.toString(), writeId);
-  }
-
-  /**
-   * Get the identifier for the last write to this configuration
-   * @return positive write identifier, or -1 of not set
-   */
-  public long getWriteId() {
-    return _record.getLongField(Fields.WRITE_ID.toString(), -1);
-  }
-
-  /**
    * Create a new ClusterConfiguration from a UserConfig
    * @param userConfig user-defined configuration properties
    * @return ClusterConfiguration

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
index a92f12a..4eebbc6 100644
--- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
@@ -50,7 +50,7 @@ public class TestAccessorRecreate extends ZkUnitTestBase {
   @Test
   public void testRecreateCluster() {
     final String MODIFIER = "modifier";
-    final ClusterId clusterId = ClusterId.from("testCluster");
+    final ClusterId clusterId = ClusterId.from("TestAccessorRecreate!testCluster");
 
     // connect
     boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
@@ -159,4 +159,12 @@ public class TestAccessorRecreate extends ZkUnitTestBase {
             .userConfig(userConfig).build();
     return accessor.addParticipantToCluster(participant);
   }
+  // private HelixLockable lockProvider() {
+  // return new HelixLockable() {
+  // @Override
+  // public HelixLock getLock(ClusterId clusterId, Scope<?> scope) {
+  // return new ZKHelixLock(clusterId, scope, _gZkClient);
+  // }
+  // };
+  // }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
index 4087796..443c3db 100644
--- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
@@ -49,7 +49,7 @@ public class TestAtomicAccessors extends ZkUnitTestBase {
 
   @Test
   public void testClusterUpdates() {
-    final ClusterId clusterId = ClusterId.from("testCluster");
+    final ClusterId clusterId = ClusterId.from("TestAtomicAccessors!testCluster");
     final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
     final HelixDataAccessor helixAccessor =
         new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
@@ -160,15 +160,13 @@ public class TestAtomicAccessors extends ZkUnitTestBase {
       }
 
       @Override
-      public boolean lock() {
+      public synchronized boolean lock() {
         // synchronize here to ensure atomic set and so that the first lock is the first one who
         // gets to lock
-        synchronized (LockProvider.this) {
-          if (_firstLock == null) {
-            _firstLock = this;
-          }
-          return super.lock();
+        if (_firstLock == null) {
+          _firstLock = this;
         }
+        return super.lock();
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 450d654..0bd8795 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -31,6 +31,8 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.MessageId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
@@ -70,6 +72,9 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     });
     setupStateModel(clusterName);
 
+    ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+    clusterAccessor.initClusterStructure();
+
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
 
@@ -143,6 +148,9 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
     });
     setupStateModel(clusterName);
 
+    ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+    clusterAccessor.initClusterStructure();
+
     // setup constraints
     ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 5a7c6ac..c9e0f53 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -30,6 +30,8 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.SessionId;
@@ -77,6 +79,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     });
     setupStateModel(clusterName);
 
+    ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+    clusterAccessor.initClusterStructure();
+
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new NewReadClusterDataStage());
@@ -153,6 +158,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
         0, 1
     });
 
+    ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+    clusterAccessor.initClusterStructure();
+
     TestHelper
         .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
 
@@ -226,6 +234,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     });
     setupStateModel(clusterName);
 
+    ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+    clusterAccessor.initClusterStructure();
+
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new NewReadClusterDataStage());
@@ -328,6 +339,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     });
     setupStateModel(clusterName);
 
+    ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+    clusterAccessor.initClusterStructure();
+
     // cluster data cache refresh pipeline
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new NewReadClusterDataStage());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
index 4e023ba..f39ca11 100644
--- a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
+++ b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
@@ -35,8 +35,10 @@ import org.testng.annotations.Test;
  */
 public class TestZKHelixLock extends ZkUnitTestBase {
   @Test
-  public void basicTest() {
-    _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+  public void basicTest() throws InterruptedException {
+    final long TIMEOUT = 30000;
+    final long RETRY_INTERVAL = 100;
+    _gZkClient.waitUntilConnected(TIMEOUT, TimeUnit.MILLISECONDS);
     final AtomicBoolean t1Locked = new AtomicBoolean(false);
     final AtomicBoolean t1Done = new AtomicBoolean(false);
     final AtomicInteger field1 = new AtomicInteger(0);
@@ -103,6 +105,14 @@ public class TestZKHelixLock extends ZkUnitTestBase {
     Assert.assertEquals(field2.get(), 1);
 
     // unlock t1's lock after checking that t2 is blocked
+    long count = 0;
+    while (!lock2.isBlocked()) {
+      if (count > TIMEOUT) {
+        break;
+      }
+      Thread.sleep(RETRY_INTERVAL);
+      count += RETRY_INTERVAL;
+    }
     Assert.assertTrue(lock2.isBlocked());
     lock1.unlock();