You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2013/10/09 19:46:54 UTC

git commit: [HELIX-268] Increase understandability of atomic API code

Updated Branches:
  refs/heads/helix-logical-model fd78c678f -> 558b42c61


[HELIX-268] Increase understandability of atomic API code


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/558b42c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/558b42c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/558b42c6

Branch: refs/heads/helix-logical-model
Commit: 558b42c61087861e8ce93f862d288bba43e1b228
Parents: fd78c67
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Oct 9 10:46:09 2013 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Oct 9 10:46:09 2013 -0700

----------------------------------------------------------------------
 .../api/accessor/AtomicClusterAccessor.java     |  42 ++++---
 .../api/accessor/AtomicParticipantAccessor.java | 118 ++++++++-----------
 .../api/accessor/AtomicResourceAccessor.java    |  37 ++++--
 .../helix/api/accessor/ClusterAccessor.java     |   6 +-
 .../helix/api/accessor/ParticipantAccessor.java |   4 +-
 .../helix/api/accessor/ResourceAccessor.java    |  12 +-
 6 files changed, 113 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
index 9881e2d..d17b2af 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicClusterAccessor.java
@@ -57,6 +57,11 @@ public class AtomicClusterAccessor extends ClusterAccessor {
   private final ClusterId _clusterId;
 
   /**
+   * Non-atomic instance to protect against recursive locking via polymorphism
+   */
+  private final ClusterAccessor _clusterAccessor;
+
+  /**
    * Instantiate the accessor
    * @param clusterId the cluster to access
    * @param accessor a HelixDataAccessor for the physical properties
@@ -69,6 +74,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     _accessor = accessor;
     _keyBuilder = accessor.keyBuilder();
     _clusterId = clusterId;
+    _clusterAccessor = new ClusterAccessor(clusterId, accessor);
   }
 
   @Override
@@ -77,7 +83,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.createCluster(cluster);
+        return _clusterAccessor.createCluster(cluster);
       } finally {
         lock.unlock();
       }
@@ -91,7 +97,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.dropCluster();
+        return _clusterAccessor.dropCluster();
       } finally {
         lock.unlock();
       }
@@ -105,7 +111,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.readCluster();
+        return _clusterAccessor.readCluster();
       } finally {
         lock.unlock();
       }
@@ -123,7 +129,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.addParticipantToCluster(participant);
+        return _clusterAccessor.addParticipantToCluster(participant);
       } finally {
         lock.unlock();
       }
@@ -137,7 +143,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.dropParticipantFromCluster(participantId);
+        return _clusterAccessor.dropParticipantFromCluster(participantId);
       } finally {
         lock.unlock();
       }
@@ -155,7 +161,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.addResourceToCluster(resource);
+        return _clusterAccessor.addResourceToCluster(resource);
       } finally {
         lock.unlock();
       }
@@ -169,7 +175,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.dropResourceFromCluster(resourceId);
+        return _clusterAccessor.dropResourceFromCluster(resourceId);
       } finally {
         lock.unlock();
       }
@@ -183,14 +189,7 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        Cluster cluster = super.readCluster();
-        if (cluster == null) {
-          LOG.error("Cluster does not exist, cannot be updated");
-          return null;
-        }
-        ClusterConfig config = clusterDelta.mergeInto(cluster.getConfig());
-        boolean status = super.setBasicClusterConfig(config);
-        return status ? config : null;
+        return _clusterAccessor.updateCluster(clusterDelta);
       } finally {
         lock.unlock();
       }
@@ -241,4 +240,17 @@ public class AtomicClusterAccessor extends ClusterAccessor {
     }
     return participants;
   }
+
+  @Override
+  public void initClusterStructure() { // lock-guarded wrapper around the non-atomic accessor's initClusterStructure
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.cluster(_clusterId));
+    boolean locked = lock.lock();
+    if (locked) { // if the lock is not acquired, the call is skipped and nothing is reported to the caller
+      try {
+        _clusterAccessor.initClusterStructure(); // delegate to the plain ClusterAccessor instance, not super
+      } finally {
+        lock.unlock(); // always release, even if the delegate throws
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
index 8482208..fd05b48 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicParticipantAccessor.java
@@ -19,23 +19,18 @@ package org.apache.helix.api.accessor;
  * under the License.
  */
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.PropertyKey;
 import org.apache.helix.api.Participant;
 import org.apache.helix.api.Scope;
 import org.apache.helix.api.config.ParticipantConfig;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.MessageId;
 import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
 import org.apache.helix.lock.HelixLock;
 import org.apache.helix.lock.HelixLockable;
-import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.log4j.Logger;
 
@@ -50,16 +45,39 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
 
   private final ClusterId _clusterId;
   private final HelixDataAccessor _accessor;
-  private final PropertyKey.Builder _keyBuilder;
   private final HelixLockable _lockProvider;
 
+  /**
+   * Non-atomic instance to protect against recursive locking via polymorphism
+   */
+  private final ParticipantAccessor _participantAccessor;
+
+  /**
+   * Instantiate the accessor
+   * @param clusterId the cluster to access
+   * @param accessor a HelixDataAccessor for the physical properties
+   * @param lockProvider a lock provider
+   */
   public AtomicParticipantAccessor(ClusterId clusterId, HelixDataAccessor accessor,
       HelixLockable lockProvider) {
     super(accessor);
     _clusterId = clusterId;
     _accessor = accessor;
-    _keyBuilder = accessor.keyBuilder();
     _lockProvider = lockProvider;
+    _participantAccessor = new ParticipantAccessor(accessor);
+  }
+
+  @Override
+  void enableParticipant(ParticipantId participantId, boolean isEnabled) { // lock-guarded enable/disable
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) { // nothing is changed if the participant lock is not acquired
+      try {
+        _participantAccessor.enableParticipant(participantId, isEnabled); // forward the flag; it was dropped before
+      } finally {
+        lock.unlock(); // always release, even if the delegate throws
+      }
+    }
   }
 
   @Override
