You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/10 19:44:07 UTC
git commit: [HELIX-238] Assorted fixes
Updated Branches:
refs/heads/helix-logical-model 101fe1e9a -> bdb7a4d3f
[HELIX-238] Assorted fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/bdb7a4d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/bdb7a4d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/bdb7a4d3
Branch: refs/heads/helix-logical-model
Commit: bdb7a4d3fc4c29685ba47f56a6c44e1ae1dc5577
Parents: 101fe1e
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Thu Oct 10 10:35:41 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Thu Oct 10 10:35:41 2013 -0700
----------------------------------------------------------------------
.../api/accessor/AtomicClusterAccessor.java | 2 +-
.../api/accessor/AtomicParticipantAccessor.java | 7 +--
.../api/accessor/AtomicResourceAccessor.java | 2 +-
.../helix/api/accessor/ClusterAccessor.java | 23 ++++++---
.../helix/api/accessor/ParticipantAccessor.java | 42 +++++++++-------
.../helix/api/accessor/ResourceAccessor.java | 28 +++++++----
.../org/apache/helix/lock/zk/ZKHelixLock.java | 53 +++-----------------
.../helix/model/ClusterConfiguration.java | 20 ++++++++
.../helix/api/accessor/TestAtomicAccessors.java | 12 +++--
9 files changed, 95 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index d17b2af..1196770 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -57,7 +57,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
private final ClusterId _clusterId;
/**
- * Non-atomic instance to protect against recursive locking via polymorphism
+ * Non-atomic instance to protect against reentrant locking via polymorphism
*/
private final ClusterAccessor _clusterAccessor;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
index fd05b48..05fb0ec 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
@@ -48,7 +48,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
private final HelixLockable _lockProvider;
/**
- * Non-atomic instance to protect against recursive locking via polymorphism
+ * Non-atomic instance to protect against reentrant locking via polymorphism
*/
private final ParticipantAccessor _participantAccessor;
@@ -68,16 +68,17 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
}
@Override
- void enableParticipant(ParticipantId participantId, boolean isEnabled) {
+ boolean enableParticipant(ParticipantId participantId, boolean isEnabled) {
HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
boolean locked = lock.lock();
if (locked) {
try {
- _participantAccessor.enableParticipant(participantId);
+ return _participantAccessor.enableParticipant(participantId);
} finally {
lock.unlock();
}
}
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 95c0b05..4bb2ebe 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -44,7 +44,7 @@ public class AtomicResourceAccessor extends ResourceAccessor {
private final HelixLockable _lockProvider;
/**
- * Non-atomic instance to protect against recursive locking via polymorphism
+ * Non-atomic instance to protect against reentrant locking via polymorphism
*/
private final ResourceAccessor _resourceAccessor;
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index ea7dbb9..abeb649 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -161,6 +161,7 @@ public class ClusterAccessor {
return false;
}
ClusterConfiguration configuration = ClusterConfiguration.from(config.getUserConfig());
+ configuration.setAutoJoinAllowed(config.autoJoinAllowed());
_accessor.setProperty(_keyBuilder.clusterConfig(), configuration);
Map<ConstraintType, ClusterConstraints> constraints = config.getConstraintMap();
for (ConstraintType type : constraints.keySet()) {
@@ -465,7 +466,8 @@ public class ClusterAccessor {
*/
public boolean addStat(final String statName) {
if (!isClusterStructureValid()) {
- throw new HelixException("cluster " + _clusterId + " is not setup yet");
+ LOG.error("cluster " + _clusterId + " is not setup yet");
+ return false;
}
String persistentStatsPath = _keyBuilder.persistantStat().getPath();
@@ -496,7 +498,8 @@ public class ClusterAccessor {
*/
public boolean dropStat(final String statName) {
if (!isClusterStructureValid()) {
- throw new HelixException("cluster " + _clusterId + " is not setup yet");
+ LOG.error("cluster " + _clusterId + " is not setup yet");
+ return false;
}
String persistentStatsPath = _keyBuilder.persistantStat().getPath();
@@ -528,7 +531,8 @@ public class ClusterAccessor {
*/
public boolean addAlert(final String alertName) {
if (!isClusterStructureValid()) {
- throw new HelixException("cluster " + _clusterId + " is not setup yet");
+ LOG.error("cluster " + _clusterId + " is not setup yet");
+ return false;
}
BaseDataAccessor<ZNRecord> baseAccessor = _accessor.getBaseDataAccessor();
@@ -564,7 +568,8 @@ public class ClusterAccessor {
*/
public boolean dropAlert(final String alertName) {
if (!isClusterStructureValid()) {
- throw new HelixException("cluster " + _clusterId + " is not setup yet");
+ LOG.error("cluster " + _clusterId + " is not setup yet");
+ return false;
}
String alertsPath = _keyBuilder.alerts().getPath();
@@ -597,16 +602,18 @@ public class ClusterAccessor {
/**
* pause controller of cluster
+ * @return true if cluster was paused, false if pause failed or already paused
*/
- public void pauseCluster() {
- _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause"));
+ public boolean pauseCluster() {
+ return _accessor.createProperty(_keyBuilder.pause(), new PauseSignal("pause"));
}
/**
* resume controller of cluster
+ * @return true if resume succeeded, false otherwise
*/
- public void resumeCluster() {
- _accessor.removeProperty(_keyBuilder.pause());
+ public boolean resumeCluster() {
+ return _accessor.removeProperty(_keyBuilder.pause());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 4e7d3c2..ac8f79d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -86,34 +86,36 @@ public class ParticipantAccessor {
* enable/disable a participant
* @param participantId
* @param isEnabled
+ * @return true if enable state succeeded, false otherwise
*/
- void enableParticipant(ParticipantId participantId, boolean isEnabled) {
+ boolean enableParticipant(ParticipantId participantId, boolean isEnabled) {
String participantName = participantId.stringify();
if (_accessor.getProperty(_keyBuilder.instanceConfig(participantName)) == null) {
LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
- return;
+ return false;
}
InstanceConfig config = new InstanceConfig(participantName);
config.setInstanceEnabled(isEnabled);
- _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config);
-
+ return _accessor.updateProperty(_keyBuilder.instanceConfig(participantName), config);
}
/**
* disable participant
* @param participantId
+ * @return true if disabled successfully, false otherwise
*/
- public void disableParticipant(ParticipantId participantId) {
- enableParticipant(participantId, false);
+ public boolean disableParticipant(ParticipantId participantId) {
+ return enableParticipant(participantId, false);
}
/**
* enable participant
* @param participantId
+ * @return true if enabled successfully, false otherwise
*/
- public void enableParticipant(ParticipantId participantId) {
- enableParticipant(participantId, true);
+ public boolean enableParticipant(ParticipantId participantId) {
+ return enableParticipant(participantId, true);
}
/**
@@ -173,8 +175,9 @@ public class ParticipantAccessor {
* @param participantId
* @param resourceId
* @param partitionIdSet
+ * @return true if enable state changed successfully, false otherwise
*/
- void enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
+ boolean enablePartitionsForParticipant(final boolean enabled, final ParticipantId participantId,
final ResourceId resourceId, final Set<PartitionId> partitionIdSet) {
String participantName = participantId.stringify();
String resourceName = resourceId.stringify();
@@ -183,7 +186,7 @@ public class ParticipantAccessor {
PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(participantName);
if (_accessor.getProperty(instanceConfigKey) == null) {
LOG.error("Config for participant: " + participantId + " does NOT exist in cluster");
- return;
+ return false;
}
// check resource exist. warn if not
@@ -211,7 +214,7 @@ public class ParticipantAccessor {
partitionNames.add(partitionId.stringify());
}
- baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
+ return baseAccessor.update(instanceConfigKey.getPath(), new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
if (currentData == null) {
@@ -245,10 +248,11 @@ public class ParticipantAccessor {
* @param participantId
* @param resourceId
* @param disablePartitionIdSet
+ * @return true if disabled successfully, false otherwise
*/
- public void disablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
- Set<PartitionId> disablePartitionIdSet) {
- enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet);
+ public boolean disablePartitionsForParticipant(ParticipantId participantId,
+ ResourceId resourceId, Set<PartitionId> disablePartitionIdSet) {
+ return enablePartitionsForParticipant(false, participantId, resourceId, disablePartitionIdSet);
}
/**
@@ -256,10 +260,11 @@ public class ParticipantAccessor {
* @param participantId
* @param resourceId
* @param enablePartitionIdSet
+ * @return true if enabled successfully, false otherwise
*/
- public void enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
+ public boolean enablePartitionsForParticipant(ParticipantId participantId, ResourceId resourceId,
Set<PartitionId> enablePartitionIdSet) {
- enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet);
+ return enablePartitionsForParticipant(true, participantId, resourceId, enablePartitionIdSet);
}
/**
@@ -600,10 +605,11 @@ public class ParticipantAccessor {
* @param resourceId resource id
* @param participantId participant id
* @param sessionId session id
+ * @return true if dropped, false otherwise
*/
- public void dropCurrentState(ResourceId resourceId, ParticipantId participantId,
+ public boolean dropCurrentState(ResourceId resourceId, ParticipantId participantId,
SessionId sessionId) {
- _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
+ return _accessor.removeProperty(_keyBuilder.currentState(participantId.stringify(),
sessionId.stringify(), resourceId.stringify()));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index 7041c5e..58b226d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -108,9 +108,10 @@ public class ResourceAccessor {
* save resource assignment
* @param resourceId
* @param resourceAssignment
+ * @return true if set, false otherwise
*/
- public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
- _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
+ public boolean setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+ return _accessor.setProperty(_keyBuilder.resourceAssignment(resourceId.stringify()),
resourceAssignment);
}
@@ -128,9 +129,11 @@ public class ResourceAccessor {
* rebalancer configuration
* @param resourceId
* @param configuration
+ * @return true if set, false otherwise
*/
- private void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
- _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ private boolean setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+ boolean status =
+ _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
// also set an ideal state if the resource supports it
RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
IdealState idealState =
@@ -139,6 +142,7 @@ public class ResourceAccessor {
if (idealState != null) {
_accessor.setProperty(_keyBuilder.idealState(resourceId.stringify()), idealState);
}
+ return status;
}
/**
@@ -249,27 +253,29 @@ public class ResourceAccessor {
* Get a resource configuration, which may include user-defined configuration, as well as
* rebalancer configuration
* @param resourceId
- * @return configuration
+ * @return configuration or null
*/
- public void getConfiguration(ResourceId resourceId) {
- _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
+ public ResourceConfiguration getConfiguration(ResourceId resourceId) {
+ return _accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
}
/**
* set external view of a resource
* @param resourceId
* @param extView
+ * @return true if set, false otherwise
*/
- public void setExternalView(ResourceId resourceId, ExternalView extView) {
- _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView);
+ public boolean setExternalView(ResourceId resourceId, ExternalView extView) {
+ return _accessor.setProperty(_keyBuilder.externalView(resourceId.stringify()), extView);
}
/**
* drop external view of a resource
* @param resourceId
+ * @return true if dropped, false otherwise
*/
- public void dropExternalView(ResourceId resourceId) {
- _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
+ public boolean dropExternalView(ResourceId resourceId) {
+ return _accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
index 70614b3..4d3b459 100644
--- a/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
+++ b/helix-core/src/main/java/org/apache/helix/lock/zk/ZKHelixLock.java
@@ -35,6 +35,7 @@ import org.apache.zookeeper.ZooKeeper;
/**
* Locking scheme for Helix that uses the ZooKeeper exclusive lock implementation
* Please use the following lock order convention: Cluster, Participant, Resource, Partition
+ * WARNING: this is not a reentrant lock
*/
public class ZKHelixLock implements HelixLock {
private static final Logger LOG = Logger.getLogger(ZKHelixLock.class);
@@ -96,8 +97,11 @@ public class ZKHelixLock implements HelixLock {
return true;
}
try {
+ // create the root path if it doesn't exist
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
baseAccessor.create(_rootPath, null, AccessOption.PERSISTENT);
+
+ // try to acquire the lock
boolean acquired = _writeLock.lock();
if (acquired) {
_locked = true;
@@ -134,7 +138,7 @@ public class ZKHelixLock implements HelixLock {
}
@Override
- public synchronized boolean isBlocked() {
+ public boolean isBlocked() {
return _blocked;
}
@@ -142,52 +146,7 @@ public class ZKHelixLock implements HelixLock {
* Set if this the lock method is currently blocked
* @param isBlocked true if blocked, false otherwise
*/
- protected synchronized void setBlocked(boolean isBlocked) {
+ protected void setBlocked(boolean isBlocked) {
_blocked = isBlocked;
}
-
- public static void main(String[] args) {
- ZkClient zkClient = new ZkClient("localhost:2199");
- ClusterId clusterId = ClusterId.from("exampleCluster");
- final ZKHelixLock lock1 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), zkClient);
- final ZKHelixLock lock2 = new ZKHelixLock(clusterId, Scope.cluster(clusterId), zkClient);
- System.err.println("lock1 started");
- boolean result = lock1.lock();
- System.err.println("lock1 finished " + result);
- new Thread() {
- @Override
- public void run() {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.err.println("unlock1 started");
- lock1.unlock();
- System.err.println("unlock1 finished");
- }
- }.start();
- final Thread t1 = new Thread() {
- @Override
- public void run() {
- System.err.println("lock2 started");
- boolean locked = lock2.lock();
- System.err.println("lock2 finished " + locked);
- }
- };
- t1.start();
- new Thread() {
- @Override
- public void run() {
- try {
- Thread.sleep(5000);
- System.err.println("interrupt2 start");
- t1.interrupt();
- System.err.println("interrupt2 finished");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }.start();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index d733a5c..1278ceb 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -29,6 +29,10 @@ import org.apache.helix.api.id.ClusterId;
* Persisted configuration properties for a cluster
*/
public class ClusterConfiguration extends HelixProperty {
+ private enum Fields {
+ WRITE_ID
+ }
+
/**
* Instantiate for an id
* @param id cluster id
@@ -62,6 +66,22 @@ public class ClusterConfiguration extends HelixProperty {
}
/**
+ * Set the identifier of this configuration for the last write
+ * @param writeId positive random long identifier
+ */
+ public void setWriteId(long writeId) {
+ _record.setLongField(Fields.WRITE_ID.toString(), writeId);
+ }
+
+ /**
+ * Get the identifier for the last write to this configuration
+ * @return positive write identifier, or -1 of not set
+ */
+ public long getWriteId() {
+ return _record.getLongField(Fields.WRITE_ID.toString(), -1);
+ }
+
+ /**
* Create a new ClusterConfiguration from a UserConfig
* @param userConfig user-defined configuration properties
* @return ClusterConfiguration
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/bdb7a4d3/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
index 8dbb43f..4087796 100644
--- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
@@ -160,13 +160,15 @@ public class TestAtomicAccessors extends ZkUnitTestBase {
}
@Override
- public synchronized boolean lock() {
+ public boolean lock() {
// synchronize here to ensure atomic set and so that the first lock is the first one who
// gets to lock
- if (_firstLock == null) {
- _firstLock = this;
+ synchronized (LockProvider.this) {
+ if (_firstLock == null) {
+ _firstLock = this;
+ }
+ return super.lock();
}
- return super.lock();
}
@Override
@@ -186,7 +188,7 @@ public class TestAtomicAccessors extends ZkUnitTestBase {
}
@Override
- protected synchronized void setBlocked(boolean isBlocked) {
+ protected void setBlocked(boolean isBlocked) {
if (isBlocked) {
synchronized (_hasSecondBlocked) {
_hasSecondBlocked.set(true);