You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:12:23 UTC

[08/52] [abbrv] [HELIX-279] Apply gc handling fixes to ZKHelixManager

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 933bf78..96b1ac8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.stages;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.Cluster;
 import org.apache.helix.api.State;
@@ -40,6 +41,8 @@ import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.log4j.Logger;
 
+import com.google.common.collect.Sets;
+
 /**
  * For partition compute best possible (instance,state) pair based on
  * IdealState,StateModel,LiveInstance
@@ -86,7 +89,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
   private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
       ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
     ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
-    Set<? extends PartitionId> mappedPartitions =
+    Set<PartitionId> mappedPartitions =
         currentStateOutput.getCurrentStateMappedPartitions(resourceId);
     if (mappedPartitions == null) {
       return partitionMapping;
@@ -106,6 +109,58 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     return partitionMapping;
   }
 
+  /**
+   * Update a ResourceAssignment with dropped and disabled participants for partitions
+   * @param cluster cluster snapshot
+   * @param resourceAssignment current resource assignment
+   * @param currentStateOutput aggregated current state
+   * @param stateModelDef state model definition for the resource
+   */
+  private void mapDroppedAndDisabledPartitions(Cluster cluster,
+      ResourceAssignment resourceAssignment, ResourceCurrentState currentStateOutput,
+      StateModelDefinition stateModelDef) {
+    // get the total partition set: mapped and current state
+    ResourceId resourceId = resourceAssignment.getResourceId();
+    Set<PartitionId> mappedPartitions = Sets.newHashSet();
+    mappedPartitions.addAll(currentStateOutput.getCurrentStateMappedPartitions(resourceId));
+    mappedPartitions.addAll(resourceAssignment.getMappedPartitionIds());
+    for (PartitionId partitionId : mappedPartitions) {
+      // for each partition, get the dropped and disabled mappings
+      Set<ParticipantId> disabledParticipants =
+          ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+              partitionId);
+
+      // get the error participants
+      Map<ParticipantId, State> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceId, partitionId);
+      Set<ParticipantId> errorParticipants = Sets.newHashSet();
+      for (ParticipantId participantId : currentStateMap.keySet()) {
+        State state = currentStateMap.get(participantId);
+        if (state.equals(State.from(HelixDefinedState.ERROR))) {
+          errorParticipants.add(participantId);
+        }
+      }
+
+      // get the dropped and disabled map
+      State initialState = stateModelDef.getTypedInitialState();
+      Map<ParticipantId, State> participantStateMap = resourceAssignment.getReplicaMap(partitionId);
+      Set<ParticipantId> participants = participantStateMap.keySet();
+      Map<ParticipantId, State> droppedAndDisabledMap =
+          ConstraintBasedAssignment.dropAndDisablePartitions(currentStateMap, participants,
+              disabledParticipants, initialState);
+
+      // don't map error participants
+      for (ParticipantId participantId : errorParticipants) {
+        droppedAndDisabledMap.remove(participantId);
+      }
+      // save the mappings, overwriting as necessary
+      participantStateMap.putAll(droppedAndDisabledMap);
+
+      // include this add step in case the resource assignment did not already map this partition
+      resourceAssignment.addReplicaMap(partitionId, participantStateMap);
+    }
+  }
+
   private BestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
       Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
     BestPossibleStateOutput output = new BestPossibleStateOutput();
@@ -127,11 +182,14 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
               rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
         }
       }
+      RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+      StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
       if (resourceAssignment == null) {
-        RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
-        StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
         resourceAssignment =
             mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
+      } else {
+        mapDroppedAndDisabledPartitions(cluster, resourceAssignment, currentStateOutput,
+            stateModelDef);
       }
       output.setResourceAssignment(resourceId, resourceAssignment);
     }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 5730289..c036b14 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -68,7 +68,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
           continue;
         }
 
-        if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTypedTgtSessionId())) {
+        if (!liveParticipant.getRunningInstance().getSessionId()
+            .equals(message.getTypedTgtSessionId())) {
           continue;
         }
 
@@ -126,17 +127,11 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
 
         Map<PartitionId, State> partitionStateMap = curState.getTypedPartitionStateMap();
         for (PartitionId partitionId : partitionStateMap.keySet()) {
-          Partition partition = resource.getSubUnit(partitionId);
-          if (partition != null) {
-            currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
-                curState.getState(partitionId));
-          } else {
-            // log
-          }
+          currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
+              curState.getState(partitionId));
         }
       }
     }