@@ -68,7 +86,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.readParticipant(participantId);
+        return _participantAccessor.readParticipant(participantId);
       } finally {
         lock.unlock();
       }
@@ -87,7 +105,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.setParticipant(participantConfig);
+        return _participantAccessor.setParticipant(participantConfig);
       } finally {
         lock.unlock();
       }
@@ -102,14 +120,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        Participant participant = super.readParticipant(participantId);
-        if (participant == null) {
-          LOG.error("Participant " + participantId + " does not exist, cannot be updated");
-          return null;
-        }
-        ParticipantConfig config = participantDelta.mergeInto(participant.getConfig());
-        super.setParticipant(config);
-        return config;
+        return _participantAccessor.updateParticipant(participantId, participantDelta);
       } finally {
         lock.unlock();
       }
@@ -117,64 +128,13 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
     return null;
   }
 
-  /**
-   * Swap a new participant in to serve the replicas of an old (dead) one. The atomicity scope is
-   * participant-local and resource-local.
-   */
-  @Override
-  public boolean swapParticipants(ParticipantId oldParticipantId, ParticipantId newParticipantId) {
-    Participant oldParticipant = readParticipant(oldParticipantId);
-    if (oldParticipant == null) {
-      LOG.error("Could not swap participants because the old participant does not exist");
-      return false;
-    }
-    if (oldParticipant.isEnabled()) {
-      LOG.error("Could not swap participants because the old participant is still enabled");
-      return false;
-    }
-    if (oldParticipant.isAlive()) {
-      LOG.error("Could not swap participants because the old participant is still live");
-      return false;
-    }
-    Participant newParticipant = readParticipant(newParticipantId);
-    if (newParticipant == null) {
-      LOG.error("Could not swap participants because the new participant does not exist");
-      return false;
-    }
-    dropParticipant(oldParticipantId);
-    ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
-    List<String> idealStates = _accessor.getChildNames(_keyBuilder.idealStates());
-    for (String resourceName : idealStates) {
-      HelixLock lock =
-          _lockProvider.getLock(_clusterId, Scope.resource(ResourceId.from(resourceName)));
-      boolean locked = lock.lock();
-      if (locked) {
-        try {
-          // lock the resource for all ideal state reads and updates
-          IdealState idealState = _accessor.getProperty(_keyBuilder.idealState(resourceName));
-          if (idealState != null) {
-            swapParticipantsInIdealState(idealState, oldParticipantId, newParticipantId);
-            PartitionedRebalancerContext context = PartitionedRebalancerContext.from(idealState);
-            resourceAccessor.setRebalancerContext(ResourceId.from(resourceName), context);
-            _accessor.setProperty(_keyBuilder.idealState(resourceName), idealState);
-          }
-        } finally {
-          lock.unlock();
-        }
-      } else {
-        return false;
-      }
-    }
-    return true;
-  }
-
   @Override
   boolean dropParticipant(ParticipantId participantId) {
     HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.dropParticipant(participantId);
+        return _participantAccessor.dropParticipant(participantId);
       } finally {
         lock.unlock();
       }
