You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/06/01 12:10:59 UTC

[GitHub] flink pull request #4949: [FLINK-7866] [runtime] Weigh list of preferred loc...

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4949#discussion_r192376303
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java ---
    @@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection<TaskManagerLoc
     			}
     
     			// we build up two indexes, one for resource id and one for host names of the preferred locations.
    -			HashSet<ResourceID> preferredResourceIDs = new HashSet<>(locationPreferences.size());
    -			HashSet<String> preferredFQHostNames = new HashSet<>(locationPreferences.size());
    +			Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size());
    +			Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size());
     
     			for (TaskManagerLocation locationPreference : locationPreferences) {
    -				preferredResourceIDs.add(locationPreference.getResourceID());
    -				preferredFQHostNames.add(locationPreference.getFQDNHostname());
    +				Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0);
    +				preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1);
    +
    +				oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0);
    +				preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1);
     			}
     
     			Iterator<IN> iterator = candidates.iterator();
     
    -			IN matchByHostName = null;
     			IN matchByAdditionalRequirements = null;
     
    +			final Map<IN, CandidateMatchedResult> candidateMatchedResults = new HashMap<>();
    +
     			while (iterator.hasNext()) {
     
     				IN candidate = iterator.next();
     				SlotContext slotContext = contextExtractor.apply(candidate);
     
     				// this if checks if the candidate has is a local slot
    -				if (preferredResourceIDs.contains(slotContext.getTaskManagerLocation().getResourceID())) {
    +				Integer localWeigh = preferredResourceIDs.get(slotContext.getTaskManagerLocation().getResourceID());
    +				if (localWeigh != null)	{
     					if (additionalRequirementsFilter.test(candidate)) {
    -						// we can stop, because we found a match with best possible locality.
    -						return resultProducer.apply(candidate, Locality.LOCAL);
    +						// we found a match with locality.
    +						candidateMatchedResults.put(candidate, new CandidateMatchedResult(localWeigh, 0));
     					} else {
     						// next candidate because this failed on the additional requirements.
     						continue;
     					}
    -				}
    -
    -				// this if checks if the candidate is at least host-local, if we did not find another host-local
    -				// candidate before.
    -				if (matchByHostName == null) {
    -					if (preferredFQHostNames.contains(slotContext.getTaskManagerLocation().getFQDNHostname())) {
    +				} else {
    +					// this if checks if the candidate is host-local.
    +					Integer hostLocalWeigh = preferredFQHostNames.get(slotContext.getTaskManagerLocation().getFQDNHostname());
    +					if (hostLocalWeigh != null) {
     						if (additionalRequirementsFilter.test(candidate)) {
    -							// We remember the candidate, but still continue because there might still be a candidate
    -							// that is local to the desired task manager.
    -							matchByHostName = candidate;
    +							// we found a match with host locality.
    +							candidateMatchedResults.put(candidate, new CandidateMatchedResult(0, hostLocalWeigh));
     						} else {
     							// next candidate because this failed on the additional requirements.
     							continue;
     						}
     					}
    +				}
     
    -					// this if checks if the candidate at least fulfils the resource requirements, and is only required
    -					// if we did not yet find a valid candidate with better locality.
    -					if (matchByAdditionalRequirements == null
    -						&& additionalRequirementsFilter.test(candidate)) {
    -						// Again, we remember but continue in hope for a candidate with better locality.
    -						matchByAdditionalRequirements = candidate;
    -					}
    +				// this if checks if the candidate at least fulfils the resource requirements, and is only required
    +				// if we did not yet find a valid candidate with better locality.
    +				if (candidateMatchedResults.isEmpty()
    +					&& matchByAdditionalRequirements == null
    +					&& additionalRequirementsFilter.test(candidate)) {
    +					// Again, we remember but continue in hope for a candidate with better locality.
    +					matchByAdditionalRequirements = candidate;
     				}
     			}
     
    +			// find the best matched one.
    +			Map.Entry<IN, CandidateMatchedResult> theBestOne = candidateMatchedResults.entrySet()
    +					.stream()
    +					.sorted(Comparator.comparingInt((Map.Entry<IN, CandidateMatchedResult> matchResult)
    +						-> matchResult.getValue().getScore()).reversed())
    +					.findFirst()
    +					.orElse(null);
    +
     			// at the end of the iteration, we return the candidate with best possible locality or null.
    -			if (matchByHostName != null) {
    -				return resultProducer.apply(matchByHostName, Locality.HOST_LOCAL);
    +			if (theBestOne != null) {
    +				return resultProducer.apply(theBestOne.getKey(), theBestOne.getValue().getLocality());
     			} else if (matchByAdditionalRequirements != null) {
     				return resultProducer.apply(matchByAdditionalRequirements, Locality.NON_LOCAL);
     			} else {
     				return null;
     			}
     		}
    +
    +		/**
    +		 * Helper class to record the match result.
    +		 */
    +		private class CandidateMatchedResult {
    +
    +			private int localCount;
    +
    +			private int hostLocalCount;
    +
    +			public CandidateMatchedResult(int localCount, int hostLocalCount) {
    +				this.localCount = localCount;
    +				this.hostLocalCount = hostLocalCount;
    +			}
    +
    +			// evaluate the match score
    +			public int getScore() {
    +				return localCount * 10 + hostLocalCount * 1;
    +			}
    +
    +			// get the highest locality.
    +			public Locality getLocality() {
    +				return localCount > 0 ? Locality.LOCAL : Locality.HOST_LOCAL;
    --- End diff --
    
    What if `hostLocalCount == 0` as well? Do we still return `Locality.HOST_LOCAL` in that case?


---