You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/07 02:19:49 UTC
[41/53] [abbrv] git commit: [HELIX-268] Atomic API, rb=14578
[HELIX-268] Atomic API, rb=14578
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/e39b924b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/e39b924b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/e39b924b
Branch: refs/heads/master
Commit: e39b924b5d3975b90e530c17be7bdf2239affe86
Parents: 61643b1
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Oct 10 17:59:31 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 6 13:17:37 2013 -0800
----------------------------------------------------------------------
.../helix/api/accessor/AtomicClusterAccessor.java | 6 +++++-
.../api/accessor/AtomicParticipantAccessor.java | 6 +++++-
.../helix/api/accessor/AtomicResourceAccessor.java | 6 +++++-
.../java/org/apache/helix/lock/zk/ZKHelixLock.java | 8 +++++---
.../apache/helix/model/ClusterConfiguration.java | 16 ----------------
.../helix/api/accessor/TestAccessorRecreate.java | 10 +++++++++-
.../helix/api/accessor/TestAtomicAccessors.java | 12 +++++-------
.../controller/stages/TestMessageThrottleStage.java | 8 ++++++++
.../controller/stages/TestRebalancePipeline.java | 14 ++++++++++++++
.../org/apache/helix/lock/zk/TestZKHelixLock.java | 14 ++++++++++++--
10 files changed, 68 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index 1196770..37cc47e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -46,7 +46,11 @@ import com.google.common.collect.Sets;
* An atomic version of the ClusterAccessor. If atomic operations are required, use instances of
* this class. Atomicity is not guaranteed when using instances of ClusterAccessor alongside
* instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
- * may fail, in which case users should handle the return value of each function if necessary.
+ * may fail, in which case users should handle the return value of each function if necessary. <br/>
+ * <br/>
+ * Using this class is quite expensive; it should thus be used sparingly and only in systems where
+ * contention on these operations is expected. For most systems running Helix, this is typically not
+ * the case.
*/
public class AtomicClusterAccessor extends ClusterAccessor {
private static final Logger LOG = Logger.getLogger(AtomicClusterAccessor.class);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
index 05fb0ec..1c734e3 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
@@ -38,7 +38,11 @@ import org.apache.log4j.Logger;
* An atomic version of the ParticipantAccessor. If atomic operations are required, use instances of
* this class. Atomicity is not guaranteed when using instances of ParticipantAccessor alongside
* instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
- * may fail, in which case users should handle the return value of each function if necessary.
+ * may fail, in which case users should handle the return value of each function if necessary. <br/>
+ * <br/>
+ * Using this class is quite expensive; it should thus be used sparingly and only in systems where
+ * contention on these operations is expected. For most systems running Helix, this is typically not
+ * the case.
*/
public class AtomicParticipantAccessor extends ParticipantAccessor {
private static final Logger LOG = Logger.getLogger(AtomicParticipantAccessor.class);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 4bb2ebe..6d69981 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -34,7 +34,11 @@ import org.apache.log4j.Logger;
* An atomic version of the ResourceAccessor. If atomic operations are required, use instances of
* this class. Atomicity is not guaranteed when using instances of ResourceAccessor alongside
* instances of this class. Furthermore, depending on the semantics of the lock, lock acquisition
- * may fail, in which case users should handle the return value of each function if necessary.
+ * may fail, in which case users should handle the return value of each function if necessary. <br/>
+ * <br/>
+ * Using this class is quite expensive; it should thus be used sparingly and only in systems where
+ * contention on these operations is expected. For most systems running Helix, this is typically not
+ * the case.
*/
public class AtomicResourceAccessor extends ResourceAccessor {
private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
index 4d3b459..1d56f13 100644
--- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
@@ -92,9 +92,9 @@ public class ZKHelixLock implements HelixLock {
@Override
public synchronized boolean lock() {
_canceled = false;
- if (_locked) {
- // no need to proceed if the lock is already acquired
- return true;
+ if (_locked || isBlocked()) {
+ // no need to proceed if the lock is already acquired or already waiting
+ return false;
}
try {
// create the root path if it doesn't exist
@@ -132,6 +132,8 @@ public class ZKHelixLock implements HelixLock {
if (LOG.isInfoEnabled()) {
LOG.info("Unlock skipped because lock node was not present");
}
+ } catch (RuntimeException e) {
+ LOG.error("Error connecting to release the lock");
}
_locked = false;
return true;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index 1278ceb..6887f24 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -66,22 +66,6 @@ public class ClusterConfiguration extends HelixProperty {
}
/**
- * Set the identifier of this configuration for the last write
- * @param writeId positive random long identifier
- */
- public void setWriteId(long writeId) {
- _record.setLongField(Fields.WRITE_ID.toString(), writeId);
- }
-
- /**
- * Get the identifier for the last write to this configuration
- * @return positive write identifier, or -1 of not set
- */
- public long getWriteId() {
- return _record.getLongField(Fields.WRITE_ID.toString(), -1);
- }
-
- /**
* Create a new ClusterConfiguration from a UserConfig
* @param userConfig user-defined configuration properties
* @return ClusterConfiguration
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
index a92f12a..4eebbc6 100644
--- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAccessorRecreate.java
@@ -50,7 +50,7 @@ public class TestAccessorRecreate extends ZkUnitTestBase {
@Test
public void testRecreateCluster() {
final String MODIFIER = "modifier";
- final ClusterId clusterId = ClusterId.from("testCluster");
+ final ClusterId clusterId = ClusterId.from("TestAccessorRecreate!testCluster");
// connect
boolean connected = _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
@@ -159,4 +159,12 @@ public class TestAccessorRecreate extends ZkUnitTestBase {
.userConfig(userConfig).build();
return accessor.addParticipantToCluster(participant);
}
+ // private HelixLockable lockProvider() {
+ // return new HelixLockable() {
+ // @Override
+ // public HelixLock getLock(ClusterId clusterId, Scope<?> scope) {
+ // return new ZKHelixLock(clusterId, scope, _gZkClient);
+ // }
+ // };
+ // }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
index 4087796..443c3db 100644
--- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
@@ -49,7 +49,7 @@ public class TestAtomicAccessors extends ZkUnitTestBase {
@Test
public void testClusterUpdates() {
- final ClusterId clusterId = ClusterId.from("testCluster");
+ final ClusterId clusterId = ClusterId.from("TestAtomicAccessors!testCluster");
final BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
final HelixDataAccessor helixAccessor =
new ZKHelixDataAccessor(clusterId.stringify(), baseAccessor);
@@ -160,15 +160,13 @@ public class TestAtomicAccessors extends ZkUnitTestBase {
}
@Override
- public boolean lock() {
+ public synchronized boolean lock() {
// synchronize here to ensure atomic set and so that the first lock is the first one who
// gets to lock
- synchronized (LockProvider.this) {
- if (_firstLock == null) {
- _firstLock = this;
- }
- return super.lock();
+ if (_firstLock == null) {
+ _firstLock = this;
}
+ return super.lock();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 450d654..0bd8795 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -31,6 +31,8 @@ import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
@@ -70,6 +72,9 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
});
setupStateModel(clusterName);
+ ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+ clusterAccessor.initClusterStructure();
+
ClusterEvent event = new ClusterEvent("testEvent");
event.addAttribute("helixmanager", manager);
@@ -143,6 +148,9 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
});
setupStateModel(clusterName);
+ ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+ clusterAccessor.initClusterStructure();
+
// setup constraints
ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 5a7c6ac..c9e0f53 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -30,6 +30,8 @@ import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.SessionId;
@@ -77,6 +79,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
});
setupStateModel(clusterName);
+ ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+ clusterAccessor.initClusterStructure();
+
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
dataRefresh.addStage(new NewReadClusterDataStage());
@@ -153,6 +158,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
0, 1
});
+ ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+ clusterAccessor.initClusterStructure();
+
TestHelper
.startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
@@ -226,6 +234,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
});
setupStateModel(clusterName);
+ ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+ clusterAccessor.initClusterStructure();
+
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
dataRefresh.addStage(new NewReadClusterDataStage());
@@ -328,6 +339,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
});
setupStateModel(clusterName);
+ ClusterAccessor clusterAccessor = new ClusterAccessor(ClusterId.from(clusterName), accessor);
+ clusterAccessor.initClusterStructure();
+
// cluster data cache refresh pipeline
Pipeline dataRefresh = new Pipeline();
dataRefresh.addStage(new NewReadClusterDataStage());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e39b924b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
index 4e023ba..f39ca11 100644
--- a/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
+++ b/helix-core/src/test/java/org/apache/helix/lock/zk/TestZKHelixLock.java
@@ -35,8 +35,10 @@ import org.testng.annotations.Test;
*/
public class TestZKHelixLock extends ZkUnitTestBase {
@Test
- public void basicTest() {
- _gZkClient.waitUntilConnected(30000, TimeUnit.MILLISECONDS);
+ public void basicTest() throws InterruptedException {
+ final long TIMEOUT = 30000;
+ final long RETRY_INTERVAL = 100;
+ _gZkClient.waitUntilConnected(TIMEOUT, TimeUnit.MILLISECONDS);
final AtomicBoolean t1Locked = new AtomicBoolean(false);
final AtomicBoolean t1Done = new AtomicBoolean(false);
final AtomicInteger field1 = new AtomicInteger(0);
@@ -103,6 +105,14 @@ public class TestZKHelixLock extends ZkUnitTestBase {
Assert.assertEquals(field2.get(), 1);
// unlock t1's lock after checking that t2 is blocked
+ long count = 0;
+ while (!lock2.isBlocked()) {
+ if (count > TIMEOUT) {
+ break;
+ }
+ Thread.sleep(RETRY_INTERVAL);
+ count += RETRY_INTERVAL;
+ }
Assert.assertTrue(lock2.isBlocked());
lock1.unlock();