@@ -189,7 +149,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        super.insertMessagesToParticipant(participantId, msgMap);
+        _participantAccessor.insertMessagesToParticipant(participantId, msgMap);
       } finally {
         lock.unlock();
       }
@@ -203,7 +163,7 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        super.updateMessageStatus(participantId, msgMap);
+        _participantAccessor.updateMessageStatus(participantId, msgMap);
       } finally {
         lock.unlock();
       }
@@ -217,7 +177,21 @@ public class AtomicParticipantAccessor extends ParticipantAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        super.deleteMessagesFromParticipant(participantId, msgIdSet);
+        _participantAccessor.deleteMessagesFromParticipant(participantId, msgIdSet);
+      } finally {
+        lock.unlock();
+      }
+    }
+    return;
+  }
+
+  @Override
+  public void initParticipantStructure(ParticipantId participantId) {
+    HelixLock lock = _lockProvider.getLock(_clusterId, Scope.participant(participantId));
+    boolean locked = lock.lock();
+    if (locked) {
+      try {
+        _participantAccessor.initParticipantStructure(participantId);
       } finally {
         lock.unlock();
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
index 4ff50d7..95c0b05 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/AtomicResourceAccessor.java
@@ -40,13 +40,27 @@ public class AtomicResourceAccessor extends ResourceAccessor {
   private static final Logger LOG = Logger.getLogger(AtomicResourceAccessor.class);
 
   private final ClusterId _clusterId;
+  private final HelixDataAccessor _accessor;
   private final HelixLockable _lockProvider;
 
+  /**
+   * Non-atomic instance to protect against recursive locking via polymorphism
+   */
+  private final ResourceAccessor _resourceAccessor;
+
+  /**
+   * Instantiate the accessor
+   * @param clusterId the cluster to access
+   * @param accessor a HelixDataAccessor for the physical properties
+   * @param lockProvider a lock provider
+   */
   public AtomicResourceAccessor(ClusterId clusterId, HelixDataAccessor accessor,
       HelixLockable lockProvider) {
     super(accessor);
     _clusterId = clusterId;
+    _accessor = accessor;
     _lockProvider = lockProvider;
+    _resourceAccessor = new ResourceAccessor(accessor);
   }
 
   @Override
@@ -55,7 +69,7 @@ public class AtomicResourceAccessor extends ResourceAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.readResource(resourceId);
+        return _resourceAccessor.readResource(resourceId);
       } finally {
         lock.unlock();
       }
@@ -69,14 +83,7 @@ public class AtomicResourceAccessor extends ResourceAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        Resource resource = super.readResource(resourceId);
-        if (resource == null) {
-          LOG.error("Resource " + resourceId + " does not exist, cannot be updated");
-          return null;
-        }
-        ResourceConfig config = resourceDelta.mergeInto(resource.getConfig());
-        super.setResource(config);
-        return config;
+        return _resourceAccessor.updateResource(resourceId, resourceDelta);
       } finally {
         lock.unlock();
       }
@@ -90,7 +97,7 @@ public class AtomicResourceAccessor extends ResourceAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.setRebalancerContext(resourceId, context);
+        return _resourceAccessor.setRebalancerContext(resourceId, context);
       } finally {
         lock.unlock();
       }
@@ -108,7 +115,7 @@ public class AtomicResourceAccessor extends ResourceAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.setResource(resourceConfig);
+        return _resourceAccessor.setResource(resourceConfig);
       } finally {
         lock.unlock();
       }
