You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/11/11 22:10:02 UTC
[08/10] [HELIX-279] Apply gc handling fixes to ZKHelixManager
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 933bf78..96b1ac8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.stages;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.State;
@@ -40,6 +41,8 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.log4j.Logger;
+import com.google.common.collect.Sets;
+
/**
* For partition compute best possible (instance,state) pair based on
* IdealState,StateModel,LiveInstance
@@ -86,7 +89,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
private ResourceAssignment mapDroppedResource(Cluster cluster, ResourceId resourceId,
ResourceCurrentState currentStateOutput, StateModelDefinition stateModelDef) {
ResourceAssignment partitionMapping = new ResourceAssignment(resourceId);
- Set<? extends PartitionId> mappedPartitions =
+ Set<PartitionId> mappedPartitions =
currentStateOutput.getCurrentStateMappedPartitions(resourceId);
if (mappedPartitions == null) {
return partitionMapping;
@@ -106,6 +109,58 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
return partitionMapping;
}
+ /**
+ * Update a ResourceAssignment with dropped and disabled participants for partitions
+ * @param cluster cluster snapshot
+ * @param resourceAssignment current resource assignment
+ * @param currentStateOutput aggregated current state
+ * @param stateModelDef state model definition for the resource
+ */
+ private void mapDroppedAndDisabledPartitions(Cluster cluster,
+ ResourceAssignment resourceAssignment, ResourceCurrentState currentStateOutput,
+ StateModelDefinition stateModelDef) {
+ // get the total partition set: mapped and current state
+ ResourceId resourceId = resourceAssignment.getResourceId();
+ Set<PartitionId> mappedPartitions = Sets.newHashSet();
+ mappedPartitions.addAll(currentStateOutput.getCurrentStateMappedPartitions(resourceId));
+ mappedPartitions.addAll(resourceAssignment.getMappedPartitionIds());
+ for (PartitionId partitionId : mappedPartitions) {
+ // for each partition, get the dropped and disabled mappings
+ Set<ParticipantId> disabledParticipants =
+ ConstraintBasedAssignment.getDisabledParticipants(cluster.getParticipantMap(),
+ partitionId);
+
+ // get the error participants
+ Map<ParticipantId, State> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resourceId, partitionId);
+ Set<ParticipantId> errorParticipants = Sets.newHashSet();
+ for (ParticipantId participantId : currentStateMap.keySet()) {
+ State state = currentStateMap.get(participantId);
+ if (state.equals(State.from(HelixDefinedState.ERROR))) {
+ errorParticipants.add(participantId);
+ }
+ }
+
+ // get the dropped and disabled map
+ State initialState = stateModelDef.getTypedInitialState();
+ Map<ParticipantId, State> participantStateMap = resourceAssignment.getReplicaMap(partitionId);
+ Set<ParticipantId> participants = participantStateMap.keySet();
+ Map<ParticipantId, State> droppedAndDisabledMap =
+ ConstraintBasedAssignment.dropAndDisablePartitions(currentStateMap, participants,
+ disabledParticipants, initialState);
+
+ // don't map error participants
+ for (ParticipantId participantId : errorParticipants) {
+ droppedAndDisabledMap.remove(participantId);
+ }
+ // save the mappings, overwriting as necessary
+ participantStateMap.putAll(droppedAndDisabledMap);
+
+ // include this add step in case the resource assignment did not already map this partition
+ resourceAssignment.addReplicaMap(partitionId, participantStateMap);
+ }
+ }
+
private BestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
Map<ResourceId, ResourceConfig> resourceMap, ResourceCurrentState currentStateOutput) {
BestPossibleStateOutput output = new BestPossibleStateOutput();
@@ -127,11 +182,14 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
rebalancer.computeResourceMapping(rebalancerConfig, cluster, currentStateOutput);
}
}
+ RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
+ StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
if (resourceAssignment == null) {
- RebalancerContext context = rebalancerConfig.getRebalancerContext(RebalancerContext.class);
- StateModelDefinition stateModelDef = stateModelDefs.get(context.getStateModelDefId());
resourceAssignment =
mapDroppedResource(cluster, resourceId, currentStateOutput, stateModelDef);
+ } else {
+ mapDroppedAndDisabledPartitions(cluster, resourceAssignment, currentStateOutput,
+ stateModelDef);
}
output.setResourceAssignment(resourceId, resourceAssignment);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 5730289..c036b14 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -68,7 +68,8 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
continue;
}
- if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTypedTgtSessionId())) {
+ if (!liveParticipant.getRunningInstance().getSessionId()
+ .equals(message.getTypedTgtSessionId())) {
continue;
}
@@ -126,17 +127,11 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
Map<PartitionId, State> partitionStateMap = curState.getTypedPartitionStateMap();
for (PartitionId partitionId : partitionStateMap.keySet()) {
- Partition partition = resource.getSubUnit(partitionId);
- if (partition != null) {
- currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
- curState.getState(partitionId));
- } else {
- // log
- }
+ currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
+ curState.getState(partitionId));
}
}
}
-
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index d6fe8c3..08e6799 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -82,7 +82,7 @@ public class MessageGenerationStage extends AbstractBaseStage {
ResourceAssignment resourceAssignment =
bestPossibleStateOutput.getResourceAssignment(resourceId);
- for (PartitionId subUnitId : resourceConfig.getSubUnitMap().keySet()) {
+ for (PartitionId subUnitId : resourceAssignment.getMappedPartitionIds()) {
Map<ParticipantId, State> instanceStateMap = resourceAssignment.getReplicaMap(subUnitId);
// we should generate message based on the desired-state priority
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 4a3fe28..bbbf5c6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -95,11 +95,13 @@ public class MessageSelectionStage extends AbstractBaseStage {
event.getAttribute(AttributeName.RESOURCES.toString());
ResourceCurrentState currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
MessageOutput messageGenOutput = event.getAttribute(AttributeName.MESSAGES_ALL.toString());
if (cluster == null || resourceMap == null || currentStateOutput == null
- || messageGenOutput == null) {
+ || messageGenOutput == null || bestPossibleStateOutput == null) {
throw new StageException("Missing attributes in event:" + event
- + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
+ + ". Requires DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE|MESSAGES_ALL");
}
MessageOutput output = new MessageOutput();
@@ -120,7 +122,8 @@ public class MessageSelectionStage extends AbstractBaseStage {
configResource == null ? null : configResource.getRebalancerConfig(), cluster);
// TODO fix it
- for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+ for (PartitionId partitionId : bestPossibleStateOutput.getResourceAssignment(resourceId)
+ .getMappedPartitionIds()) {
List<Message> messages = messageGenOutput.getMessages(resourceId, partitionId);
List<Message> selectedMessages =
selectMessages(cluster.getLiveParticipantMap(),
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
index a7b75a3..764b422 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageThrottleStage.java
@@ -121,10 +121,13 @@ public class MessageThrottleStage extends AbstractBaseStage {
event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
+ if (cluster == null || resourceMap == null || msgSelectionOutput == null
+ || bestPossibleStateOutput == null) {
throw new StageException("Missing attributes in event: " + event
- + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
+ + ". Requires ClusterDataCache|RESOURCES|BEST_POSSIBLE_STATE|MESSAGES_SELECTED");
}
MessageOutput output = new MessageOutput();
@@ -145,9 +148,9 @@ public class MessageThrottleStage extends AbstractBaseStage {
// go through all new messages, throttle if necessary
// assume messages should be sorted by state transition priority in messageSelection stage
for (ResourceId resourceId : resourceMap.keySet()) {
- ResourceConfig resource = resourceMap.get(resourceId);
// TODO fix it
- for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+ for (PartitionId partitionId : bestPossibleStateOutput.getResourceAssignment(resourceId)
+ .getMappedPartitionIds()) {
List<Message> messages = msgSelectionOutput.getMessages(resourceId, partitionId);
if (constraint != null && messages != null && messages.size() > 0) {
messages = throttle(throttleCounterMap, constraint, messages, true);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
index 3dd3b81..2f5ec1d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -31,6 +31,8 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.model.CurrentState;
+import com.google.common.collect.Sets;
+
public class ResourceCurrentState {
/**
* map of resource-id to map of partition-id to map of participant-id to state
@@ -225,12 +227,17 @@ public class ResourceCurrentState {
* @param resourceId resource to look up
* @return set of mapped partitions, or empty set if there are none
*/
- public Set<? extends PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
+ public Set<PartitionId> getCurrentStateMappedPartitions(ResourceId resourceId) {
Map<PartitionId, Map<ParticipantId, State>> currentStateMap = _currentStateMap.get(resourceId);
+ Map<PartitionId, Map<ParticipantId, State>> pendingStateMap = _pendingStateMap.get(resourceId);
+ Set<PartitionId> partitionSet = Sets.newHashSet();
if (currentStateMap != null) {
- return currentStateMap.keySet();
+ partitionSet.addAll(currentStateMap.keySet());
+ }
+ if (pendingStateMap != null) {
+ partitionSet.addAll(pendingStateMap.keySet());
}
- return Collections.emptySet();
+ return partitionSet;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index 02188be..bc2ee50 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -53,20 +53,24 @@ public class TaskAssignmentStage extends AbstractBaseStage {
Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
Cluster cluster = event.getAttribute("ClusterDataCache");
Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
if (manager == null || resourceMap == null || messageOutput == null || cluster == null
|| liveParticipantMap == null) {
- throw new StageException("Missing attributes in event:" + event
- + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
+ throw new StageException(
+ "Missing attributes in event:"
+ + event
+ + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|BEST_POSSIBLE_STATE|DataCache|liveInstanceMap");
}
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
List<Message> messagesToSend = new ArrayList<Message>();
for (ResourceId resourceId : resourceMap.keySet()) {
- ResourceConfig resource = resourceMap.get(resourceId);
- for (PartitionId partitionId : resource.getSubUnitMap().keySet()) {
+ for (PartitionId partitionId : bestPossibleStateOutput.getResourceAssignment(resourceId)
+ .getMappedPartitionIds()) {
List<Message> messages = messageOutput.getMessages(resourceId, partitionId);
messagesToSend.addAll(messages);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
deleted file mode 100644
index f623ca5..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
+++ /dev/null
@@ -1,691 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.ClusterMessagingService;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.ConfigChangeListener;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.CurrentStateChangeListener;
-import org.apache.helix.ExternalViewChangeListener;
-import org.apache.helix.HealthStateChangeListener;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerProperties;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.IdealStateChangeListener;
-import org.apache.helix.InstanceConfigChangeListener;
-import org.apache.helix.InstanceType;
-import org.apache.helix.LiveInstanceChangeListener;
-import org.apache.helix.LiveInstanceInfoProvider;
-import org.apache.helix.MessageListener;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ScopedConfigChangeListener;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.messaging.DefaultMessagingService;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper.States;
-
-public abstract class AbstractManager implements HelixManager, IZkStateListener {
- private static Logger LOG = Logger.getLogger(AbstractManager.class);
-
- final String _zkAddress;
- final String _clusterName;
- final String _instanceName;
- final InstanceType _instanceType;
- final int _sessionTimeout;
- final List<PreConnectCallback> _preConnectCallbacks;
- protected final List<CallbackHandler> _handlers;
- final HelixManagerProperties _properties;
-
- /**
- * helix version#
- */
- final String _version;
-
- protected ZkClient _zkclient = null;
- final DefaultMessagingService _messagingService;
-
- BaseDataAccessor<ZNRecord> _baseDataAccessor;
- ZKHelixDataAccessor _dataAccessor;
- final Builder _keyBuilder;
- ConfigAccessor _configAccessor;
- ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
- LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
- final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
-
- volatile String _sessionId;
-
- /**
- * Keep track of timestamps that zk State has become Disconnected
- * If in a _timeWindowLengthMs window zk State has become Disconnected
- * for more than_maxDisconnectThreshold times disconnect the zkHelixManager
- */
- final List<Long> _disconnectTimeHistory = new LinkedList<Long>();
-
- final int _flappingTimeWindowMs;
- final int _maxDisconnectThreshold;
-
- public AbstractManager(String zkAddress, String clusterName, String instanceName,
- InstanceType instanceType) {
-
- LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
- + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
-
- _zkAddress = zkAddress;
- _clusterName = clusterName;
- _instanceType = instanceType;
- _instanceName = instanceName;
- _preConnectCallbacks = new LinkedList<PreConnectCallback>();
- _handlers = new ArrayList<CallbackHandler>();
- _properties = new HelixManagerProperties("cluster-manager-version.properties");
- _version = _properties.getVersion();
-
- _keyBuilder = new Builder(clusterName);
- _messagingService = new DefaultMessagingService(this);
-
- /**
- * use system property if available
- */
- _flappingTimeWindowMs =
- getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
- ZKHelixManager.FLAPPING_TIME_WINDIOW);
-
- _maxDisconnectThreshold =
- getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
- ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
-
- _sessionTimeout =
- getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
-
- }
-
- private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
- String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue);
-
- try {
- int value = Integer.parseInt(valueString);
- if (value > 0) {
- return value;
- }
- } catch (NumberFormatException e) {
- LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
- + ", using default value: " + propertyDefaultValue);
- }
-
- return propertyDefaultValue;
- }
-
- /**
- * different types of helix manager should impl its own handle new session logic
- */
- // public abstract void handleNewSession();
-
- @Override
- public void connect() throws Exception {
- LOG.info("ClusterManager.connect()");
- if (isConnected()) {
- LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName
- + " already connected. skip connect");
- return;
- }
-
- try {
- createClient();
- _messagingService.onConnected();
- } catch (Exception e) {
- LOG.error("fail to connect " + _instanceName, e);
- disconnect();
- throw e;
- }
- }
-
- @Override
- public boolean isConnected() {
- if (_zkclient == null) {
- return false;
- }
- ZkConnection zkconnection = (ZkConnection) _zkclient.getConnection();
- if (zkconnection != null) {
- States state = zkconnection.getZookeeperState();
- return state == States.CONNECTED;
- }
- return false;
- }
-
- /**
- * specific disconnect logic for each helix-manager type
- */
- abstract void doDisconnect();
-
- /**
- * This function can be called when the connection are in bad state(e.g. flapping),
- * in which isConnected() could be false and we want to disconnect from cluster.
- */
- @Override
- public void disconnect() {
- LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);
-
- try {
- /**
- * stop all timer tasks
- */
- stopTimerTasks();
-
- /**
- * shutdown thread pool first to avoid reset() being invoked in the middle of state
- * transition
- */
- _messagingService.getExecutor().shutdown();
-
- // TODO reset user defined handlers only
- resetHandlers();
-
- _dataAccessor.shutdown();
-
- doDisconnect();
-
- _zkclient.unsubscribeAll();
- } finally {
- _zkclient.close();
- LOG.info("Cluster manager: " + _instanceName + " disconnected");
- }
- }
-
- @Override
- public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception {
- addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE,
- new EventType[] {
- EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception {
- addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE,
- new EventType[] {
- EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted,
- EventType.NodeCreated
- });
- }
-
- @Override
- public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
- addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
- new EventType[] {
- EventType.NodeChildrenChanged
- });
- }
-
- @Override
- public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
- throws Exception {
- addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
- new EventType[] {
- EventType.NodeChildrenChanged
- });
- }
-
- @Override
- public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
- throws Exception {
- Builder keyBuilder = new Builder(_clusterName);
-
- PropertyKey propertyKey = null;
- switch (scope) {
- case CLUSTER:
- propertyKey = keyBuilder.clusterConfigs();
- break;
- case PARTICIPANT:
- propertyKey = keyBuilder.instanceConfigs();
- break;
- case RESOURCE:
- propertyKey = keyBuilder.resourceConfigs();
- break;
- default:
- break;
- }
-
- if (propertyKey != null) {
- addListener(listener, propertyKey, ChangeType.CONFIG, new EventType[] {
- EventType.NodeChildrenChanged
- });
- } else {
- LOG.error("Can't add listener to config scope: " + scope);
- }
- }
-
- @Override
- public void addMessageListener(MessageListener listener, String instanceName) {
- addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE,
- new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
- String instanceName, String sessionId) throws Exception {
- addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId),
- ChangeType.CURRENT_STATE, new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
- throws Exception {
- addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
- new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
- addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
- new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public void addControllerListener(ControllerChangeListener listener) {
- addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER,
- new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- void addControllerMessageListener(MessageListener listener) {
- addListener(listener, new Builder(_clusterName).controllerMessages(),
- ChangeType.MESSAGES_CONTROLLER, new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- });
- }
-
- @Override
- public boolean removeListener(PropertyKey key, Object listener) {
- LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
- + _clusterName + " by instance: " + _instanceName);
-
- synchronized (this) {
- List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
- for (CallbackHandler handler : _handlers) {
- // compare property-key path and listener reference
- if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) {
- toRemove.add(handler);
- }
- }
-
- _handlers.removeAll(toRemove);
-
- // handler.reset() may modify the handlers list, so do it outside the iteration
- for (CallbackHandler handler : toRemove) {
- handler.reset();
- }
- }
-
- return true;
- }
-
- @Override
- public HelixDataAccessor getHelixDataAccessor() {
- checkConnected();
- return _dataAccessor;
- }
-
- @Override
- public ConfigAccessor getConfigAccessor() {
- checkConnected();
- return _configAccessor;
- }
-
- @Override
- public String getClusterName() {
- return _clusterName;
- }
-
- @Override
- public String getInstanceName() {
- return _instanceName;
- }
-
- @Override
- public String getSessionId() {
- checkConnected();
- return _sessionId;
- }
-
- @Override
- public long getLastNotificationTime() {
- // TODO Auto-generated method stub
- return 0;
- }
-
- @Override
- public HelixAdmin getClusterManagmentTool() {
- checkConnected();
- if (_zkclient != null) {
- return new ZKHelixAdmin(_zkclient);
- }
-
- LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null");
- return null;
- }
-
- @Override
- public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
- checkConnected();
-
- if (_helixPropertyStore == null) {
- String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-
- _helixPropertyStore =
- new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path,
- null);
- }
-
- return _helixPropertyStore;
- }
-
- @Override
- public ClusterMessagingService getMessagingService() {
- // The caller can register message handler factories on messaging service before the
- // helix manager is connected. Thus we do not check connected here
- return _messagingService;
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector() {
- // helix-participant will override this
- return null;
- }
-
- @Override
- public InstanceType getInstanceType() {
- return _instanceType;
- }
-
- @Override
- public String getVersion() {
- return _version;
- }
-
- @Override
- public HelixManagerProperties getProperties() {
- return _properties;
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine() {
- // helix-participant will override this
- return null;
- }
-
- @Override
- public abstract boolean isLeader();
-
- @Override
- public void startTimerTasks() {
- for (HelixTimerTask task : _timerTasks) {
- task.start();
- }
-
- }
-
- @Override
- public void stopTimerTasks() {
- for (HelixTimerTask task : _timerTasks) {
- task.stop();
- }
-
- }
-
- @Override
- public void addPreConnectCallback(PreConnectCallback callback) {
- LOG.info("Adding preconnect callback: " + callback);
- _preConnectCallbacks.add(callback);
- }
-
- @Override
- public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
- _liveInstanceInfoProvider = liveInstanceInfoProvider;
- }
-
- /**
- * wait until we get a non-zero session-id. note that we might lose zkconnection
- * right after we read session-id. but it's ok to get stale session-id and we will have
- * another handle-new-session callback to correct this.
- */
- protected void waitUntilConnected() {
- boolean isConnected;
- do {
- isConnected =
- _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
- if (!isConnected) {
- LOG.error("fail to connect zkserver: " + _zkAddress + " in "
- + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
- + ", clusterName: " + _clusterName);
- continue;
- }
-
- ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
- _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
-
- /**
- * at the time we read session-id, zkconnection might be lost again
- * wait until we get a non-zero session-id
- */
- } while ("0".equals(_sessionId));
-
- LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName
- + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName + ", zkconnection: "
- + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
- }
-
- protected void checkConnected() {
- if (!isConnected()) {
- throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
- }
- }
-
- protected void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
- EventType[] eventType) {
- checkConnected();
-
- PropertyType type = propertyKey.getType();
-
- synchronized (this) {
- for (CallbackHandler handler : _handlers) {
- // compare property-key path and listener reference
- if (handler.getPath().equals(propertyKey.getPath())
- && handler.getListener().equals(listener)) {
- LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath()
- + " already exists. skip add");
-
- return;
- }
- }
-
- CallbackHandler newHandler =
- new CallbackHandler(this, _zkclient, propertyKey, listener, eventType, changeType);
-
- _handlers.add(newHandler);
- LOG.info("Added listener: " + listener + " for type: " + type + " to path: "
- + newHandler.getPath());
- }
- }
-
- protected void initHandlers(List<CallbackHandler> handlers) {
- synchronized (this) {
- if (handlers != null) {
- for (CallbackHandler handler : handlers) {
- handler.init();
- LOG.info("init handler: " + handler.getPath() + ", " + handler.getListener());
- }
- }
- }
- }
-
- protected void resetHandlers() {
- synchronized (this) {
- if (_handlers != null) {
- // get a copy of the list and iterate over the copy list
- // in case handler.reset() modify the original handler list
- List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
- tmpHandlers.addAll(_handlers);
-
- for (CallbackHandler handler : tmpHandlers) {
- handler.reset();
- LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
- }
- }
- }
- }
-
- /**
- * different helix-manager may override this to have a cache-enabled based-data-accessor
- * @param baseDataAccessor
- * @return
- */
- BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
- return baseDataAccessor;
- }
-
- void createClient() throws Exception {
- PathBasedZkSerializer zkSerializer =
- ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
-
- _zkclient =
- new ZkClient(_zkAddress, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT, zkSerializer);
-
- ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
-
- _baseDataAccessor = createBaseDataAccessor(baseDataAccessor);
-
- _dataAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
- _configAccessor = new ConfigAccessor(_zkclient);
-
- int retryCount = 0;
-
- _zkclient.subscribeStateChanges(this);
- while (retryCount < 3) {
- try {
- _zkclient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
- handleStateChanged(KeeperState.SyncConnected);
- handleNewSession();
- break;
- } catch (HelixException e) {
- LOG.error("fail to createClient.", e);
- throw e;
- } catch (Exception e) {
- retryCount++;
-
- LOG.error("fail to createClient. retry " + retryCount, e);
- if (retryCount == 3) {
- throw e;
- }
- }
- }
- }
-
- // TODO separate out flapping detection code
- @Override
- public void handleStateChanged(KeeperState state) throws Exception {
- switch (state) {
- case SyncConnected:
- ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection();
- LOG.info("KeeperState: " + state + ", zookeeper:" + zkConnection.getZookeeper());
- break;
- case Disconnected:
- LOG.info("KeeperState:" + state + ", disconnectedSessionId: " + _sessionId + ", instance: "
- + _instanceName + ", type: " + _instanceType);
-
- /**
- * Track the time stamp that the disconnected happens, then check history and see if
- * we should disconnect the helix-manager
- */
- _disconnectTimeHistory.add(System.currentTimeMillis());
- if (isFlapping()) {
- LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. "
- + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
- + _flappingTimeWindowMs + "ms.");
- disconnect();
- }
- break;
- case Expired:
- LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId + ", instance: "
- + _instanceName + ", type: " + _instanceType);
- break;
- default:
- break;
- }
- }
-
- /**
- * If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous
- * _timeWindowLengthMs Ms
- * time window, we think that there are something wrong going on and disconnect the zkHelixManager
- * from zk.
- */
- private boolean isFlapping() {
- if (_disconnectTimeHistory.size() == 0) {
- return false;
- }
- long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
-
- // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago
- while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) {
- _disconnectTimeHistory.remove(0);
- }
- return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
- }
-
- /**
- * controller should override it to return a list of timers that need to start/stop when
- * leadership changes
- * @return
- */
- protected List<HelixTimerTask> getControllerHelixTimerTasks() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
deleted file mode 100644
index dd8e9be..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
+++ /dev/null
@@ -1,174 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Timer;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
-import org.apache.helix.healthcheck.HealthStatsAggregator;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.monitoring.ZKPathDataDumpTask;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class ControllerManager extends AbstractManager {
- private static Logger LOG = Logger.getLogger(ControllerManager.class);
-
- final GenericHelixController _controller = new GenericHelixController();
-
- // TODO merge into GenericHelixController
- private CallbackHandler _leaderElectionHandler = null;
-
- /**
- * status dump timer-task
- */
- static class StatusDumpTask extends HelixTimerTask {
- Timer _timer = null;
- final ZkClient zkclient;
- final AbstractManager helixController;
-
- public StatusDumpTask(ZkClient zkclient, AbstractManager helixController) {
- this.zkclient = zkclient;
- this.helixController = helixController;
- }
-
- @Override
- public void start() {
- long initialDelay = 30 * 60 * 1000;
- long period = 120 * 60 * 1000;
- int timeThresholdNoChange = 180 * 60 * 1000;
-
- if (_timer == null) {
- LOG.info("Start StatusDumpTask");
- _timer = new Timer("StatusDumpTimerTask", true);
- _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController, zkclient,
- timeThresholdNoChange), initialDelay, period);
- }
-
- }
-
- @Override
- public void stop() {
- if (_timer != null) {
- LOG.info("Stop StatusDumpTask");
- _timer.cancel();
- _timer = null;
- }
- }
- }
-
- public ControllerManager(String zkAddress, String clusterName, String instanceName) {
- super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER);
-
- _timerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
- _timerTasks.add(new StatusDumpTask(_zkclient, this));
- }
-
- @Override
- protected List<HelixTimerTask> getControllerHelixTimerTasks() {
- return _timerTasks;
- }
-
- @Override
- public void handleNewSession() throws Exception {
- waitUntilConnected();
-
- /**
- * reset all handlers, make sure cleanup completed for previous session
- * disconnect if fail to cleanup
- */
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.reset();
- }
- // TODO reset user defined handlers only
- resetHandlers();
-
- /**
- * from here on, we are dealing with new session
- */
-
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.init();
- } else {
- _leaderElectionHandler =
- new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
- new DistributedLeaderElection(this, _controller), new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- }, ChangeType.CONTROLLER);
- }
-
- /**
- * init handlers
- * ok to init message handler and controller handlers twice
- * the second init will be skipped (see CallbackHandler)
- */
- initHandlers(_handlers);
- }
-
- @Override
- void doDisconnect() {
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.reset();
- }
- }
-
- @Override
- public boolean isLeader() {
- if (!isConnected()) {
- return false;
- }
-
- try {
- LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
- if (leader != null) {
- String leaderName = leader.getInstanceName();
- String sessionId = leader.getTypedSessionId().stringify();
- if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
- && sessionId.equals(_sessionId)) {
- return true;
- }
- }
- } catch (Exception e) {
- // log
- }
- return false;
- }
-
- /**
- * helix-controller uses a write-through cache for external-view
- */
- @Override
- BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
- String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName);
- return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(extViewPath));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index df90f6e..d2b520b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -22,6 +22,7 @@ package org.apache.helix.manager.zk;
import java.util.List;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.HelixManager;
import org.apache.helix.HelixTimerTask;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.GenericHelixController;
@@ -35,14 +36,14 @@ import org.apache.log4j.Logger;
public class ControllerManagerHelper {
private static Logger LOG = Logger.getLogger(ControllerManagerHelper.class);
- final AbstractManager _manager;
+ final HelixManager _manager;
final DefaultMessagingService _messagingService;
final List<HelixTimerTask> _controllerTimerTasks;
- public ControllerManagerHelper(AbstractManager manager) {
+ public ControllerManagerHelper(HelixManager manager, List<HelixTimerTask> controllerTimerTasks) {
_manager = manager;
_messagingService = (DefaultMessagingService) manager.getMessagingService();
- _controllerTimerTasks = manager.getControllerHelixTimerTasks();
+ _controllerTimerTasks = controllerTimerTasks;
}
public void addListenersToController(GenericHelixController controller) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index a7d1f25..4fe9164 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -321,6 +321,7 @@ public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFact
_manager.getMessagingService().send(recipientCriteria, messageTemplate, callback,
timeOut);
}
+
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
deleted file mode 100644
index f169317..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.helix.HelixConstants.ChangeType;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixTimerTask;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.healthcheck.HealthStatsAggregationTask;
-import org.apache.helix.healthcheck.HealthStatsAggregator;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import org.apache.helix.healthcheck.ParticipantHealthReportTask;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.participant.HelixStateMachineEngine;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-public class DistributedControllerManager extends AbstractManager {
- private static Logger LOG = Logger.getLogger(DistributedControllerManager.class);
-
- final StateMachineEngine _stateMachineEngine;
- final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
-
- CallbackHandler _leaderElectionHandler = null;
- final GenericHelixController _controller = new GenericHelixController();
-
- /**
- * hold timer tasks for controller only
- * we need to add/remove controller timer tasks during handle new session
- */
- final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<HelixTimerTask>();
-
- public DistributedControllerManager(String zkAddress, String clusterName, String instanceName) {
- super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER_PARTICIPANT);
-
- _stateMachineEngine = new HelixStateMachineEngine(this);
- _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
- _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
-
- _controllerTimerTasks.add(new HealthStatsAggregationTask(new HealthStatsAggregator(this)));
- _controllerTimerTasks.add(new ControllerManager.StatusDumpTask(_zkclient, this));
-
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector() {
- checkConnected();
- return _participantHealthInfoCollector;
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine() {
- return _stateMachineEngine;
- }
-
- @Override
- protected List<HelixTimerTask> getControllerHelixTimerTasks() {
- return _controllerTimerTasks;
- }
-
- @Override
- public void handleNewSession() throws Exception {
- waitUntilConnected();
-
- ParticipantManagerHelper participantHelper =
- new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
-
- /**
- * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
- * disconnect if fail to cleanup
- */
- stopTimerTasks();
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.reset();
- }
- resetHandlers();
-
- /**
- * clean up write-through cache
- */
- _baseDataAccessor.reset();
-
- /**
- * from here on, we are dealing with new session
- */
- if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
- throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
- }
-
- /**
- * auto-join
- */
- participantHelper.joinCluster();
-
- /**
- * Invoke PreConnectCallbacks
- */
- for (PreConnectCallback callback : _preConnectCallbacks) {
- callback.onPreConnect();
- }
-
- participantHelper.createLiveInstance();
-
- participantHelper.carryOverPreviousCurrentState();
-
- participantHelper.setupMsgHandler();
-
- /**
- * leader election
- */
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.init();
- } else {
- _leaderElectionHandler =
- new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
- new DistributedLeaderElection(this, _controller), new EventType[] {
- EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
- }, ChangeType.CONTROLLER);
- }
-
- /**
- * start health-check timer task
- */
- participantHelper.createHealthCheckPath();
- startTimerTasks();
-
- /**
- * init handlers
- * ok to init message handler, data-accessor, and controller handlers twice
- * the second init will be skipped (see CallbackHandler)
- */
- initHandlers(_handlers);
-
- }
-
- @Override
- void doDisconnect() {
- if (_leaderElectionHandler != null) {
- _leaderElectionHandler.reset();
- }
- }
-
- @Override
- public boolean isLeader() {
- if (!isConnected()) {
- return false;
- }
-
- try {
- LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
- if (leader != null) {
- String leaderName = leader.getInstanceName();
- String sessionId = leader.getTypedSessionId().stringify();
- if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
- && sessionId.equals(_sessionId)) {
- return true;
- }
- }
- } catch (Exception e) {
- // log
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
index caf4dae..9836020 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -20,10 +20,12 @@ package org.apache.helix.manager.zk;
*/
import java.lang.management.ManagementFactory;
+import java.util.List;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
@@ -40,12 +42,15 @@ import org.apache.log4j.Logger;
public class DistributedLeaderElection implements ControllerChangeListener {
private static Logger LOG = Logger.getLogger(DistributedLeaderElection.class);
- final AbstractManager _manager;
+ final HelixManager _manager;
final GenericHelixController _controller;
+ final List<HelixTimerTask> _controllerTimerTasks;
- public DistributedLeaderElection(AbstractManager manager, GenericHelixController controller) {
+ public DistributedLeaderElection(HelixManager manager, GenericHelixController controller,
+ List<HelixTimerTask> controllerTimerTasks) {
_manager = manager;
_controller = controller;
+ _controllerTimerTasks = controllerTimerTasks;
}
/**
@@ -68,7 +73,8 @@ public class DistributedLeaderElection implements ControllerChangeListener {
return;
}
- ControllerManagerHelper controllerHelper = new ControllerManagerHelper(_manager);
+ ControllerManagerHelper controllerHelper =
+ new ControllerManagerHelper(_manager, _controllerTimerTasks);
try {
if (changeContext.getType().equals(NotificationContext.Type.INIT)
|| changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
@@ -84,7 +90,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
+ _manager.getClusterName());
updateHistory(manager);
- _manager._baseDataAccessor.reset();
+ _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
controllerHelper.addListenersToController(_controller);
controllerHelper.startControllerTimerTasks();
}
@@ -98,7 +104,7 @@ public class DistributedLeaderElection implements ControllerChangeListener {
/**
* clear write-through cache
*/
- _manager._baseDataAccessor.reset();
+ _manager.getHelixDataAccessor().getBaseDataAccessor().reset();
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
index b58e4b2..869563a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/HelixConnectionAdaptor.java
@@ -293,4 +293,10 @@ public class HelixConnectionAdaptor implements HelixManager {
}
}
+ @Override
+ public void addControllerMessageListener(MessageListener listener) {
+ // TODO Auto-generated method stub
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
deleted file mode 100644
index ab618fe..0000000
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Arrays;
-
-import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PreConnectCallback;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
-import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
-import org.apache.helix.healthcheck.ParticipantHealthReportTask;
-import org.apache.helix.participant.HelixStateMachineEngine;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.log4j.Logger;
-
-public class ParticipantManager extends AbstractManager {
-
- private static Logger LOG = Logger.getLogger(ParticipantManager.class);
-
- /**
- * state-transition message handler factory for helix-participant
- */
- final StateMachineEngine _stateMachineEngine;
-
- final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
-
- public ParticipantManager(String zkAddress, String clusterName, String instanceName) {
- super(zkAddress, clusterName, instanceName, InstanceType.PARTICIPANT);
-
- _stateMachineEngine = new HelixStateMachineEngine(this);
- _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
- _timerTasks.add(new ParticipantHealthReportTask(_participantHealthInfoCollector));
- }
-
- @Override
- public ParticipantHealthReportCollector getHealthReportCollector() {
- checkConnected();
- return _participantHealthInfoCollector;
- }
-
- @Override
- public StateMachineEngine getStateMachineEngine() {
- return _stateMachineEngine;
- }
-
- @Override
- public void handleNewSession() {
- waitUntilConnected();
-
- /**
- * stop timer tasks, reset all handlers, make sure cleanup completed for previous session
- * disconnect if cleanup fails
- */
- stopTimerTasks();
- resetHandlers();
-
- /**
- * clear write-through cache
- */
- _baseDataAccessor.reset();
-
- /**
- * from here on, we are dealing with new session
- */
- if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
- throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
- }
-
- /**
- * auto-join
- */
- ParticipantManagerHelper participantHelper =
- new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
- participantHelper.joinCluster();
-
- /**
- * Invoke PreConnectCallbacks
- */
- for (PreConnectCallback callback : _preConnectCallbacks) {
- callback.onPreConnect();
- }
-
- participantHelper.createLiveInstance();
-
- participantHelper.carryOverPreviousCurrentState();
-
- /**
- * setup message listener
- */
- participantHelper.setupMsgHandler();
-
- /**
- * start health check timer task
- */
- participantHelper.createHealthCheckPath();
- startTimerTasks();
-
- /**
- * init handlers
- * ok to init message handler and data-accessor twice
- * the second init will be skipped (see CallbackHandler)
- */
- initHandlers(_handlers);
-
- }
-
- /**
- * helix-participant uses a write-through cache for current-state
- */
- @Override
- BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
- String curStatePath =
- PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName);
- return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, Arrays.asList(curStatePath));
-
- }
-
- @Override
- public boolean isLeader() {
- return false;
- }
-
- /**
- * disconnect logic for helix-participant
- */
- @Override
- void doDisconnect() {
- // nothing for participant
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/f8e3b1af/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
index aa84c4d..da266f9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -30,6 +30,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.messaging.DefaultMessagingService;
@@ -53,7 +54,7 @@ public class ParticipantManagerHelper {
private static Logger LOG = Logger.getLogger(ParticipantManagerHelper.class);
final ZkClient _zkclient;
- final AbstractManager _manager;
+ final HelixManager _manager;
final PropertyKey.Builder _keyBuilder;
final String _clusterName;
final String _instanceName;
@@ -65,8 +66,10 @@ public class ParticipantManagerHelper {
final ZKHelixDataAccessor _dataAccessor;
final DefaultMessagingService _messagingService;
final StateMachineEngine _stateMachineEngine;
+ final LiveInstanceInfoProvider _liveInstanceInfoProvider;
- public ParticipantManagerHelper(AbstractManager manager, ZkClient zkclient, int sessionTimeout) {
+ public ParticipantManagerHelper(HelixManager manager, ZkClient zkclient, int sessionTimeout,
+ LiveInstanceInfoProvider liveInstanceInfoProvider) {
_zkclient = zkclient;
_manager = manager;
_clusterName = manager.getClusterName();
@@ -80,6 +83,7 @@ public class ParticipantManagerHelper {
_dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor();
_messagingService = (DefaultMessagingService) manager.getMessagingService();
_stateMachineEngine = manager.getStateMachineEngine();
+ _liveInstanceInfoProvider = liveInstanceInfoProvider;
}
public void joinCluster() {
@@ -90,8 +94,8 @@ public class ParticipantManagerHelper {
new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
_manager.getClusterName()).build();
autoJoin =
- Boolean
- .parseBoolean(_configAccessor.get(scope, HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
+ Boolean.parseBoolean(_configAccessor.get(scope,
+ ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
} catch (Exception e) {
// autoJoin is false
@@ -126,6 +130,19 @@ public class ParticipantManagerHelper {
liveInstance.setHelixVersion(_manager.getVersion());
liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+ // LiveInstanceInfoProvider liveInstanceInfoProvider = _manager._liveInstanceInfoProvider;
+ if (_liveInstanceInfoProvider != null) {
+ LOG.info("invoke liveInstanceInfoProvider");
+ ZNRecord additionalLiveInstanceInfo =
+ _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
+ if (additionalLiveInstanceInfo != null) {
+ additionalLiveInstanceInfo.merge(liveInstance.getRecord());
+ ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
+ liveInstance = new LiveInstance(mergedLiveInstance);
+ LOG.info("instanceName: " + _instanceName + ", mergedLiveInstance: " + liveInstance);
+ }
+ }
+
boolean retry;
do {
retry = false;
@@ -248,7 +265,7 @@ public class ParticipantManagerHelper {
}
}
- public void setupMsgHandler() {
+ public void setupMsgHandler() throws Exception {
_messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
_stateMachineEngine);
_manager.addMessageListener(_messagingService.getExecutor(), _instanceName);