-
     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index d6fe8c3..08e6799 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -82,7 +82,7 @@ public class MessageGenerationStage extends AbstractBaseStage {
 
       ResourceAssignment resourceAssignment =
           bestPossibleStateOutput.getResourceAssignment(resourceId);
-      for (PartitionId subUnitId : resourceConfig.getSubUnitMap().keySet()) {
+      for (PartitionId subUnitId : resourceAssignment.getMappedPartitionIds()) {
         Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(subUnitId);
 
         // we should generate message based on the desired-state priority

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 4a3fe28..bbbf5c6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -95,11 +95,13 @@ public class MessageSelectionStage extends AbstractBaseStage {
         event.getAttribute(AttributeName.RESOURCES.toString());
     ResourceCurrentState currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.toString());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     MessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
     if (cluster == null || resourceMap == null || currentStateOutput == null
-        || messageGenOutput == null) {
+        || messageGenOutput == null || bestPossibleStateOutput == null) {
       throw new StageException("Missing attributes in event:" + event
-          + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
+          + ". Requires DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE|MESSAGES_ALL");
     }
 
     MessageOutput output = new MessageOutput();
@@ -120,7 +122,8 @@ public class MessageSelectionStage extends AbstractBaseStage {
               configResource == null ? null : configResource.getRebalancerConfig(), cluster);
 
       // TODO fix it
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+      for (PartitionId partitionId : bestPossibleStateOutput.getResourceAssignment(resourceId)
+          .getMappedPartitionIds()) {
         List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
         List<Message> selectedMessages =
             selectMessages(cluster.getLiveParticipantMap(),

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index a7b75a3..764b422 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -121,10 +121,13 @@ public class MessageThrottleStage extends AbstractBaseStage {
         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
 
-    if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
+    if (cluster == null || resourceMap == null || msgSelectionOutput == null
+        || bestPossibleStateOutput == null) {
       throw new StageException("Missing attributes in event: " + event
-          + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
+          + ". Requires ClusterDataCache|RESOURCES|BEST_POSSIBLE_STATE|MESSAGES_SELECTED");
     }
 
     MessageOutput output = new MessageOutput();
@@ -145,9 +148,9 @@ public class MessageThrottleStage extends AbstractBaseStage {
     // go through all new messages, throttle if necessary
     // assume messages should be sorted by state transition priority in messageSelection stage
     for (ResourceId resourceId : resourceMap.keySet()) {
-      ResourceConfig resource = resourceMap.get(resourceId);
       // TODO fix it
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+      for (PartitionId partitionId : bestPossibleStateOutput.getResourceAssignment(resourceId)
+          .getMappedPartitionIds()) {
         List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
         if (constraint != null && messages != null && messages.size() > 0) {
           messages = throttle(throttleCounterMap, constraint, messages, true);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
index 3dd3b81..2f5ec1d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -31,6 +31,8 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.model.CurrentState;
 
+import com.google.common.collect.Sets;
+
 public class ResourceCurrentState {
   /**
    * map of resource-id to map of partition-id to map of participant-id to state
@@ -225,12 +227,17 @@ public class ResourceCurrentState {
    * @param resourceId resource to look up
    * @return set of mapped partitions, or empty set if there are none
    */
-  public Set<? extends PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
+  public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
     Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
+    Map<PartitionId, Map<ParticipantId, State>> pendingStateMap = _pendingStateMap.get(resourceId);
+    Set<PartitionId> partitionSet = Sets.newHashSet();
     if (currentStateMap != null) {
-      return currentStateMap.keySet();
+      partitionSet.addAll(currentStateMap.keySet());
+    }
+    if (pendingStateMap != null) {
+      partitionSet.addAll(pendingStateMap.keySet());
     }
-    return Collections.emptySet();
+    return partitionSet;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 02188be..bc2ee50 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -53,20 +53,24 @@ public class TaskAssignmentStage extends AbstractBaseStage {
     Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
     MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+    BestPossibleStateOutput bestPossibleStateOutput =
+        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
     Cluster cluster = event.getAttribute("ClusterDataCache");
     Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
 
     if (manager == null || resourceMap == null || messageOutput == null || cluster == null
         || liveParticipantMap == null) {
-      throw new StageException("Missing attributes in event:" + event
-          + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
+      throw new StageException(
+          "Missing attributes in event:"
+              + event
+              + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|BEST_POSSIBLE_STATE|DataCache|liveInstanceMap");
     }
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     List<Message> messagesToSend = new ArrayList<Message>();
     for (ResourceId resourceId : resourceMap.keySet()) {
-      ResourceConfig resource = resourceMap.get(resourceId);
-      for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+      for (PartitionId partitionId : bestPossibleStateOutput.getResourceAssignment(resourceId)
+          .getMappedPartitionIds()) {
         List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
         messagesToSend.addAll(messages);
       }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
deleted file mode 100644
index f623ca5..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
+++ /dev/null
@@ -1,691 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.ConfigChangeListener;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.IdealStateChangeListener;
-import org.apache.helix.InstanceConfigChangeListener;
-import org.apache.helix.InstanceType;
-import org.apache.helix.LiveInstanceChangeListener;
-import org.apache.helix.LiveInstanceInfoProvider;
-import org.apache.helix.MessageListener;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ScopedConfigChangeListener;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper.States;
-
-public abstract class AbstractManager implements HelixManager, IZkStateListener {
-  private static Logger LOG = Logger.getLogger(AbstractManager.class);
-
-  final String _zkAddress;
-  final String _clusterName;
-  final String _instanceName;
-  final InstanceType _instanceType;
-  final int _sessionTimeout;
-  final List<PreConnectCallback> _preConnectCallbacks;
-  protected final List<CallbackHandler> _handlers;
-  final HelixManagerProperties _properties;
-
-  /**
-   * helix version#
-   */
-  final String _version;
-
-  protected ZkClient _zkclient = null;
-  final DefaultMessagingService _messagingService;
-
-  BaseDataAccessor<ZNRecord> _baseDataAccessor;
-  ZKHelixDataAccessor _dataAccessor;
-  final Builder _keyBuilder;
-  ConfigAccessor _configAccessor;
-  ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
-  LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
-  final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
-
-  volatile String _sessionId;
-
-  /**
-   * Keep track of timestamps that zk State has become Disconnected
-   * If in a _timeWindowLengthMs window zk State has become Disconnected
-   * for more than_maxDisconnectThreshold times disconnect the zkHelixManager
-   */
-  final List<Long> _disconnectTimeHistory = new LinkedList<Long>();
-
-  final int _flappingTimeWindowMs;
-  final int _maxDisconnectThreshold;
-
-  public AbstractManager(String zkAddress, String clusterName, String instanceName,
-      InstanceType instanceType) {
-
-    LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
-        + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
-
-    _zkAddress = zkAddress;
-    _clusterName = clusterName;
-    _instanceType = instanceType;
-    _instanceName = instanceName;
-    _preConnectCallbacks = new LinkedList<PreConnectCallback>();
-    _handlers = new ArrayList<CallbackHandler>();
-    _properties = new HelixManagerProperties("cluster-manager-version.properties");
-    _version = _properties.getVersion();
-
-    _keyBuilder = new Builder(clusterName);
-    _messagingService = new DefaultMessagingService(this);
-
-    /**
-     * use system property if available
-     */
-    _flappingTimeWindowMs =
-        getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
-            ZKHelixManager.FLAPPING_TIME_WINDIOW);
-
-    _maxDisconnectThreshold =
-        getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
-            ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
-
-    _sessionTimeout =
-        getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
-
-  }
-
-  private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
-    String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue);
-
-    try {
-      int value = Integer.parseInt(valueString);
-      if (value > 0) {
-        return value;
-      }
-    } catch (NumberFormatException e) {
-      LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
-          + ", using default value: " + propertyDefaultValue);
-    }
-
-    return propertyDefaultValue;
-  }
-
-  /**
-   * different types of helix manager should impl its own handle new session logic
-   */
-  // public abstract void handleNewSession();
-
-  @Override
-  public void connect() throws Exception {
-    LOG.info("ClusterManager.connect()");
-    if (isConnected()) {
-      LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName
-          + " already connected. skip connect");
-      return;
-    }
-
-    try {
-      createClient();
-      _messagingService.onConnected();
-    } catch (Exception e) {
-      LOG.error("fail to connect " + _instanceName, e);
-      disconnect();
-      throw e;
-    }
-  }
-
-  @Override
-  public boolean isConnected() {
-    if (_zkclient == null) {
-      return false;
-    }
-    ZkConnection zkconnection = (ZkConnection) _zkclient.getConnection();
-    if (zkconnection != null) {
-      States state = zkconnection.getZookeeperState();
-      return state == States.CONNECTED;
-    }
-    return false;
-  }
-
-  /**
-   * specific disconnect logic for each helix-manager type
-   */
-  abstract void doDisconnect();
-
-  /**
-   * This function can be called when the connection are in bad state(e.g. flapping),
-   * in which isConnected() could be false and we want to disconnect from cluster.
-   */
-  @Override
-  public void disconnect() {
-    LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);
-
-    try {
-      /**
-       * stop all timer tasks
-       */
-      stopTimerTasks();
-
-      /**
-       * shutdown thread pool first to avoid reset() being invoked in the middle of state
-       * transition
-       */
-      _messagingService.getExecutor().shutdown();
-
-      // TODO reset user defined handlers only
-      resetHandlers();
-
-      _dataAccessor.shutdown();
-
-      doDisconnect();
-
-      _zkclient.unsubscribeAll();
-    } finally {
-      _zkclient.close();
-      LOG.info("Cluster manager: " + _instanceName + " disconnected");
-    }
-  }
-
-  @Override
-  public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception {
-    addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE,
-        new EventType[] {
-            EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception {
-    addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE,
-        new EventType[] {
-            EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted,
-            EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
-    addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
-        new EventType[] {
-          EventType.NodeChildrenChanged
-        });
-  }
-
-  @Override
-  public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
-      throws Exception {
-    addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
-        new EventType[] {
-          EventType.NodeChildrenChanged
-        });
-  }
-
-  @Override
-  public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
-      throws Exception {
-    Builder keyBuilder = new Builder(_clusterName);
-
-    PropertyKey propertyKey = null;
-    switch (scope) {
-    case CLUSTER:
-      propertyKey = keyBuilder.clusterConfigs();
-      break;
-    case PARTICIPANT:
-      propertyKey = keyBuilder.instanceConfigs();
-      break;
-    case RESOURCE:
-      propertyKey = keyBuilder.resourceConfigs();
-      break;
-    default:
-      break;
-    }
-
-    if (propertyKey != null) {
-      addListener(listener, propertyKey, ChangeType.CONFIG, new EventType[] {
-        EventType.NodeChildrenChanged
-      });
-    } else {
-      LOG.error("Can't add listener to config scope: " + scope);
-    }
-  }
-
-  @Override
-  public void addMessageListener(MessageListener listener, String instanceName) {
-    addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE,
-        new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
-      String instanceName, String sessionId) throws Exception {
-    addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId),
-        ChangeType.CURRENT_STATE, new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
-      throws Exception {
-    addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
-        new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
-    addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
-        new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public void addControllerListener(ControllerChangeListener listener) {
-    addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER,
-        new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  void addControllerMessageListener(MessageListener listener) {
-    addListener(listener, new Builder(_clusterName).controllerMessages(),
-        ChangeType.MESSAGES_CONTROLLER, new EventType[] {
-            EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
-  }
-
-  @Override
-  public boolean removeListener(PropertyKey key, Object listener) {
-    LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
-        + _clusterName + " by instance: " + _instanceName);
-
-    synchronized (this) {
-      List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
-      for (CallbackHandler handler : _handlers) {
-        // compare property-key path and listener reference
-        if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) {
-          toRemove.add(handler);
-        }
-      }
-
-      _handlers.removeAll(toRemove);
-
-      // handler.reset() may modify the handlers list, so do it outside the iteration
-      for (CallbackHandler handler : toRemove) {
-        handler.reset();
-      }
-    }
-
-    return true;
-  }
-
-  @Override
-  public HelixDataAccessor getHelixDataAccessor() {
-    checkConnected();
-    return _dataAccessor;
-  }
-
-  @Override
-  public ConfigAccessor getConfigAccessor() {
-    checkConnected();
-    return _configAccessor;
-  }
-
-  @Override
-  public String getClusterName() {
-    return _clusterName;
-  }
-
-  @Override
-  public String getInstanceName() {
-    return _instanceName;
-  }
-
-  @Override
-  public String getSessionId() {
-    checkConnected();
-    return _sessionId;
-  }
-
-  @Override
-  public long getLastNotificationTime() {
-    // TODO Auto-generated method stub
-    return 0;
-  }
-
-  @Override
-  public HelixAdmin getClusterManagmentTool() {
-    checkConnected();
-    if (_zkclient != null) {
-      return new ZKHelixAdmin(_zkclient);
-    }
-
-    LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null");
-    return null;
-  }
-
-  @Override
-  public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
-    checkConnected();
-
-    if (_helixPropertyStore == null) {
-      String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-
-      _helixPropertyStore =
-          new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path,
-              null);
-    }
-
-    return _helixPropertyStore;
-  }
-
-  @Override
-  public ClusterMessagingService getMessagingService() {
-    // The caller can register message handler factories on messaging service before the
-    // helix manager is connected. Thus we do not check connected here
-    return _messagingService;
-  }
-
-  @Override
-  public ParticipantHealthReportCollector getHealthReportCollector() {
-    // helix-participant will override this
-    return null;
-  }
-
-  @Override
-  public InstanceType getInstanceType() {
-    return _instanceType;
-  }
-
-  @Override
-  public String getVersion() {
-    return _version;
-  }
-
-  @Override
-  public HelixManagerProperties getProperties() {
-    return _properties;
-  }
-
-  @Override
-  public StateMachineEngine getStateMachineEngine() {
-    // helix-participant will override this
-    return null;
-  }
-
-  @Override
-  public abstract boolean isLeader();
-
-  @Override
-  public void startTimerTasks() {
-    for (HelixTimerTask task : _timerTasks) {
-      task.start();
-    }
-
-  }
-
-  @Override
-  public void stopTimerTasks() {
-    for (HelixTimerTask task : _timerTasks) {
-      task.stop();
-    }
-
-  }
-
-  @Override
-  public void addPreConnectCallback(PreConnectCallback callback) {
-    LOG.info("Adding preconnect callback: " + callback);
-    _preConnectCallbacks.add(callback);
-  }
-
-  @Override
-  public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
-    _liveInstanceInfoProvider = liveInstanceInfoProvider;
-  }
-
-  /**
-   * wait until we get a non-zero session-id. note that we might lose zkconnection
-   * right after we read session-id. but it's ok to get stale session-id and we will have
-   * another handle-new-session callback to correct this.
-   */
-  protected void waitUntilConnected() {
-    boolean isConnected;
-    do {
-      isConnected =
-          _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
-      if (!isConnected) {
-        LOG.error("fail to connect zkserver: " + _zkAddress + " in "
-            + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
-            + ", clusterName: " + _clusterName);
-        continue;
-      }
-
-      ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
-      _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
-
-      /**
-       * at the time we read session-id, zkconnection might be lost again
-       * wait until we get a non-zero session-id
-       */
-    } while ("0".equals(_sessionId));
-
-    LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName
-        + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName + ", zkconnection: "
-        + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
-  }
-
-  protected void checkConnected() {
-    if (!isConnected()) {
-      throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
-    }
-  }
-
-  protected void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
-      EventType[] eventType) {
-    checkConnected();
-
-    PropertyType type = propertyKey.getType();
-
-    synchronized (this) {
-      for (CallbackHandler handler : _handlers) {
-        // compare property-key path and listener reference
-        if (handler.getPath().equals(propertyKey.getPath())
-            && handler.getListener().equals(listener)) {
-          LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath()
-              + " already exists. skip add");
-
-          return;
-        }
-      }
-
-      CallbackHandler newHandler =
-          new CallbackHandler(this, _zkclient, propertyKey, listener, eventType, changeType);
-
-      _handlers.add(newHandler);
-      LOG.info("Added listener: " + listener + " for type: " + type + " to path: "
-          + newHandler.getPath());
-    }
-  }
-
-  protected void initHandlers(List<CallbackHandler> handlers) {
-    synchronized (this) {
-      if (handlers != null) {
-        for (CallbackHandler handler : handlers) {
-          handler.init();
-          LOG.info("init handler: " + handler.getPath() + ", " + handler.getListener());
-        }
-      }
-    }
-  }
-
-  protected void resetHandlers() {
-    synchronized (this) {
-      if (_handlers != null) {
-        // get a copy of the list and iterate over the copy list
-        // in case handler.reset() modify the original handler list
-        List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
-        tmpHandlers.addAll(_handlers);
-
-        for (CallbackHandler handler : tmpHandlers) {
-          handler.reset();
-          LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
-        }
-      }
-    }
-  }
-
-  /**
-   * different helix-manager may override this to have a cache-enabled based-data-accessor
-   * @param baseDataAccessor
-   * @return
-   */
-  BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
-    return baseDataAccessor;
-  }
-
-  void createClient() throws Exception {
-    PathBasedZkSerializer zkSerializer =
-        ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
-
-    _zkclient =
-        new ZkClient(_zkAddress, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT, zkSerializer);
-
-    ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
-
-    _baseDataAccessor = createBaseDataAccessor(baseDataAccessor);
-
-    _dataAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
-    _configAccessor = new ConfigAccessor(_zkclient);
-
-    int retryCount = 0;
-
-    _zkclient.subscribeStateChanges(this);
-    while (retryCount < 3) {
-      try {
-        _zkclient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
-        handleStateChanged(KeeperState.SyncConnected);
-        handleNewSession();
-        break;
-      } catch (HelixException e) {
-        LOG.error("fail to createClient.", e);
-        throw e;
-      } catch (Exception e) {
-        retryCount++;
-
-        LOG.error("fail to createClient. retry " + retryCount, e);
-        if (retryCount == 3) {
-          throw e;
-        }
-      }
-    }
-  }
-
-  // TODO separate out flapping detection code
-  @Override
-  public void handleStateChanged(KeeperState state) throws Exception {
-    switch (state) {
-    case SyncConnected:
-      ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection();
-      LOG.info("KeeperState: " + state + ", zookeeper:" + zkConnection.getZookeeper());
-      break;
-    case Disconnected:
-      LOG.info("KeeperState:" + state + ", disconnectedSessionId: " + _sessionId + ", instance: "
-          + _instanceName + ", type: " + _instanceType);
-
-      /**
-       * Track the time stamp that the disconnected happens, then check history and see if
-       * we should disconnect the helix-manager
-       */
-      _disconnectTimeHistory.add(System.currentTimeMillis());
-      if (isFlapping()) {
-        LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. "
-            + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
-            + _flappingTimeWindowMs + "ms.");
-        disconnect();
-      }
-      break;
-    case Expired:
-      LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId + ", instance: "
-          + _instanceName + ", type: " + _instanceType);
-      break;
-    default:
-      break;
-    }
-  }
-
-  /**
-   * If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous
-   * _timeWindowLengthMs Ms
-   * time window, we think that there are something wrong going on and disconnect the zkHelixManager
-   * from zk.
-   */
-  private boolean isFlapping() {
-    if (_disconnectTimeHistory.size() == 0) {
-      return false;
-    }
-    long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
-
-    // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago
-    while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) {
-      _disconnectTimeHistory.remove(0);
-    }
-    return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
-  }
-
-  /**
-   * controller should override it to return a list of timers that need to start/stop when
-   * leadership changes
-   * @return
-   */
-  protected List<HelixTimerTask> getControllerHelixTimerTasks() {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
deleted file mode 100644
index dd8e9be..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Timer;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
-import org.apache.helix.healthcheck.HealthStatsAggregator;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.monitoring.ZKPathDataDumpTask;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class ControllerManager extends AbstractManager {
-  private static Logger LOG = Logger.getLogger(ControllerManager.class);
-
-  final GenericHelixController _controller = new GenericHelixController();
-
-  // TODO merge into GenericHelixController
-  private CallbackHandler _leaderElectionHandler = null;
-
-  /**
-   * status dump timer-task
-   */
-  static class StatusDumpTask extends HelixTimerTask {
-    Timer _timer = null;
-    final ZkClient zkclient;
-    final AbstractManager helixController;
-
-    public StatusDumpTask(ZkClient zkclient, AbstractManager helixController) {
-      this.zkclient = zkclient;
-      this.helixController = helixController;
-    }
-
-    @Override
-    public void start() {
-      long initialDelay = 30 * 60 * 1000;
-      long period = 120 * 60 * 1000;
-      int timeThresholdNoChange = 180 * 60 * 1000;
-
-      if (_timer == null) {
-        LOG.info("Start StatusDumpTask");
-        _timer = new Timer("StatusDumpTimerTask", true);
-        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
-            timeThresholdNoChange), initialDelay, period);
-      }
-
-    }
-
-    @Override
-    public void stop() {
-      if (_timer != null) {
-        LOG.info("Stop StatusDumpTask");
-        _timer.cancel();
-        _timer = null;
-      }
-    }
-  }
-
-  public ControllerManager(String zkAddress, String clusterName, String instanceName) {
-    super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER);
-
-    _timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
-    _timerTasks.add(new StatusDumpTask(_zkclient, this));
-  }
-
-  @Override
-  protected List<HelixTimerTask> getControllerHelixTimerTasks() {
-    return _timerTasks;
-  }
-
-  @Override
-  public void handleNewSession() throws Exception {
-    waitUntilConnected();
-
-    /**
-     * reset all handlers, make sure cleanup completed for previous session
-     * disconnect if fail to cleanup
-     */
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
-    }
-    // TODO reset user defined handlers only
-    resetHandlers();
-
-    /**
-     * from here on, we are dealing with new session
-     */
-
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.init();
-    } else {
-      _leaderElectionHandler =
-          new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
-              new DistributedLeaderElection(this, _controller), new EventType[] {
-                  EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-              }, ChangeType.CONTROLLER);
-    }
-
-    /**
-     * init handlers
-     * ok to init message handler and controller handlers twice
-     * the second init will be skipped (see CallbackHandler)
-     */
-    initHandlers(_handlers);
-  }
-
-  @Override
-  void doDisconnect() {
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
-    }
-  }
-
-  @Override
-  public boolean isLeader() {
-    if (!isConnected()) {
-      return false;
-    }
-
-    try {
-      LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
-      if (leader != null) {
-        String leaderName = leader.getInstanceName();
-        String sessionId = leader.getTypedSessionId().stringify();
-        if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
-            && sessionId.equals(_sessionId)) {
-          return true;
-        }
-      }
-    } catch (Exception e) {
-      // log
-    }
-    return false;
-  }
-
-  /**
-   * helix-controller uses a write-through cache for external-view
-   */
-  @Override
-  BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
-    String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName);
-    return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(extViewPath));
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index df90f6e..d2b520b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -22,6 +22,7 @@ package org.apache.helix.manager.zk;
 import java.util.List;
 
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.HelixManager;
 import org.apache.helix.HelixTimerTask;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.GenericHelixController;
