You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/09/22 23:11:07 UTC
[2/3] helix git commit: Refactor TaskAssignmentCalculator API
Refactor TaskAssignmentCalculator API
Refactor the TaskAssignmentCalculator API to take only the map of resource names to ideal states instead of the entire ClusterDataCache, since the cache is large and the calculators use only a small part of it.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bb2a9db
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bb2a9db
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bb2a9db
Branch: refs/heads/helix-0.6.x
Commit: 7bb2a9db2396a00bb9a721634a2432240679c657
Parents: 0a18726
Author: Junkai Xue <jx...@linkedin.com>
Authored: Tue Sep 13 16:00:08 2016 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Sep 13 16:00:08 2016 -0700
----------------------------------------------------------------------
.../controller/stages/ClusterDataCache.java | 36 ++++++++++++++------
.../FixedTargetTaskAssignmentCalculator.java | 35 +++++++------------
.../helix/task/FixedTargetTaskRebalancer.java | 4 +--
.../task/GenericTaskAssignmentCalculator.java | 6 ++--
.../helix/task/GenericTaskRebalancer.java | 4 +--
.../org/apache/helix/task/JobRebalancer.java | 11 +++---
.../helix/task/TaskAssignmentCalculator.java | 10 +++---
7 files changed, 56 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index dacf98d..c8ca941 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -453,21 +453,35 @@ public class ClusterDataCache {
}
/**
- * Return all the nodes that are enabled and tagged same as the job.
- * @param allInstances List of instances to filter with instance tag
- * @param instanceTag The instance group tag
- * @return A new set contains instance name and that are marked enabled and have same
- * tag with job. The original set will not be changed during the filtering
+ * Return all live nodes that are enabled.
+ * @return A new set containing the names of live instances that are marked enabled
*/
- public Set<String> getAllEnabledInstanceWithTag(final Set<String> allInstances,
- String instanceTag) {
+ public Set<String> getAllEnabledLiveInstances() {
+ return getAllEnabledInstances(null);
+ }
+
+ /**
+ * Return all live nodes that are enabled and carry the same instance group tag as the job.
+ * @param instanceTag The instance group tag; may be null if no instance group is specified
+ * @return A new set containing the names of live instances that are marked enabled and,
+ * if instanceTag is not null, carry that tag.
+ */
+ public Set<String> getAllEnabledLiveInstancesWithTag(String instanceTag) {
+ return getAllEnabledInstances(instanceTag);
+ }
+
+ private Set<String> getAllEnabledInstances(String instanceTag) {
Set<String> enabledTagInstances = new HashSet<String>();
- for (String instance : allInstances) {
+ for (String instance : _liveInstanceMap.keySet()) {
InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
- if (instanceConfig != null && instanceConfig.getInstanceEnabled() && instanceConfig
- .containsTag(instanceTag)) {
- enabledTagInstances.add(instance);
+ // Check instance is enabled
+ if (instanceConfig != null && instanceConfig.getInstanceEnabled()) {
+ // Check whether it has instance group or not
+ // If it has instance group, check whether it belongs to that group or not
+ if (instanceTag == null || instanceConfig.containsTag(instanceTag)) {
+ enabledTagInstances.add(instance);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
index 09db616..0768b51 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskAssignmentCalculator.java
@@ -48,36 +48,37 @@ import org.apache.log4j.Logger;
public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculator {
private static final Logger LOG = Logger.getLogger(FixedTargetTaskAssignmentCalculator.class);
- @Override
- public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
- return getAllTaskPartitions(getTgtIdealState(jobCfg, cache), jobCfg, jobCtx);
+ @Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ Map<String, IdealState> idealStateMap) {
+ return getAllTaskPartitions(getTgtIdealState(jobCfg, idealStateMap), jobCfg, jobCtx);
}
@Override
public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
- Set<Integer> partitionSet, ClusterDataCache cache) {
- IdealState tgtIs = getTgtIdealState(jobCfg, cache);
+ Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
+ IdealState tgtIs = getTgtIdealState(jobCfg, idealStateMap);
if (tgtIs == null) {
LOG.warn("Missing target resource for the scheduled job!");
return Collections.emptyMap();
}
Set<String> tgtStates = jobCfg.getTargetPartitionStates();
return getTgtPartitionAssignment(currStateOutput, instances, tgtIs, tgtStates, partitionSet,
- jobContext, cache);
+ jobContext);
}
/**
* Gets the ideal state of the target resource of this job
* @param jobCfg job config containing target resource id
- * @param cache snapshot of the cluster containing the task and target resource
+ * @param idealStateMap map from resource name to its ideal state
* @return target resource ideal state, or null
*/
- private static IdealState getTgtIdealState(JobConfig jobCfg, ClusterDataCache cache) {
+ private static IdealState getTgtIdealState(JobConfig jobCfg,
+ Map<String, IdealState> idealStateMap) {
String tgtResourceId = jobCfg.getTargetResource();
- return cache.getIdealState(tgtResourceId);
+ return idealStateMap.get(tgtResourceId);
}
/**
@@ -131,7 +132,7 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
*/
private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
CurrentStateOutput currStateOutput, Iterable<String> instances, IdealState tgtIs,
- Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx, ClusterDataCache cache) {
+ Set<String> tgtStates, Set<Integer> includeSet, JobContext jobCtx) {
Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
for (String instance : instances) {
result.put(instance, new TreeSet<Integer>());
@@ -153,18 +154,6 @@ public class FixedTargetTaskAssignmentCalculator extends TaskAssignmentCalculato
continue;
}
- InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instance);
-
- if (instanceConfig == null) {
- LOG.error("Instance config not found for instance : " + instance);
- continue;
- }
-
- if (!instanceConfig.getInstanceEnabled()) {
- LOG.debug("Instance has been disabled, ignore instance : " + instance);
- continue;
- }
-
String s =
currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName),
instance);
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
index 569fe03..1589c1a 100644
--- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -43,7 +43,7 @@ import org.apache.helix.model.ResourceAssignment;
@Override public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
return taskAssignmentCalculator
- .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+ .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
}
@Override public Map<String, SortedSet<Integer>> getTaskAssignment(
@@ -53,6 +53,6 @@ import org.apache.helix.model.ResourceAssignment;
ClusterDataCache cache) {
return taskAssignmentCalculator
.getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
- workflowCfg, workflowCtx, partitionSet, cache);
+ workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index fbc7af3..58ba670 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -34,6 +34,7 @@ import java.util.TreeSet;
import org.apache.helix.HelixException;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.util.JenkinsHash;
@@ -52,7 +53,8 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
@Override
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ Map<String, IdealState> idealStateMap) {
Map<String, TaskConfig> taskMap = jobCfg.getTaskConfigMap();
Map<String, Integer> taskIdMap = jobCtx.getTaskIdPartitionMap();
for (TaskConfig taskCfg : taskMap.values()) {
@@ -69,7 +71,7 @@ public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
public Map<String, SortedSet<Integer>> getTaskAssignment(CurrentStateOutput currStateOutput,
ResourceAssignment prevAssignment, Collection<String> instances, JobConfig jobCfg,
final JobContext jobContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
- Set<Integer> partitionSet, ClusterDataCache cache) {
+ Set<Integer> partitionSet, Map<String, IdealState> idealStateMap) {
// Gather input to the full auto rebalancing algorithm
LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>();
states.put("ONLINE", 1);
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
index 6a005b9..1720fbb 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskRebalancer.java
@@ -42,7 +42,7 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache) {
return taskAssignmentCalculator
- .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache);
+ .getAllTaskPartitions(jobCfg, jobCtx, workflowCfg, workflowCtx, cache.getIdealStates());
}
@Override
@@ -52,6 +52,6 @@ public class GenericTaskRebalancer extends DeprecatedTaskRebalancer {
Set<Integer> partitionSet, ClusterDataCache cache) {
return taskAssignmentCalculator
.getTaskAssignment(currStateOutput, prevAssignment, instances, jobCfg, jobContext,
- workflowCfg, workflowCtx, partitionSet, cache);
+ workflowCfg, workflowCtx, partitionSet, cache.getIdealStates());
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index 378ad95..cf7f5e6 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -131,9 +131,8 @@ public class JobRebalancer extends TaskRebalancer {
// Fetch the previous resource assignment from the property store. This is required because of
// HELIX-230.
Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
- ? clusterData.getLiveInstances().keySet()
- : clusterData.getAllEnabledInstanceWithTag(clusterData.getLiveInstances().keySet(),
- jobCfg.getInstanceGroupTag());
+ ? clusterData.getAllEnabledLiveInstances()
+ : clusterData.getAllEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
if (liveInstances.isEmpty()) {
LOG.error("No available instance found for job!");
@@ -222,8 +221,8 @@ public class JobRebalancer extends TaskRebalancer {
// Process all the current assignments of tasks.
TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalulator(jobCfg);
- Set<Integer> allPartitions =
- taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache);
+ Set<Integer> allPartitions = taskAssignmentCal
+ .getAllTaskPartitions(jobCfg, jobCtx, workflowConfig, workflowCtx, cache.getIdealStates());
if (allPartitions == null || allPartitions.isEmpty()) {
// Empty target partitions, mark the job as FAILED.
@@ -424,7 +423,7 @@ public class JobRebalancer extends TaskRebalancer {
// Get instance->[partition, ...] mappings for the target resource.
Map<String, SortedSet<Integer>> tgtPartitionAssignments = taskAssignmentCal
.getTaskAssignment(currStateOutput, prevAssignment, liveInstances, jobCfg, jobCtx,
- workflowConfig, workflowCtx, allPartitions, cache);
+ workflowConfig, workflowCtx, allPartitions, cache.getIdealStates());
for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
String instance = entry.getKey();
if (!tgtPartitionAssignments.containsKey(instance) || excludedInstances
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb2a9db/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
index a3ed5ab..a6a9ed3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskAssignmentCalculator.java
@@ -2,6 +2,7 @@ package org.apache.helix.task;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import java.util.Collection;
@@ -17,11 +18,12 @@ public abstract class TaskAssignmentCalculator {
* @param jobCtx the task context
* @param workflowCfg the workflow configuration
* @param workflowCtx the workflow context
- * @param cache cluster snapshot
+ * @param idealStateMap map from resource name to its ideal state
* @return set of partition numbers
*/
public abstract Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
- WorkflowConfig workflowCfg, WorkflowContext workflowCtx, ClusterDataCache cache);
+ WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ Map<String, IdealState> idealStateMap);
/**
* Compute an assignment of tasks to instances
@@ -34,12 +36,12 @@ public abstract class TaskAssignmentCalculator {
* @param workflowCfg the workflow configuration
* @param workflowCtx the workflow context
* @param partitionSet the partitions to assign
- * @param cache cluster snapshot
+ * @param idealStateMap map from resource name to its ideal state
* @return map of instances to set of partition numbers
*/
public abstract Map<String, SortedSet<Integer>> getTaskAssignment(
CurrentStateOutput currStateOutput, ResourceAssignment prevAssignment,
Collection<String> instances, JobConfig jobCfg, JobContext jobContext,
WorkflowConfig workflowCfg, WorkflowContext workflowCtx, Set<Integer> partitionSet,
- ClusterDataCache cache);
+ Map<String, IdealState> idealStateMap);
}