You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/09 19:46:54 UTC
git commit: [HELIX-268] Increase understandability of atomic API code
Updated Branches:
refs/heads/helix-logical-model fd78c678f -> 558b42c61
[HELIX-268] Increase understandability of atomic API code
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/558b42c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/558b42c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/558b42c6
Branch: refs/heads/helix-logical-model
Commit: 558b42c61087861e8ce93f862d288bba43e1b228
Parents: fd78c67
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Oct 9 10:46:09 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Oct 9 10:46:09 2013 -0700
----------------------------------------------------------------------
.../api/accessor/AtomicClusterAccessor.java | 42 ++++---
.../api/accessor/AtomicParticipantAccessor.java | 118 ++++++++-----------
.../api/accessor/AtomicResourceAccessor.java | 37 ++++--
.../helix/api/accessor/ClusterAccessor.java | 6 +-
.../helix/api/accessor/ParticipantAccessor.java | 4 +-
.../helix/api/accessor/ResourceAccessor.java | 12 +-
6 files changed, 113 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index 9881e2d..d17b2af 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -57,6 +57,11 @@ public class AtomicClusterAccessor extends ClusterAccessor {
private final ClusterId _clusterId;
/**
+ * Non-atomic instance to protect against recursive locking via polymorphism
+ */
+ private final ClusterAccessor _clusterAccessor;
+
+ /**
* Instantiate the accessor
* @param clusterId the cluster to access
* @param accessor a HelixDataAccessor for the physical properties
@@ -69,6 +74,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
_accessor = accessor;
_keyBuilder = accessor.keyBuilder();
_clusterId = clusterId;
+ _clusterAccessor = new ClusterAccessor(clusterId, accessor);
}
@Override
@@ -77,7 +83,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.createCluster(cluster);
+ return _clusterAccessor.createCluster(cluster);
} finally {
lock.unlock();
}
@@ -91,7 +97,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.dropCluster();
+ return _clusterAccessor.dropCluster();
} finally {
lock.unlock();
}
@@ -105,7 +111,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.readCluster();
+ return _clusterAccessor.readCluster();
} finally {
lock.unlock();
}
@@ -123,7 +129,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.addParticipantToCluster(participant);
+ return _clusterAccessor.addParticipantToCluster(participant);
} finally {
lock.unlock();
}
@@ -137,7 +143,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.dropParticipantFromCluster(participantId);
+ return _clusterAccessor.dropParticipantFromCluster(participantId);
} finally {
lock.unlock();
}
@@ -155,7 +161,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.addResourceToCluster(resource);
+ return _clusterAccessor.addResourceToCluster(resource);
} finally {
lock.unlock();
}
@@ -169,7 +175,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.dropResourceFromCluster(resourceId);
+ return _clusterAccessor.dropResourceFromCluster(resourceId);
} finally {
lock.unlock();
}
@@ -183,14 +189,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- Cluster cluster = super.readCluster();
- if (cluster == null) {
- LOG.error("Cluster does not exist, cannot be updated");
- return null;
- }
- ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig());
- boolean status = super.setBasicClusterConfig(config);
- return status ? config : null;
+ return _clusterAccessor.updateCluster(clusterDelta);
} finally {
lock.unlock();
}
@@ -241,4 +240,17 @@ public class AtomicClusterAccessor extends ClusterAccessor {
}
return participants;
}
+
+ /**
+  * Initialize the cluster structure while holding the cluster-scoped lock. Nothing is done if
+  * the lock cannot be acquired.
+  */
+ @Override
+ public void initClusterStructure() {
+   final HelixLock clusterLock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+   if (!clusterLock.lock()) {
+     // lock not acquired: leave the cluster untouched
+     return;
+   }
+   try {
+     // go through the non-atomic instance so the lock is not taken recursively
+     _clusterAccessor.initClusterStructure();
+   } finally {
+     clusterLock.unlock();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
index 8482208..fd05b48 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
@@ -19,23 +19,18 @@ package org.apache.helix.api.accessor;
* under the License.
*/
-import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
import org.apache.helix.api.Participant;
import org.apache.helix.api.Scope;
import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
import org.apache.helix.lock.HelixLock;
import org.apache.helix.lock.HelixLockable;
-import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.log4j.Logger;
@@ -50,16 +45,39 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
private final ClusterId _clusterId;
private final HelixDataAccessor _accessor;
- private final PropertyKey.Builder _keyBuilder;
private final HelixLockable _lockProvider;
+ /**
+ * Non-atomic instance to protect against recursive locking via polymorphism
+ */
+ private final ParticipantAccessor _participantAccessor;
+
+ /**
+ * Instantiate the accessor
+ * @param clusterId the cluster to access
+ * @param accessor a HelixDataAccessor for the physical properties
+ * @param lockProvider a lock provider
+ */
public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor,
HelixLockable lockProvider) {
super(accessor);
_clusterId = clusterId;
_accessor = accessor;
- _keyBuilder = accessor.keyBuilder();
_lockProvider = lockProvider;
+ _participantAccessor = new ParticipantAccessor(accessor);
+ }
+
+ /**
+  * Enable or disable a participant while holding that participant's lock. Nothing is done if
+  * the lock cannot be acquired.
+  * @param participantId the participant whose enabled state is being set
+  * @param isEnabled the enabled state requested by the caller
+  */
+ @Override
+ void enableParticipant(ParticipantId participantId, boolean isEnabled) {
+   HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+   boolean locked = lock.lock();
+   if (locked) {
+     try {
+       // forward isEnabled as well; dropping it would discard a request to disable.
+       // The call goes to the non-atomic instance so the lock is not taken recursively.
+       _participantAccessor.enableParticipant(participantId, isEnabled);
+     } finally {
+       lock.unlock();
+     }
+   }
}
@Override
@@ -68,7 +86,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.readParticipant(participantId);
+ return _participantAccessor.readParticipant(participantId);
} finally {
lock.unlock();
}
@@ -87,7 +105,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.setParticipant(participantConfig);
+ return _participantAccessor.setParticipant(participantConfig);
} finally {
lock.unlock();
}
@@ -102,14 +120,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- Participant participant = super.readParticipant(participantId);
- if (participant == null) {
- LOG.error("Participant " + participantId + " does not exist, cannot be updated");
- return null;
- }
- ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
- super.setParticipant(config);
- return config;
+ return _participantAccessor.updateParticipant(participantId, participantDelta);
} finally {
lock.unlock();
}
@@ -117,64 +128,13 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
return null;
}
- /**
- * Swap a new participant in to serve the replicas of an old (dead) one. The atomicity scope is
- * participant-local and resource-local.
- */
- @Override
- public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) {
- Participant oldParticipant = readParticipant(oldParticipantId);
- if (oldParticipant == null) {
- LOG.error("Could not swap participants because the old participant does not exist");
- return false;
- }
- if (oldParticipant.isEnabled()) {
- LOG.error("Could not swap participants because the old participant is still enabled");
- return false;
- }
- if (oldParticipant.isAlive()) {
- LOG.error("Could not swap participants because the old participant is still live");
- return false;
- }
- Participant newParticipant = readParticipant(newParticipantId);
- if (newParticipant == null) {
- LOG.error("Could not swap participants because the new participant does not exist");
- return false;
- }
- dropParticipant(oldParticipantId);
- ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
- List<String> idealStates = _accessor.getChildNames(_keyBuilder.idealStates());
- for (String resourceName : idealStates) {
- HelixLock lock =
- _lockProvider.getLock(_clusterId, Scope.resource(ResourceId.from(resourceName)));
- boolean locked = lock.lock();
- if (locked) {
- try {
- // lock the resource for all ideal state reads and updates
- IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName));
- if (idealState != null) {
- swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
- PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
- resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
- _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState);
- }
- } finally {
- lock.unlock();
- }
- } else {
- return false;
- }
- }
- return true;
- }
-
@Override
boolean dropParticipant(ParticipantId participantId) {
HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
boolean locked = lock.lock();
if (locked) {
try {
- return super.dropParticipant(participantId);
+ return _participantAccessor.dropParticipant(participantId);
} finally {
lock.unlock();
}
@@ -189,7 +149,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- super.insertMessagesToParticipant(participantId, msgMap);
+ _participantAccessor.insertMessagesToParticipant(participantId, msgMap);
} finally {
lock.unlock();
}
@@ -203,7 +163,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- super.updateMessageStatus(participantId, msgMap);
+ _participantAccessor.updateMessageStatus(participantId, msgMap);
} finally {
lock.unlock();
}
@@ -217,7 +177,21 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- super.deleteMessagesFromParticipant(participantId, msgIdSet);
+ _participantAccessor.deleteMessagesFromParticipant(participantId, msgIdSet);
+ } finally {
+ lock.unlock();
+ }
+ }
+ return;
+ }
+
+ @Override
+ public void initParticipantStructure(ParticipantId participantId) {
+ HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+ boolean locked = lock.lock();
+ if (locked) {
+ try {
+ _participantAccessor.initParticipantStructure(participantId);
} finally {
lock.unlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 4ff50d7..95c0b05 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -40,13 +40,27 @@ public class AtomicResourceAccessor extends ResourceAccessor {
private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class);
private final ClusterId _clusterId;
+ private final HelixDataAccessor _accessor;
private final HelixLockable _lockProvider;
+ /**
+ * Non-atomic instance to protect against recursive locking via polymorphism
+ */
+ private final ResourceAccessor _resourceAccessor;
+
+ /**
+ * Instantiate the accessor
+ * @param clusterId the cluster to access
+ * @param accessor a HelixDataAccessor for the physical properties
+ * @param lockProvider a lock provider
+ */
public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor,
HelixLockable lockProvider) {
super(accessor);
_clusterId = clusterId;
+ _accessor = accessor;
_lockProvider = lockProvider;
+ _resourceAccessor = new ResourceAccessor(accessor);
}
@Override
@@ -55,7 +69,7 @@ public class AtomicResourceAccessor extends ResourceAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.readResource(resourceId);
+ return _resourceAccessor.readResource(resourceId);
} finally {
lock.unlock();
}
@@ -69,14 +83,7 @@ public class AtomicResourceAccessor extends ResourceAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- Resource resource = super.readResource(resourceId);
- if (resource == null) {
- LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
- return null;
- }
- ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
- super.setResource(config);
- return config;
+ return _resourceAccessor.updateResource(resourceId, resourceDelta);
} finally {
lock.unlock();
}
@@ -90,7 +97,7 @@ public class AtomicResourceAccessor extends ResourceAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.setRebalancerContext(resourceId, context);
+ return _resourceAccessor.setRebalancerContext(resourceId, context);
} finally {
lock.unlock();
}
@@ -108,7 +115,7 @@ public class AtomicResourceAccessor extends ResourceAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.setResource(resourceConfig);
+ return _resourceAccessor.setResource(resourceConfig);
} finally {
lock.unlock();
}
@@ -123,11 +130,17 @@ public class AtomicResourceAccessor extends ResourceAccessor {
boolean locked = lock.lock();
if (locked) {
try {
- return super.generateDefaultAssignment(resourceId, replicaCount, participantGroupTag);
+ return _resourceAccessor.generateDefaultAssignment(resourceId, replicaCount,
+ participantGroupTag);
} finally {
lock.unlock();
}
}
return false;
}
+
+ /**
+  * Get a ParticipantAccessor instance that uses this accessor's cluster, data accessor, and
+  * lock provider
+  * @return an AtomicParticipantAccessor
+  */
+ @Override
+ protected ParticipantAccessor participantAccessor() {
+   ParticipantAccessor atomicAccessor =
+       new AtomicParticipantAccessor(_clusterId, _accessor, _lockProvider);
+   return atomicAccessor;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index ba321cf..f283f74 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -157,7 +157,7 @@ public class ClusterAccessor {
* @param config ClusterConfig
* @return true if correctly set, false otherwise
*/
- protected boolean setBasicClusterConfig(ClusterConfig config) {
+ private boolean setBasicClusterConfig(ClusterConfig config) {
if (config == null) {
return false;
}
@@ -718,7 +718,7 @@ public class ClusterAccessor {
/**
* Remove all but the top level cluster node; intended for reconstructing the cluster
*/
- void clearClusterStructure() {
+ private void clearClusterStructure() {
BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
List<String> paths = getRequiredPaths(_keyBuilder);
baseAccessor.remove(paths, 0);
@@ -729,7 +729,7 @@ public class ClusterAccessor {
* @param keyBuilder a PropertyKey.Builder for the cluster
* @return list of paths as strings
*/
- static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) {
+ private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) {
List<String> paths = Lists.newArrayList();
paths.add(keyBuilder.clusterConfigs().getPath());
paths.add(keyBuilder.instanceConfigs().getPath());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index dd6c77b..90fd986 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -655,7 +655,7 @@ public class ParticipantAccessor {
return false;
}
dropParticipant(oldParticipantId);
- ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+ ResourceAccessor resourceAccessor = resourceAccessor();
Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
for (String resourceName : idealStateMap.keySet()) {
IdealState idealState = idealStateMap.get(resourceName);
@@ -717,7 +717,7 @@ public class ParticipantAccessor {
/**
* Clear properties for the participant
*/
- public void clearParticipantStructure(ParticipantId participantId) {
+ void clearParticipantStructure(ParticipantId participantId) {
List<String> paths = getRequiredPaths(_keyBuilder, participantId);
BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
baseAccessor.remove(paths, 0);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index c65cb44..7041c5e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -129,7 +129,7 @@ public class ResourceAccessor {
* @param resourceId
* @param configuration
*/
- void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+ private void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
_accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
// also set an ideal state if the resource supports it
RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
@@ -278,7 +278,7 @@ public class ResourceAccessor {
* @return true if they were reset, false otherwise
*/
public boolean resetResources(Set<ResourceId> resetResourceIdSet) {
- ParticipantAccessor accessor = new ParticipantAccessor(_accessor);
+ ParticipantAccessor accessor = participantAccessor();
List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
for (ExternalView extView : extViews) {
if (!resetResourceIdSet.contains(extView.getResourceId())) {
@@ -436,4 +436,12 @@ public class ResourceAccessor {
return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
rebalancerContext, userConfig, bucketSize, batchMessageMode);
}
+
+ /**
+  * Create the ParticipantAccessor that this accessor works through; subclasses may override
+  * this to supply a different implementation
+  * @return a new ParticipantAccessor built on this accessor's HelixDataAccessor
+  */
+ protected ParticipantAccessor participantAccessor() {
+   ParticipantAccessor plainAccessor = new ParticipantAccessor(_accessor);
+   return plainAccessor;
+ }
}