@@ -35,14 +36,14 @@ import org.apache.log4j.Logger;
 public class ControllerManagerHelper {
   private static Logger LOG = Logger.getLogger(ControllerManagerHelper.class);
 
-  final AbstractManager _manager;
+  final HelixManager _manager;
   final DefaultMessagingService _messagingService;
   final List<HelixTimerTask> _controllerTimerTasks;
 
-  public ControllerManagerHelper(AbstractManager manager) {
+  public ControllerManagerHelper(HelixManager manager, List<HelixTimerTask> controllerTimerTasks) {
     _manager = manager;
     _messagingService = (DefaultMessagingService) manager.getMessagingService();
-    _controllerTimerTasks = manager.getControllerHelixTimerTasks();
+    _controllerTimerTasks = controllerTimerTasks;
   }
 
   public void addListenersToController(GenericHelixController controller) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index a7d1f25..4fe9164 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -321,6 +321,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
             _manager.getMessagingService().send(recipientCriteria, messageTemplate, callback,
                 timeOut);
       }
+
       HelixDataAccessor accessor = _manager.getHelixDataAccessor();
       Builder keyBuilder = accessor.keyBuilder();
 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
deleted file mode 100644
index f169317..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
-import org.apache.helix.healthcheck.HealthStatsAggregator;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import org.apache.helix.healthcheck.ParticipantHealthReportTask;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.participant.HelixStateMachineEngine;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class DistributedControllerManager extends AbstractManager {
-  private static Logger LOG = Logger.getLogger(DistributedControllerManager.class);
-
-  final StateMachineEngine _stateMachineEngine;
-  final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
-
-  CallbackHandler _leaderElectionHandler = null;
-  final GenericHelixController _controller = new GenericHelixController();
-
-  /**
-   * hold timer tasks for controller only
-   * we need to add/remove controller timer tasks during handle new session
-   */
-  final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<HelixTimerTask>();
-
-  public DistributedControllerManager(String zkAddress, String clusterName, String instanceName) {
-    super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER_PARTICIPANT);
-
-    _stateMachineEngine = new HelixStateMachineEngine(this);
-    _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
-    _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
-
-    _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
-    _controllerTimerTasks.add(new ControllerManager.StatusDumpTask(_zkclient, this));
-
-  }
-
-  @Override
-  public ParticipantHealthReportCollector getHealthReportCollector() {
-    checkConnected();
-    return _participantHealthInfoCollector;
-  }
-
-  @Override
-  public StateMachineEngine getStateMachineEngine() {
-    return _stateMachineEngine;
-  }
-
-  @Override
-  protected List<HelixTimerTask> getControllerHelixTimerTasks() {
-    return _controllerTimerTasks;
-  }
-
-  @Override
-  public void handleNewSession() throws Exception {
-    waitUntilConnected();
-
-    ParticipantManagerHelper participantHelper =
-        new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
-
-    /**
-     * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
-     * disconnect if fail to cleanup
-     */
-    stopTimerTasks();
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
-    }
-    resetHandlers();
-
-    /**
-     * clean up write-through cache
-     */
-    _baseDataAccessor.reset();
-
-    /**
-     * from here on, we are dealing with new session
-     */
-    if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
-      throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
-    }
-
-    /**
-     * auto-join
-     */
-    participantHelper.joinCluster();
-
-    /**
-     * Invoke PreConnectCallbacks
-     */
-    for (PreConnectCallback callback : _preConnectCallbacks) {
-      callback.onPreConnect();
-    }
-
-    participantHelper.createLiveInstance();
-
-    participantHelper.carryOverPreviousCurrentState();
-
-    participantHelper.setupMsgHandler();
-
-    /**
-     * leader election
-     */
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.init();
-    } else {
-      _leaderElectionHandler =
-          new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
-              new DistributedLeaderElection(this, _controller), new EventType[] {
-                  EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
-              }, ChangeType.CONTROLLER);
-    }
-
-    /**
-     * start health-check timer task
-     */
-    participantHelper.createHealthCheckPath();
-    startTimerTasks();
-
-    /**
-     * init handlers
-     * ok to init message handler, data-accessor, and controller handlers twice
-     * the second init will be skipped (see CallbackHandler)
-     */
-    initHandlers(_handlers);
-
-  }
-
-  @Override
-  void doDisconnect() {
-    if (_leaderElectionHandler != null) {
-      _leaderElectionHandler.reset();
-    }
-  }
-
-  @Override
-  public boolean isLeader() {
-    if (!isConnected()) {
-      return false;
-    }
-
-    try {
-      LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
-      if (leader != null) {
-        String leaderName = leader.getInstanceName();
-        String sessionId = leader.getTypedSessionId().stringify();
-        if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
-            && sessionId.equals(_sessionId)) {
-          return true;
-        }
-      }
-    } catch (Exception e) {
-      // log
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index caf4dae..9836020 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -20,10 +20,12 @@ package org.apache.helix.manager.zk;
  */
 
 import java.lang.management.ManagementFactory;
+import java.util.List;
 
 import org.apache.helix.ControllerChangeListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
@@ -40,12 +42,15 @@ import org.apache.log4j.Logger;
 public class DistributedLeaderElection implements ControllerChangeListener {
   private static Logger LOG = Logger.getLogger(DistributedLeaderElection.class);
 
-  final AbstractManager _manager;
+  final HelixManager _manager;
   final GenericHelixController _controller;
+  final List<HelixTimerTask> _controllerTimerTasks;
 
-  public DistributedLeaderElection(AbstractManager manager, GenericHelixController controller) {
+  public DistributedLeaderElection(HelixManager manager, GenericHelixController controller,
+      List<HelixTimerTask> controllerTimerTasks) {
     _manager = manager;
     _controller = controller;
+    _controllerTimerTasks = controllerTimerTasks;
   }
 
   /**
@@ -68,7 +73,8 @@ public class DistributedLeaderElection implements ControllerChangeListener {
       return;
     }
 
-    ControllerManagerHelper controllerHelper = new ControllerManagerHelper(_manager);
+    ControllerManagerHelper controllerHelper =
+        new ControllerManagerHelper(_manager, _controllerTimerTasks);
     try {
       if (changeContext.getType().equals(NotificationContext.Type.INIT)
           || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
@@ -84,7 +90,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
                 + _manager.getClusterName());
 
             updateHistory(manager);
-            _manager._baseDataAccessor.reset();
+            _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
             controllerHelper.addListenersToController(_controller);
             controllerHelper.startControllerTimerTasks();
           }
@@ -98,7 +104,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
         /**
          * clear write-through cache
          */
-        _manager._baseDataAccessor.reset();
+        _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
       }
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
index b58e4b2..869563a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
@@ -293,4 +293,10 @@ public class HelixConnectionAdaptor implements HelixManager {
     }
   }
 
+  @Override
+  public void addControllerMessageListener(MessageListener listener) {
+    // TODO Auto-generated method stub
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
deleted file mode 100644
index ab618fe..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Arrays;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import org.apache.helix.healthcheck.ParticipantHealthReportTask;
-import org.apache.helix.participant.HelixStateMachineEngine;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class ParticipantManager extends AbstractManager {
-
-  private static Logger LOG = Logger.getLogger(ParticipantManager.class);
-
-  /**
-   * state-transition message handler factory for helix-participant
-   */
-  final StateMachineEngine _stateMachineEngine;
-
-  final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
-
-  public ParticipantManager(String zkAddress, String clusterName, String instanceName) {
-    super(zkAddress, clusterName, instanceName, InstanceType.PARTICIPANT);
-
-    _stateMachineEngine = new HelixStateMachineEngine(this);
-    _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
-    _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
-  }
-
-  @Override
-  public ParticipantHealthReportCollector getHealthReportCollector() {
-    checkConnected();
-    return _participantHealthInfoCollector;
-  }
-
-  @Override
-  public StateMachineEngine getStateMachineEngine() {
-    return _stateMachineEngine;
-  }
-
-  @Override
-  public void handleNewSession() {
-    waitUntilConnected();
-
-    /**
-     * stop timer tasks, reset all handlers, make sure cleanup completed for previous session
-     * disconnect if cleanup fails
-     */
-    stopTimerTasks();
-    resetHandlers();
-
-    /**
-     * clear write-through cache
-     */
-    _baseDataAccessor.reset();
-
-    /**
-     * from here on, we are dealing with new session
-     */
-    if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
-      throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
-    }
-
-    /**
-     * auto-join
-     */
-    ParticipantManagerHelper participantHelper =
-        new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
-    participantHelper.joinCluster();
-
-    /**
-     * Invoke PreConnectCallbacks
-     */
-    for (PreConnectCallback callback : _preConnectCallbacks) {
-      callback.onPreConnect();
-    }
-
-    participantHelper.createLiveInstance();
-
-    participantHelper.carryOverPreviousCurrentState();
-
-    /**
-     * setup message listener
-     */
-    participantHelper.setupMsgHandler();
-
-    /**
-     * start health check timer task
-     */
-    participantHelper.createHealthCheckPath();
-    startTimerTasks();
-
-    /**
-     * init handlers
-     * ok to init message handler and data-accessor twice
-     * the second init will be skipped (see CallbackHandler)
-     */
-    initHandlers(_handlers);
-
-  }
-
-  /**
-   * helix-participant uses a write-through cache for current-state
-   */
-  @Override
-  BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
-    String curStatePath =
-        PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName);
-    return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(curStatePath));
-
-  }
-
-  @Override
-  public boolean isLeader() {
-    return false;
-  }
-
-  /**
-   * disconnect logic for helix-participant
-   */
-  @Override
-  void doDisconnect() {
-    // nothing for participant
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index aa84c4d..da266f9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.messaging.DefaultMessagingService;
@@ -53,7 +54,7 @@ public class ParticipantManagerHelper {
   private static Logger LOG = Logger.getLogger(ParticipantManagerHelper.class);
 
   final ZkClient _zkclient;
-  final AbstractManager _manager;
+  final HelixManager _manager;
   final PropertyKey.Builder _keyBuilder;
   final String _clusterName;
   final String _instanceName;
@@ -65,8 +66,10 @@ public class ParticipantManagerHelper {
   final ZKHelixDataAccessor _dataAccessor;
   final DefaultMessagingService _messagingService;
   final StateMachineEngine _stateMachineEngine;
+  final LiveInstanceInfoProvider _liveInstanceInfoProvider;
 
-  public ParticipantManagerHelper(AbstractManager manager, ZkClient zkclient, int sessionTimeout) {
+  public ParticipantManagerHelper(HelixManager manager, ZkClient zkclient, int sessionTimeout,
+      LiveInstanceInfoProvider liveInstanceInfoProvider) {
     _zkclient = zkclient;
     _manager = manager;
     _clusterName = manager.getClusterName();
@@ -80,6 +83,7 @@ public class ParticipantManagerHelper {
     _dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor();
     _messagingService = (DefaultMessagingService) manager.getMessagingService();
     _stateMachineEngine = manager.getStateMachineEngine();
+    _liveInstanceInfoProvider = liveInstanceInfoProvider;
   }
 
   public void joinCluster() {
@@ -90,8 +94,8 @@ public class ParticipantManagerHelper {
           new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
               _manager.getClusterName()).build();
       autoJoin =
-          Boolean
-              .parseBoolean(_configAccessor.get(scope, HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
+          Boolean.parseBoolean(_configAccessor.get(scope,
+              ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
       LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
     } catch (Exception e) {
       // autoJoin is false
@@ -126,6 +130,19 @@ public class ParticipantManagerHelper {
     liveInstance.setHelixVersion(_manager.getVersion());
     liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
 
+    // combine any additional info from the optional provider with the live-instance record
+    if (_liveInstanceInfoProvider != null) {
+      LOG.info("invoke liveInstanceInfoProvider");
+      ZNRecord additionalLiveInstanceInfo =
+          _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
+      if (additionalLiveInstanceInfo != null) {
+        additionalLiveInstanceInfo.merge(liveInstance.getRecord());
+        ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
+        liveInstance = new LiveInstance(mergedLiveInstance);
+        LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance);
+      }
+    }
+
     boolean retry;
     do {
       retry = false;
@@ -248,7 +265,7 @@ public class ParticipantManagerHelper {
     }
   }
 
-  public void setupMsgHandler() {
+  public void setupMsgHandler() throws Exception {
     _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
         _stateMachineEngine);
     _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);