Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/06/01 12:11:00 UTC
[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling
[ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497910#comment-16497910 ]
ASF GitHub Bot commented on FLINK-7866:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/4949#discussion_r192376303
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
@@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection<TaskManagerLoc
}
// we build up two indexes, one for resource id and one for host names of the preferred locations.
- HashSet<ResourceID> preferredResourceIDs = new HashSet<>(locationPreferences.size());
- HashSet<String> preferredFQHostNames = new HashSet<>(locationPreferences.size());
+ Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
+ Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
for (TaskManagerLocation locationPreference : locationPreferences) {
- preferredResourceIDs.add(locationPreference.getResourceID());
- preferredFQHostNames.add(locationPreference.getFQDNHostname());
+ Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
+ preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
+
+ oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
+ preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
}
Iterator<IN> iterator = candidates.iterator();
- IN matchByHostName = null;
IN matchByAdditionalRequirements = null;
+ final Map<IN, CandidateMatchedResult> candidateMatchedResults = new HashMap<>();
+
while (iterator.hasNext()) {
IN candidate = iterator.next();
SlotContext slotContext = contextExtractor.apply(candidate);
// this if checks if the candidate has is a local slot
- if (preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID())) {
+ Integer localWeigh = preferredResourceIDs.get(slotContext.getTaskManagerLocation().getResourceID());
+ if (localWeigh != null) {
if (additionalRequirementsFilter.test(candidate)) {
- // we can stop, because we found a match with best possible locality.
- return resultProducer.apply(candidate, Locality.LOCAL);
+ // we found a match with locality.
+ candidateMatchedResults.put(candidate, new CandidateMatchedResult(localWeigh, 0));
} else {
// next candidate because this failed on the additional requirements.
continue;
}
- }
-
- // this if checks if the candidate is at least host-local, if we did not find another host-local
- // candidate before.
- if (matchByHostName == null) {
- if (preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname())) {
+ } else {
+ // this if checks if the candidate is host-local.
+ Integer hostLocalWeigh = preferredFQHostNames.get(slotContext.getTaskManagerLocation().getFQDNHostname());
+ if (hostLocalWeigh != null) {
if (additionalRequirementsFilter.test(candidate)) {
- // We remember the candidate, but still continue because there might still be a candidate
- // that is local to the desired task manager.
- matchByHostName = candidate;
+ // we found a match with host locality.
+ candidateMatchedResults.put(candidate, new CandidateMatchedResult(0, hostLocalWeigh));
} else {
// next candidate because this failed on the additional requirements.
continue;
}
}
+ }
- // this if checks if the candidate at least fulfils the resource requirements, and is only required
- // if we did not yet find a valid candidate with better locality.
- if (matchByAdditionalRequirements == null
- && additionalRequirementsFilter.test(candidate)) {
- // Again, we remember but continue in hope for a candidate with better locality.
- matchByAdditionalRequirements = candidate;
- }
+ // this if checks if the candidate at least fulfils the resource requirements, and is only required
+ // if we did not yet find a valid candidate with better locality.
+ if (candidateMatchedResults.isEmpty()
+ && matchByAdditionalRequirements == null
+ && additionalRequirementsFilter.test(candidate)) {
+ // Again, we remember but continue in hope for a candidate with better locality.
+ matchByAdditionalRequirements = candidate;
}
}
+ // find the best matched one.
+ Map.Entry<IN, CandidateMatchedResult> theBestOne = candidateMatchedResults.entrySet()
+ .stream()
+ .sorted(Comparator.comparingInt((Map.Entry<IN, CandidateMatchedResult> matchResult)
+ -> matchResult.getValue().getScore()).reversed())
+ .findFirst()
+ .orElse(null);
+
// at the end of the iteration, we return the candidate with best possible locality or null.
- if (matchByHostName != null) {
- return resultProducer.apply(matchByHostName, Locality.HOST_LOCAL);
+ if (theBestOne != null) {
+ return resultProducer.apply(theBestOne.getKey(), theBestOne.getValue().getLocality());
} else if (matchByAdditionalRequirements != null) {
return resultProducer.apply(matchByAdditionalRequirements, Locality.NON_LOCAL);
} else {
return null;
}
}
+
+ /**
+ * Helper class to record the match result.
+ */
+ private class CandidateMatchedResult {
+
+ private int localCount;
+
+ private int hostLocalCount;
+
+ public CandidateMatchedResult(int localCount, int hostLocalCount) {
+ this.localCount = localCount;
+ this.hostLocalCount = hostLocalCount;
+ }
+
+ // evaluate the match score
+ public int getScore() {
+ return localCount * 10 + hostLocalCount * 1;
+ }
+
+ // get the highest locality.
+ public Locality getLocality() {
+ return localCount > 0 ? Locality.LOCAL : Locality.HOST_LOCAL;
--- End diff --
What if `hostLocalCount` is also `0`? Do we still return `Locality.HOST_LOCAL` in that case?
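One way to address this question is to derive the locality explicitly for the case where both counts are zero, rather than falling through to `HOST_LOCAL`. The following is a minimal, self-contained sketch and not the code that was merged. It assumes a local `Locality` enum standing in for Flink's (only the three values used in the diff are modeled), and a hypothetical `MatchScore` class modeled on the diff's `CandidateMatchedResult`. Candidates are identified by hypothetical string ids. The factor of 10 is taken from the diff's `getScore()`.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Stand-in for Flink's Locality enum; only the values used in the diff are modeled.
enum Locality { LOCAL, HOST_LOCAL, NON_LOCAL }

// Hypothetical counterpart of the diff's CandidateMatchedResult.
final class MatchScore {
    final String candidate;   // hypothetical candidate id, e.g. a slot name
    final int localCount;     // how often the candidate's TaskManager was preferred
    final int hostLocalCount; // how often the candidate's host was preferred

    MatchScore(String candidate, int localCount, int hostLocalCount) {
        this.candidate = candidate;
        this.localCount = localCount;
        this.hostLocalCount = hostLocalCount;
    }

    // Same weighting as getScore() in the diff.
    int score() {
        return localCount * 10 + hostLocalCount;
    }

    // Derives the locality from the counts, with an explicit branch for "nothing matched".
    Locality locality() {
        if (localCount > 0) {
            return Locality.LOCAL;
        } else if (hostLocalCount > 0) {
            return Locality.HOST_LOCAL;
        } else {
            return Locality.NON_LOCAL;
        }
    }

    // Picks a highest-scoring match; max() avoids sorting the whole collection.
    static Optional<MatchScore> best(List<MatchScore> matches) {
        return matches.stream().max(Comparator.comparingInt(MatchScore::score));
    }

    public static void main(String[] args) {
        List<MatchScore> matches = Arrays.asList(
                new MatchScore("slotA", 0, 2),  // host-local, host preferred twice
                new MatchScore("slotB", 1, 0),  // local, TaskManager preferred once
                new MatchScore("slotC", 0, 0)); // no preferred location matched
        MatchScore best = best(matches).get();
        System.out.println(best.candidate + " " + best.locality()); // slotB LOCAL
        System.out.println(new MatchScore("slotC", 0, 0).locality()); // NON_LOCAL
    }
}
```

Returning `NON_LOCAL` for the all-zero case is one option. In the diff as written, that case cannot actually arise: every value stored in `preferredResourceIDs` and `preferredFQHostNames` is at least 1, so a `CandidateMatchedResult` is only ever created with one positive count. An explicit branch (or a precondition check in the constructor) would make that invariant visible to readers.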
> Weigh list of preferred locations for scheduling
> ------------------------------------------------
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Till Rohrmann
> Assignee: Sihua Zhou
> Priority: Major
> Fix For: 1.6.0
>
>
> [~sihuazhou] proposed not only to use the list of preferred locations to decide where to schedule a task, but also to weigh each location by how often it appears in the list and then select the location based on that weight. That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list: {{[(location2, 2), (location1, 1)]}}
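The weighting in the example above amounts to counting how often each location occurs and ordering by that count, highest first. The following is a minimal sketch. It assumes locations can be any type with proper `equals`/`hashCode` (plain strings here). The class `PreferredLocationWeights` and its `weigh` method are hypothetical and not part of Flink. The diff performs the counting with `getOrDefault` followed by `put`; `Map.merge` does the same in a single call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: turns a list of preferred
// locations into (location, weight) pairs ordered by descending weight.
final class PreferredLocationWeights {

    private PreferredLocationWeights() {}

    static <L> List<Map.Entry<L, Integer>> weigh(Collection<L> preferredLocations) {
        // Count occurrences; LinkedHashMap keeps first-seen order for ties.
        Map<L, Integer> counts = new LinkedHashMap<>();
        for (L location : preferredLocations) {
            counts.merge(location, 1, Integer::sum);
        }
        List<Map.Entry<L, Integer>> weighted = new ArrayList<>(counts.entrySet());
        // List.sort is stable, so locations with equal weight keep their first-seen order.
        weighted.sort(Map.Entry.<L, Integer>comparingByValue().reversed());
        return weighted;
    }

    public static void main(String[] args) {
        System.out.println(weigh(Arrays.asList("location1", "location2", "location2")));
        // prints [location2=2, location1=1]
    }
}
```

Breaking ties by first appearance is a design choice of this sketch. The issue text does not specify how equally weighted locations should be ordered.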
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)