You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/09 01:58:26 UTC
git commit: [HELIX-268] Atomic API - added functionality
Updated Branches:
refs/heads/helix-logical-model b40608916 -> fd78c678f
[HELIX-268] Atomic API - added functionality
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/fd78c678
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/fd78c678
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/fd78c678
Branch: refs/heads/helix-logical-model
Commit: fd78c678f932f3e6a5939b0aa634a18942d723cf
Parents: b406089
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 8 16:57:56 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 8 16:57:56 2013 -0700
----------------------------------------------------------------------
.../api/accessor/AtomicClusterAccessor.java | 22 ++++++++
.../helix/api/accessor/ClusterAccessor.java | 34 ++++++------
.../helix/api/accessor/ParticipantAccessor.java | 57 ++++++++++++++++++++
3 files changed, 94 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fd78c678/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index a2af79b..9881e2d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -19,12 +19,14 @@ package org.apache.helix.api.accessor;
* under the License.
*/
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
import org.apache.helix.api.Resource;
import org.apache.helix.api.Scope;
import org.apache.helix.api.config.ClusterConfig;
@@ -219,4 +221,24 @@ public class AtomicClusterAccessor extends ClusterAccessor {
}
return resources;
}
+
+  /**
+   * Read participants one at a time through an AtomicParticipantAccessor, keeping each config
+   * consistent with its current state and messages. This is participant-atomic, not cluster-atomic
+   * @return map of participant id to participant; participants that read back as null are omitted
+   */
+  @Override
+  public Map<ParticipantId, Participant> readParticipants() {
+    ParticipantAccessor participantReader =
+        new AtomicParticipantAccessor(_clusterId, _accessor, _lockProvider);
+    Map<ParticipantId, Participant> result = Maps.newHashMap();
+    for (String name : _accessor.getChildNames(_keyBuilder.instanceConfigs())) {
+      ParticipantId id = ParticipantId.from(name);
+      Participant found = participantReader.readParticipant(id);
+      if (found != null) {
+        result.put(id, found);
+      }
+    }
+    return result;
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fd78c678/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index bec7308..ba321cf 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix.api.accessor;
* under the License.
*/
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -72,6 +71,7 @@ import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
import org.testng.internal.annotations.Sets;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class ClusterAccessor {
@@ -688,7 +688,7 @@ public class ClusterAccessor {
* @return true if valid or false otherwise
*/
public boolean isClusterStructureValid() {
- List<String> paths = getRequiredPaths(_clusterId);
+ List<String> paths = getRequiredPaths(_keyBuilder);
BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
if (baseAccessor != null) {
boolean[] existsResults = baseAccessor.exists(paths, 0);
@@ -706,7 +706,7 @@ public class ClusterAccessor {
*/
public void initClusterStructure() {
BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
- List<String> paths = getRequiredPaths(_clusterId);
+ List<String> paths = getRequiredPaths(_keyBuilder);
for (String path : paths) {
boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
if (!status && LOG.isDebugEnabled()) {
@@ -718,20 +718,19 @@ public class ClusterAccessor {
/**
* Remove all but the top level cluster node; intended for reconstructing the cluster
*/
- private void clearClusterStructure() {
+ void clearClusterStructure() {
BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
- List<String> paths = getRequiredPaths(_clusterId);
+ List<String> paths = getRequiredPaths(_keyBuilder);
baseAccessor.remove(paths, 0);
}
/**
* Get all property paths that must be set for a cluster structure to be valid
- * @param the cluster that the paths will be relative to
+ * @param keyBuilder a PropertyKey.Builder for the cluster
* @return list of paths as strings
*/
- private static List<String> getRequiredPaths(ClusterId clusterId) {
- PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterId.stringify());
- List<String> paths = new ArrayList<String>();
+ static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) {
+ List<String> paths = Lists.newArrayList();
paths.add(keyBuilder.clusterConfigs().getPath());
paths.add(keyBuilder.instanceConfigs().getPath());
paths.add(keyBuilder.propertyStore().getPath());
@@ -762,22 +761,19 @@ public class ClusterAccessor {
return false;
}
+ ParticipantAccessor participantAccessor = new ParticipantAccessor(_accessor);
ParticipantId participantId = participant.getId();
- if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) != null) {
+ InstanceConfig existConfig =
+ _accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+ if (existConfig != null && participantAccessor.isParticipantStructureValid(participantId)) {
LOG.error("Config for participant: " + participantId + " already exists in cluster: "
+ _clusterId);
return false;
}
- // add empty root ZNodes
- List<PropertyKey> createKeys = new ArrayList<PropertyKey>();
- createKeys.add(_keyBuilder.messages(participantId.stringify()));
- createKeys.add(_keyBuilder.currentStates(participantId.stringify()));
- createKeys.add(_keyBuilder.participantErrors(participantId.stringify()));
- createKeys.add(_keyBuilder.statusUpdates(participantId.stringify()));
- for (PropertyKey key : createKeys) {
- _accessor.createProperty(key, null);
- }
+ // clear and rebuild the participant structure
+ participantAccessor.clearParticipantStructure(participantId);
+ participantAccessor.initParticipantStructure(participantId);
// add the config
InstanceConfig instanceConfig = new InstanceConfig(participant.getId());
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fd78c678/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 50945aa..dd6c77b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -701,6 +701,63 @@ public class ParticipantAccessor {
}
/**
+   * Create an empty persistent node at every required path of the participant. A create call
+   * that returns false is only reported at debug level, as the path already existing
+   * @param participantId the participant whose structure should be created
+   */
+  public void initParticipantStructure(ParticipantId participantId) {
+    BaseDataAccessor<?> store = _accessor.getBaseDataAccessor();
+    for (String requiredPath : getRequiredPaths(_keyBuilder, participantId)) {
+      if (!store.create(requiredPath, null, AccessOption.PERSISTENT) && LOG.isDebugEnabled()) {
+        LOG.debug(requiredPath + " already exists");
+      }
+    }
+  }
+
+  /**
+   * Remove every required path of the participant, the instance config path included
+   * @param participantId the participant whose structure should be removed
+   */
+  public void clearParticipantStructure(ParticipantId participantId) {
+    List<String> participantPaths = getRequiredPaths(_keyBuilder, participantId);
+    _accessor.getBaseDataAccessor().remove(participantPaths, 0);
+  }
+
+  /**
+   * Check whether every required path of the participant exists
+   * @param participantId the participant to check
+   * @return false if a required path is missing; true otherwise, even with no base data accessor
+   */
+  public boolean isParticipantStructureValid(ParticipantId participantId) {
+    List<String> required = getRequiredPaths(_keyBuilder, participantId);
+    BaseDataAccessor<?> store = _accessor.getBaseDataAccessor();
+    if (store != null) {
+      for (boolean present : store.exists(required, 0)) {
+        if (!present) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the paths that make up the structure of a participant
+   * @param keyBuilder PropertyKey.Builder used to build the property keys
+   * @param participantId the participant for which to generate paths
+   * @return new list holding the instance config, messages, current states, participant errors,
+   *         and status updates paths, in that order
+   */
+  static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder, ParticipantId participantId) {
+    String name = participantId.stringify();
+    return Lists.newArrayList(keyBuilder.instanceConfig(name).getPath(),
+        keyBuilder.messages(name).getPath(),
+        keyBuilder.currentStates(name).getPath(),
+        keyBuilder.participantErrors(name).getPath(),
+        keyBuilder.statusUpdates(name).getPath());
+  }
+
+ /**
* Get a ResourceAccessor instance
* @return ResourceAccessor
*/