@@ -123,11 +130,17 @@ public class AtomicResourceAccessor extends ResourceAccessor {
     boolean locked = lock.lock();
     if (locked) {
       try {
-        return super.generateDefaultAssignment(resourceId, replicaCount, participantGroupTag);
+        return _resourceAccessor.generateDefaultAssignment(resourceId, replicaCount,
+            participantGroupTag);
       } finally {
         lock.unlock();
       }
     }
     return false;
   }
+
+  @Override
+  protected ParticipantAccessor participantAccessor() { // overrides the factory to supply the atomic variant
+    return new AtomicParticipantAccessor(_clusterId, _accessor, _lockProvider); // new instance on every call
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index ba321cf..f283f74 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -157,7 +157,7 @@ public class ClusterAccessor {
    * @param config ClusterConfig
    * @return true if correctly set, false otherwise
    */
-  protected boolean setBasicClusterConfig(ClusterConfig config) {
+  private boolean setBasicClusterConfig(ClusterConfig config) {
     if (config == null) {
       return false;
     }
@@ -718,7 +718,7 @@ public class ClusterAccessor {
   /**
    * Remove all but the top level cluster node; intended for reconstructing the cluster
    */
-  void clearClusterStructure() {
+  private void clearClusterStructure() {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     List<String> paths = getRequiredPaths(_keyBuilder);
     baseAccessor.remove(paths, 0);
@@ -729,7 +729,7 @@ public class ClusterAccessor {
    * @param keyBuilder a PropertyKey.Builder for the cluster
    * @return list of paths as strings
    */
-  static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) {
+  private static List<String> getRequiredPaths(PropertyKey.Builder keyBuilder) {
     List<String> paths = Lists.newArrayList();
     paths.add(keyBuilder.clusterConfigs().getPath());
     paths.add(keyBuilder.instanceConfigs().getPath());

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
index dd6c77b..90fd986 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ParticipantAccessor.java
@@ -655,7 +655,7 @@ public class ParticipantAccessor {
       return false;
     }
     dropParticipant(oldParticipantId);
-    ResourceAccessor resourceAccessor = new ResourceAccessor(_accessor);
+    ResourceAccessor resourceAccessor = resourceAccessor();
     Map<String, IdealState> idealStateMap = _accessor.getChildValuesMap(_keyBuilder.idealStates());
     for (String resourceName : idealStateMap.keySet()) {
       IdealState idealState = idealStateMap.get(resourceName);
@@ -717,7 +717,7 @@ public class ParticipantAccessor {
   /**
    * Clear properties for the participant
    */
-  public void clearParticipantStructure(ParticipantId participantId) {
+  void clearParticipantStructure(ParticipantId participantId) {
     List<String> paths = getRequiredPaths(_keyBuilder, participantId);
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     baseAccessor.remove(paths, 0);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/558b42c6/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
index c65cb44..7041c5e 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ResourceAccessor.java
@@ -129,7 +129,7 @@ public class ResourceAccessor {
    * @param resourceId
    * @param configuration
    */
-  void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
+  private void setConfiguration(ResourceId resourceId, ResourceConfiguration configuration) {
     _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
     // also set an ideal state if the resource supports it
     RebalancerConfig rebalancerConfig = new RebalancerConfig(configuration);
@@ -278,7 +278,7 @@ public class ResourceAccessor {
    * @return true if they were reset, false otherwise
    */
   public boolean resetResources(Set<ResourceId> resetResourceIdSet) {
-    ParticipantAccessor accessor = new ParticipantAccessor(_accessor);
+    ParticipantAccessor accessor = participantAccessor();
     List<ExternalView> extViews = _accessor.getChildValues(_keyBuilder.externalViews());
     for (ExternalView extView : extViews) {
       if (!resetResourceIdSet.contains(extView.getResourceId())) {
@@ -436,4 +436,12 @@ public class ResourceAccessor {
     return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
         rebalancerContext, userConfig, bucketSize, batchMessageMode);
   }
+
+  /**
+   * Get a ParticipantAccessor instance
+   * @return ParticipantAccessor
+   */
+  protected ParticipantAccessor participantAccessor() {
+    return new ParticipantAccessor(_accessor);
+  }
 }