Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/06/01 12:11:00 UTC

[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

    [ https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16497910#comment-16497910 ] 

ASF GitHub Bot commented on FLINK-7866:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4949#discussion_r192376303
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
    @@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection<TaskManagerLoc
     			}
     
     			// we build up two indexes, one for resource id and one for host names of the preferred locations.
    -			HashSet<ResourceID> preferredResourceIDs = new HashSet<>(locationPreferences.size());
    -			HashSet<String> preferredFQHostNames = new HashSet<>(locationPreferences.size());
    +			Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
    +			Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
     
     			for (TaskManagerLocation locationPreference : locationPreferences) {
    -				preferredResourceIDs.add(locationPreference.getResourceID());
    -				preferredFQHostNames.add(locationPreference.getFQDNHostname());
    +				Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
    +				preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
    +
    +				oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
    +				preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
     			}
     
     			Iterator<IN> iterator = candidates.iterator();
     
    -			IN matchByHostName = null;
     			IN matchByAdditionalRequirements = null;
     
    +			final Map<IN, CandidateMatchedResult> candidateMatchedResults = new HashMap<>();
    +
     			while (iterator.hasNext()) {
     
     				IN candidate = iterator.next();
     				SlotContext slotContext = contextExtractor.apply(candidate);
     
     				// this if checks if the candidate has is a local slot
    -				if (preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID())) {
    +				Integer localWeigh = preferredResourceIDs.get(slotContext.getTaskManagerLocation().getResourceID());
    +				if (localWeigh != null)	{
     					if (additionalRequirementsFilter.test(candidate)) {
    -						// we can stop, because we found a match with best possible locality.
    -						return resultProducer.apply(candidate, Locality.LOCAL);
    +						// we found a match with locality.
    +						candidateMatchedResults.put(candidate, new CandidateMatchedResult(localWeigh, 0));
     					} else {
     						// next candidate because this failed on the additional requirements.
     						continue;
     					}
    -				}
    -
    -				// this if checks if the candidate is at least host-local, if we did not find another host-local
    -				// candidate before.
    -				if (matchByHostName == null) {
    -					if (preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname())) {
    +				} else {
    +					// this if checks if the candidate is host-local.
    +					Integer hostLocalWeigh = preferredFQHostNames.get(slotContext.getTaskManagerLocation().getFQDNHostname());
    +					if (hostLocalWeigh != null) {
     						if (additionalRequirementsFilter.test(candidate)) {
    -							// We remember the candidate, but still continue because there might still be a candidate
    -							// that is local to the desired task manager.
    -							matchByHostName = candidate;
    +							// we found a match with host locality.
    +							candidateMatchedResults.put(candidate, new CandidateMatchedResult(0, hostLocalWeigh));
     						} else {
     							// next candidate because this failed on the additional requirements.
     							continue;
     						}
     					}
    +				}
     
    -					// this if checks if the candidate at least fulfils the resource requirements, and is only required
    -					// if we did not yet find a valid candidate with better locality.
    -					if (matchByAdditionalRequirements == null
    -						&& additionalRequirementsFilter.test(candidate)) {
    -						// Again, we remember but continue in hope for a candidate with better locality.
    -						matchByAdditionalRequirements = candidate;
    -					}
    +				// this if checks if the candidate at least fulfils the resource requirements, and is only required
    +				// if we did not yet find a valid candidate with better locality.
    +				if (candidateMatchedResults.isEmpty()
    +					&& matchByAdditionalRequirements == null
    +					&& additionalRequirementsFilter.test(candidate)) {
    +					// Again, we remember but continue in hope for a candidate with better locality.
    +					matchByAdditionalRequirements = candidate;
     				}
     			}
     
    +			// find the best matched one.
    +			Map.Entry<IN, CandidateMatchedResult> theBestOne = candidateMatchedResults.entrySet()
    +					.stream()
    +					.sorted(Comparator.comparingInt((Map.Entry<IN, CandidateMatchedResult> matchResult)
    +						-> matchResult.getValue().getScore()).reversed())
    +					.findFirst()
    +					.orElse(null);
    +
     			// at the end of the iteration, we return the candidate with best possible locality or null.
    -			if (matchByHostName != null) {
    -				return resultProducer.apply(matchByHostName, Locality.HOST_LOCAL);
    +			if (theBestOne != null) {
    +				return resultProducer.apply(theBestOne.getKey(), theBestOne.getValue().getLocality());
     			} else if (matchByAdditionalRequirements != null) {
     				return resultProducer.apply(matchByAdditionalRequirements, Locality.NON_LOCAL);
     			} else {
     				return null;
     			}
     		}
    +
    +		/**
    +		 * Helper class to record the match result.
    +		 */
    +		private class CandidateMatchedResult {
    +
    +			private int localCount;
    +
    +			private int hostLocalCount;
    +
    +			public CandidateMatchedResult(int localCount, int hostLocalCount) {
    +				this.localCount = localCount;
    +				this.hostLocalCount = hostLocalCount;
    +			}
    +
    +			// evaluate the match score
    +			public int getScore() {
    +				return localCount * 10 + hostLocalCount * 1;
    +			}
    +
    +			// get the highest locality.
    +			public Locality getLocality() {
    +				return localCount > 0 ? Locality.LOCAL : Locality.HOST_LOCAL;
    --- End diff --
    
    What if `hostLocalCount == 0` as well? Do we still return `Locality.HOST_LOCAL` in that case?
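
To make the question concrete, here is a minimal sketch of how `getLocality()` could handle both counts being zero. It is not the code from the pull request: the `Locality` enum below is a reduced stand-in for Flink's own enum, the wrapper class name is hypothetical, and falling back to `NON_LOCAL` is an assumption about the intended behavior. In the diff as quoted, the constructor is only called with one non-zero count, so the case may not arise today; the extra branch just makes the method safe for other callers.

```java
final class CandidateMatchedResultSketch {

    // Reduced stand-in for Flink's Locality enum (assumption: only these values matter here).
    enum Locality { LOCAL, HOST_LOCAL, NON_LOCAL }

    // Mirrors the helper class from the diff, with an explicit zero/zero branch.
    static final class CandidateMatchedResult {

        private final int localCount;
        private final int hostLocalCount;

        CandidateMatchedResult(int localCount, int hostLocalCount) {
            this.localCount = localCount;
            this.hostLocalCount = hostLocalCount;
        }

        // Same weighting as in the diff: a local match counts ten times a host-local one.
        int getScore() {
            return localCount * 10 + hostLocalCount;
        }

        // Returns the best locality that was actually matched.
        Locality getLocality() {
            if (localCount > 0) {
                return Locality.LOCAL;
            }
            if (hostLocalCount > 0) {
                return Locality.HOST_LOCAL;
            }
            // Assumed fallback: neither count is positive, so no locality was matched.
            return Locality.NON_LOCAL;
        }
    }

    public static void main(String[] args) {
        System.out.println(new CandidateMatchedResult(2, 0).getLocality());
        System.out.println(new CandidateMatchedResult(0, 3).getLocality());
        System.out.println(new CandidateMatchedResult(0, 0).getLocality());
    }
}
```

An alternative would be to reject a zero/zero pair in the constructor, which keeps `getLocality()` to two branches but moves the check to construction time.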


> Weigh list of preferred locations for scheduling
> ------------------------------------------------
>
>                 Key: FLINK-7866
>                 URL: https://issues.apache.org/jira/browse/FLINK-7866
>             Project: Flink
>          Issue Type: Improvement
>          Components: Scheduler
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Till Rohrmann
>            Assignee: Sihua Zhou
>            Priority: Major
>             Fix For: 1.6.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to decide where to schedule a task, but to also weigh the list according to how often a location appeared and then select the location based on the weight. That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list: {{[(location2, 2), (location1, 1)]}}
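
The weighting described in the issue can be sketched in a few lines of Java. This is an illustration only: locations are plain strings rather than `TaskManagerLocation` objects, and the class and method names are hypothetical. It counts how often each location occurs and orders the result by descending count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WeightedLocationsSketch {

    // Counts occurrences of each preferred location and returns them ordered by weight, heaviest first.
    static List<Map.Entry<String, Integer>> weigh(List<String> preferredLocations) {
        // LinkedHashMap keeps first-appearance order, so ties stay in input order after the stable sort.
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String location : preferredLocations) {
            counts.merge(location, 1, Integer::sum);
        }

        List<Map.Entry<String, Integer>> weighted = new ArrayList<>(counts.entrySet());
        weighted.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        return weighted;
    }

    public static void main(String[] args) {
        // The example from the issue: location2 appears twice, location1 once.
        System.out.println(weigh(Arrays.asList("location1", "location2", "location2")));
    }
}
```

`Map.merge` performs the same increment that the pull request writes as `getOrDefault` followed by `put`.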



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)