You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/09 01:58:26 UTC

git commit: [HELIX-268] Atomic API - added functionality

Updated Branches:
  refs/heads/helix-logical-model b40608916 -> fd78c678f


[HELIX-268] Atomic API - added functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/fd78c678
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/fd78c678
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/fd78c678

Branch: refs/heads/helix-logical-model
Commit: fd78c678f932f3e6a5939b0aa634a18942d723cf
Parents: b406089
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Oct 8 16:57:56 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Oct 8 16:57:56 2013 -0700

----------------------------------------------------------------------
 .../api/accessor/AtomicClusterAccessor.java     | 22 ++++++++
 .../helix/api/accessor/ClusterAccessor.java     | 34 ++++++------
 .../helix/api/accessor/ParticipantAccessor.java | 57 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fd78c678/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index a2af79b..9881e2d 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -19,12 +19,14 @@ package org.apache.helix.api.accessor;
  * under the License.
  */
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
 import org.apache.helix.api.Resource;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.config.ClusterConfig;
@@ -219,4 +221,24 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     }
     return resources;
   }
+
+  /**
+   * Read all participants; each read is participant-atomic, but the set is not cluster-atomic
+   */
+  @Override
+  public Map<ParticipantId, Participant> readParticipants() {
+    // read participants individually to keep configs consistent with current state and messages
+    Map<ParticipantId, Participant> participants = Maps.newHashMap();
+    ParticipantAccessor accessor =
+        new AtomicParticipantAccessor(_clusterId, _accessor, _lockProvider);
+    List<String> participantNames = _accessor.getChildNames(_keyBuilder.instanceConfigs());
+    for (String participantName : participantNames) {
+      ParticipantId participantId = ParticipantId.from(participantName);
+      Participant participant = accessor.readParticipant(participantId);
+      if (participant != null) {
+        participants.put(participantId, participant);
+      }
+    }
+    return participants;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fd78c678/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index bec7308..ba321cf 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix.api.accessor;
  * under the License.
  */
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -72,6 +71,7 @@ import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 import org.testng.internal.annotations.Sets;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class ClusterAccessor {
@@ -688,7 +688,7 @@ public class ClusterAccessor {
    * @return true if valid or false otherwise
    */
   public boolean isClusterStructureValid() {
-    List<String> paths = getRequiredPaths(_clusterId);
+    List<String> paths = getRequiredPaths(_keyBuilder);
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     if (baseAccessor != null) {
       boolean[] existsResults = baseAccessor.exists(paths, 0);
@@ -706,7 +706,7 @@ public class ClusterAccessor {
    */
   public void initClusterStructure() {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    List<String> paths = getRequiredPaths(_clusterId);
+    List<String> paths = getRequiredPaths(_keyBuilder);
     for (String path : paths) {
       boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
       if (!status && LOG.isDebugEnabled()) {
@@ -718,20 +718,19 @@ public class ClusterAccessor {
   /**
    * Remove all but the top level cluster node; intended for reconstructing the cluster
    */
-  private void clearClusterStructure() {
+  void clearClusterStructure() {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
-    List<String> paths = getRequiredPaths(_clusterId);
+    List<String> paths = getRequiredPaths(_keyBuilder);
     baseAccessor.remove(paths, 0);
   }
 
   /**
    * Get all property paths that must be set for a cluster structure to be valid
-   * @param the cluster that the paths will be relative to
+   * @param keyBuilder a PropertyKey.Builder for the cluster
    * @return list of paths as strings
    */
-  private static List<String> getRequiredPaths(ClusterId clusterId) {
-    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterId.stringify());
-    List<String> paths = new ArrayList<String>();
+  static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) {
+    List<String> paths = Lists.newArrayList();
     paths.add(keyBuilder.clusterConfigs().getPath());
     paths.add(keyBuilder.instanceConfigs().getPath());
     paths.add(keyBuilder.propertyStore().getPath());
@@ -762,22 +761,19 @@ public class ClusterAccessor {
       return false;
     }
 
+    ParticipantAccessor participantAccessor = new ParticipantAccessor(_accessor);
     ParticipantId participantId = participant.getId();
-    if (_accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify())) != null) {
+    InstanceConfig existConfig =
+        _accessor.getProperty(_keyBuilder.instanceConfig(participantId.stringify()));
+    if (existConfig != null && participantAccessor.isParticipantStructureValid(participantId)) {
       LOG.error("Config for participant: " + participantId + " already exists in cluster: "
           + _clusterId);
       return false;
     }
 
-    // add empty root ZNodes
-    List<PropertyKey> createKeys = new ArrayList<PropertyKey>();
-    createKeys.add(_keyBuilder.messages(participantId.stringify()));
-    createKeys.add(_keyBuilder.currentStates(participantId.stringify()));
-    createKeys.add(_keyBuilder.participantErrors(participantId.stringify()));
-    createKeys.add(_keyBuilder.statusUpdates(participantId.stringify()));
-    for (PropertyKey key : createKeys) {
-      _accessor.createProperty(key, null);
-    }
+    // clear and rebuild the participant structure
+    participantAccessor.clearParticipantStructure(participantId);
+    participantAccessor.initParticipantStructure(participantId);
 
     // add the config
     InstanceConfig instanceConfig = new InstanceConfig(participant.getId());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fd78c678/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index 50945aa..dd6c77b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -701,6 +701,63 @@ public class ParticipantAccessor {
   }
 
   /**
+   * Create empty persistent properties to ensure that there is a valid participant structure
+   */
+  public void initParticipantStructure(ParticipantId participantId) {
+    List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    for (String path : paths) {
+      boolean status = baseAccessor.create(path, null, AccessOption.PERSISTENT);
+      if (!status && LOG.isDebugEnabled()) {
+        LOG.debug(path + " already exists");
+      }
+    }
+  }
+
+  /**
+   * Remove all required paths for the participant, including its instance config
+   */
+  public void clearParticipantStructure(ParticipantId participantId) {
+    List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    baseAccessor.remove(paths, 0);
+  }
+
+  /**
+   * Check whether every required path for the participant exists
+   * @return true if valid or false otherwise
+   */
+  public boolean isParticipantStructureValid(ParticipantId participantId) {
+    List<String> paths = getRequiredPaths(_keyBuilder, participantId);
+    BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
+    if (baseAccessor != null) {
+      boolean[] existsResults = baseAccessor.exists(paths, 0);
+      for (boolean exists : existsResults) {
+        if (!exists) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the paths that must exist for a participant's structure to be valid
+   * @param keyBuilder PropertyKey.Builder for the cluster
+   * @param participantId the participant for which to generate paths
+   * @return list of required paths as strings
+   */
+  static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder, ParticipantId participantId) {
+    List<String> paths = Lists.newArrayList();
+    paths.add(keyBuilder.instanceConfig(participantId.stringify()).getPath());
+    paths.add(keyBuilder.messages(participantId.stringify()).getPath());
+    paths.add(keyBuilder.currentStates(participantId.stringify()).getPath());
+    paths.add(keyBuilder.participantErrors(participantId.stringify()).getPath());
+    paths.add(keyBuilder.statusUpdates(participantId.stringify()).getPath());
+    return paths;
+  }
+
+  /**
    * Get a ResourceAccessor instance
    * @return ResourceAccessor
    */