You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/10/27 17:27:43 UTC

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4916

    [FLINK-7153] Re-introduce preferred locations for scheduling

    ## What is the purpose of the change
    
    This PR makes the `TaskManagerLocation` accessible for asynchronous scheduling.
    
    Due to changes for Flink 1.3 where we introduced asynchronous scheduling, it was not always guaranteed that the scheduler knew about the scheduling locations of producer tasks. Especially the eager scheduling mode was affected since the slot allocation happened concurrently.
    
    In order to fix this problem, this PR adds a `TaskManagerLocationFuture` to each `Execution`. In eager scheduling mode, a slot will only be requested for a task if all its inputs have a slot assigned (e.g. their `TaskManagerLocationFuture` is completed). In lazy scheduling mode, we don't wait for the completion of all inputs, but take those inputs which are already known.
    
    In order to distinguish whether we want to wait for all or take all available task manager locations, we add a `LocationPreferenceConstraint` which has the values `ALL` and `ANY`. `ALL` means that we have to wait for all inputs to have a location assigned, and `ANY` means that we take what's currently known.
    
    In order to not deploy slots prematurely in eager mode, the slot assignment has been factored out into its own step. Before, one had to call `Execution#deployToSlot(SimpleSlot)` which assigned the given slot and started the deployment. Now, one has to call `Execution#tryAssignResource` before one can call `Execution#deploy`.
    
    Moreover this PR fixes that the `FailoverRegions` are topologically sorted which is important for non queued scheduling.
    
    FYI @StephanEwen 
    
    ## Brief change log
    
    - Introduce `LocationPreferenceConstraint` to distinguish the waiting behaviour for the preferred locations
    - Split slot assignment and deployment into two separate steps
    - Moved preferred location calculation into the Execution to reduce code duplication between the `Scheduler` and the `SlotPool`
    - Changed preferred location calculation to be blocking if `LocationPreferenceConstraint#ALL` and not all input locations are known
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
    - Added `ExecutionTest` to check the correct assigned slot release in case of cancellation and to check the correct preferred location calculation
    - Added `ExecutionGraphDeploymentTest#testEagerSchedulingWaitsOnAllInputPreferredLocations` to check that eager scheduling waits for all inputs to be assigned before scheduling consumer tasks
    - Moreover, the scheduler is being tested by existing tests such as `SchedulerSlotSharingTest`, `ScheduleWithCoLocationHintTest` and many IT cases for lazy scheduling (batch case)
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixGroupScheduling2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4916.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4916
    
----
commit 32eb1812583b84d80091d1a278d53ed663d8a065
Author: Till <ti...@gmail.com>
Date:   2017-10-16T12:04:13Z

    [FLINK-7153] Re-introduce preferred locations for scheduling

commit 8c0c9aeaa7ca995247f2b9f9e63723e52d839a12
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-27T07:47:03Z

    [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling
    
    The LocationPreferenceConstraint defines whether all or any preferred locations
    have to be taken into consideration when scheduling tasks. Especially for batch
    jobs where we do lazy scheduling not all input locations might be known for a
    consumer task. Therefore, we set the location preference constraint to any which
    means that only those location are taken into consideration which are known at
    scheduling time.

commit c821e67529deaaed96f183fc22bc0a9fe246fa23
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-26T16:22:43Z

    [hotfix] Make failover region topological sorted

commit 67baeade85e26758978bcdf7982576a2f4192aae
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-10-27T17:08:15Z

    [FLINK-7153] Add test cases

----


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4916


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148363944
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	//  Miscellaneous
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    +
    +		switch(locationPreferenceConstraint) {
    +			case ALL:
    +				preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
    +				break;
    +			case ANY:
    --- End diff --
    
    If I read it correctly, case `ANY` can complete without any input being ready (all being not yet done), returning a completed future with an empty collection. Is that intended semantics?


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148479985
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---
    @@ -476,14 +482,13 @@ else if (numSources < parallelism) {
     	 * @return The preferred locations based in input streams, or an empty iterable,
     	 *         if there is no input-based preference.
     	 */
    -	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
    +	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
     		// otherwise, base the preferred locations on the input connections
     		if (inputEdges == null) {
     			return Collections.emptySet();
     		}
     		else {
    -			Set<TaskManagerLocation> locations = new HashSet<>();
    -			Set<TaskManagerLocation> inputLocations = new HashSet<>();
    +			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
    --- End diff --
    
    For example in the case of the broadcast join with lazy scheduling, it could be the case that the broadcasting operator produces first and thus triggers the `scheduleOrUpdateConsumers` call on the `ExecutionGraph`. This will then trigger the scheduling of the join operator. At this time, there might only be the location of the broadcast operator known. However, since we only return the forward operator's location future which has not been completed, the lazy scheduling will schedule without location preference because the `LocationPreferenceConstraint` is `ANY`.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148482755
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	//  Miscellaneous
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    +
    +		switch(locationPreferenceConstraint) {
    +			case ALL:
    +				preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
    +				break;
    +			case ANY:
    +				final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(1);
    --- End diff --
    
    Maybe it's better to initialize the array with the number of returned futures to avoid resizing completely.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148473363
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -220,14 +233,54 @@ public long getGlobalModVersion() {
     		return globalModVersion;
     	}
     
    +	public CompletableFuture<TaskManagerLocation> getTaskManagerLocationFuture() {
    +		return taskManagerLocationFuture;
    +	}
    +
     	public SimpleSlot getAssignedResource() {
    -		return assignedResource;
    +		return assignedResource.get();
    +	}
    +
    +	/**
    +	 * Tries to assign the given slot to the execution. The assignment works only if the
    +	 * Execution is in state SCHEDULED. Returns true, if the resource could be assigned.
    +	 *
    +	 * @param slot to assign to this execution
    +	 * @return true if the slot could be assigned to the execution, otherwise false
    +	 */
    +	@VisibleForTesting
    +	boolean tryAssignResource(final SimpleSlot slot) {
    +		Preconditions.checkNotNull(slot);
    --- End diff --
    
    Sure, will reformat it accordingly.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148358215
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
     		// that way we do not have any operation that can fail between allocating the slots
     		// and adding them to the list. If we had a failure in between there, that would
     		// cause the slots to get lost
    -		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
     		final boolean queued = allowQueuedScheduling;
     
    -		// we use this flag to handle failures in a 'finally' clause
    -		// that allows us to not go through clumsy cast-and-rethrow logic
    -		boolean successful = false;
    +		// collecting all the slots may resize and fail in that operation without slots getting lost
    +		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
     
    -		try {
    -			// collecting all the slots may resize and fail in that operation without slots getting lost
    -			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		// allocate the slots (obtain all their futures
    +		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +			// these calls are not blocking, they only return futures
    --- End diff --
    
    Having a scheduling mechanism that tries to satisfy both state locality and input locality could be interesting. Input locality may extend to locality of input partitions (in Kafka for example) as well, which makes it even more complicated.
    
    I think the current state of the heuristic is: previous location first (later: state locality first), if that leaves freedom, try to schedule based on inputs.
    
    I can see an extended variant where we first collect all vertices with constraints, try to satisfy those. They may in turn add more constraints (or preferences) which should be satisfied next. Repeat until all are satisfied, or it is not possible to satisfy the preferences any more. But that is a pretty big change, that we should discus and design properly, not push it into a bug fix.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148209063
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
     		// that way we do not have any operation that can fail between allocating the slots
     		// and adding them to the list. If we had a failure in between there, that would
     		// cause the slots to get lost
    -		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
     		final boolean queued = allowQueuedScheduling;
     
    -		// we use this flag to handle failures in a 'finally' clause
    -		// that allows us to not go through clumsy cast-and-rethrow logic
    -		boolean successful = false;
    +		// collecting all the slots may resize and fail in that operation without slots getting lost
    +		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
     
    -		try {
    -			// collecting all the slots may resize and fail in that operation without slots getting lost
    -			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		// allocate the slots (obtain all their futures
    +		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +			// these calls are not blocking, they only return futures
    --- End diff --
    
    There is no specific reason why we iterate over the vertices in topological order. We could also choose a completely random order for eager scheduling because the scheduling order will be determined by the preferred location futures (which at the moment is based on inputs only). If we should switch to state location then it basically means that we schedule the individual tasks independently because the vertices don't depend on the input locations.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148209318
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	//  Miscellaneous
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    --- End diff --
    
    Yes, that is exactly the idea. 


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148526103
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	//  Miscellaneous
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    +
    +		switch(locationPreferenceConstraint) {
    +			case ALL:
    +				preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
    +				break;
    +			case ANY:
    +				final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(1);
    --- End diff --
    
    I would almost leave it at `1` or at the default `10`, so as to not have large unused arrays lying around.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148366296
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---
    @@ -476,14 +482,13 @@ else if (numSources < parallelism) {
     	 * @return The preferred locations based in input streams, or an empty iterable,
     	 *         if there is no input-based preference.
     	 */
    -	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
    +	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
     		// otherwise, base the preferred locations on the input connections
     		if (inputEdges == null) {
     			return Collections.emptySet();
     		}
     		else {
    -			Set<TaskManagerLocation> locations = new HashSet<>();
    -			Set<TaskManagerLocation> inputLocations = new HashSet<>();
    +			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
    --- End diff --
    
    The code here changes some semantics:
    
      - Originally: Determine preferred input location per input gate. If one gate has too many candidates, and one gate as few candidates, then these few candidates are the preference. Example: a broadcast join where one input if *broadcast*, the other is *forward*. The code would pick the locality preference for the "forward" input.
    
      - Now: All input channels contribute to the same locality preference pool. If one input has too many candidates, no locality preferences exist at all. In the broadcast join case, the forward input is not taken into account any more.
    
    Is that intended? I think the broadcast join case is a good example why the per-input(gate) treatment is helpful.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148216607
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
     		// that way we do not have any operation that can fail between allocating the slots
     		// and adding them to the list. If we had a failure in between there, that would
     		// cause the slots to get lost
    -		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
     		final boolean queued = allowQueuedScheduling;
     
    -		// we use this flag to handle failures in a 'finally' clause
    -		// that allows us to not go through clumsy cast-and-rethrow logic
    -		boolean successful = false;
    +		// collecting all the slots may resize and fail in that operation without slots getting lost
    +		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
     
    -		try {
    -			// collecting all the slots may resize and fail in that operation without slots getting lost
    -			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		// allocate the slots (obtain all their futures
    +		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +			// these calls are not blocking, they only return futures
    --- End diff --
    
    In that case, it depends a bit on how the scheduler values the state location preference. If it is implemented that it strictly schedules tasks to its previous state location, then it could happen that these tasks don't end up in the same slot as other tasks with which they shared a slot before.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148360739
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -220,14 +233,54 @@ public long getGlobalModVersion() {
     		return globalModVersion;
     	}
     
    +	public CompletableFuture<TaskManagerLocation> getTaskManagerLocationFuture() {
    +		return taskManagerLocationFuture;
    +	}
    +
     	public SimpleSlot getAssignedResource() {
    -		return assignedResource;
    +		return assignedResource.get();
    +	}
    +
    +	/**
    +	 * Tries to assign the given slot to the execution. The assignment works only if the
    +	 * Execution is in state SCHEDULED. Returns true, if the resource could be assigned.
    +	 *
    +	 * @param slot to assign to this execution
    +	 * @return true if the slot could be assigned to the execution, otherwise false
    +	 */
    +	@VisibleForTesting
    +	boolean tryAssignResource(final SimpleSlot slot) {
    +		Preconditions.checkNotNull(slot);
    --- End diff --
    
    Style: This class uses partially `Preconditions.checkX(...)` and partially statically imported `checkX(...)`. Can we make thsi homogeneous?


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148211942
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
     		// that way we do not have any operation that can fail between allocating the slots
     		// and adding them to the list. If we had a failure in between there, that would
     		// cause the slots to get lost
    -		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
     		final boolean queued = allowQueuedScheduling;
     
    -		// we use this flag to handle failures in a 'finally' clause
    -		// that allows us to not go through clumsy cast-and-rethrow logic
    -		boolean successful = false;
    +		// collecting all the slots may resize and fail in that operation without slots getting lost
    +		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
     
    -		try {
    -			// collecting all the slots may resize and fail in that operation without slots getting lost
    -			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		// allocate the slots (obtain all their futures
    +		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +			// these calls are not blocking, they only return futures
    --- End diff --
    
    If we switch to state location then we can't allocate resources according to the order of topologically, because stateless vertices may share the same  SlotSharingGroup with stateful vertices, if stateless vertices allocated before the stateful vertices, the result can be bad.  An intuitive way to do this is to allocate resources to stateful vertices firstly.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148474674
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---
    @@ -133,32 +133,33 @@ public void shutdown() {
     
     
     	@Override
    -	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
    +	public CompletableFuture<SimpleSlot> allocateSlot(
    +			ScheduledUnit task,
    +			boolean allowQueued,
    +			Collection<TaskManagerLocation> preferredLocations) {
    +
     		try {
    -			final Object ret = scheduleTask(task, allowQueued);
    +			final Object ret = scheduleTask(task, allowQueued, preferredLocations);
     
     			if (ret instanceof SimpleSlot) {
     				return CompletableFuture.completedFuture((SimpleSlot) ret);
    -			}
    -			else if (ret instanceof CompletableFuture) {
    +			} else if (ret instanceof CompletableFuture) {
    --- End diff --
    
    Beauty lies in the eye of the bee holder ;-) I'll revert it.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148366967
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---
    @@ -133,32 +133,33 @@ public void shutdown() {
     
     
     	@Override
    -	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
    +	public CompletableFuture<SimpleSlot> allocateSlot(
    +			ScheduledUnit task,
    +			boolean allowQueued,
    +			Collection<TaskManagerLocation> preferredLocations) {
    +
     		try {
    -			final Object ret = scheduleTask(task, allowQueued);
    +			final Object ret = scheduleTask(task, allowQueued, preferredLocations);
     
     			if (ret instanceof SimpleSlot) {
     				return CompletableFuture.completedFuture((SimpleSlot) ret);
    -			}
    -			else if (ret instanceof CompletableFuture) {
    +			} else if (ret instanceof CompletableFuture) {
    --- End diff --
    
    Ah, all the nice spacey-easy-to-parse formatting going away here...


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by summerleafs <gi...@git.apache.org>.
Github user summerleafs commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r147889602
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
     		// that way we do not have any operation that can fail between allocating the slots
     		// and adding them to the list. If we had a failure in between there, that would
     		// cause the slots to get lost
    -		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
     		final boolean queued = allowQueuedScheduling;
     
    -		// we use this flag to handle failures in a 'finally' clause
    -		// that allows us to not go through clumsy cast-and-rethrow logic
    -		boolean successful = false;
    +		// collecting all the slots may resize and fail in that operation without slots getting lost
    +		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
     
    -		try {
    -			// collecting all the slots may resize and fail in that operation without slots getting lost
    -			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		// allocate the slots (obtain all their futures
    +		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +			// these calls are not blocking, they only return futures
    --- End diff --
    
    Hi, allocate resources according to the order of topologically, is just to facilitate the optimization of 'allocate resource base on prefer input'? it may cause bad result if we allocate base on state.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148473280
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -126,9 +134,11 @@
     	/** A future that completes once the Execution reaches a terminal ExecutionState */
     	private final CompletableFuture<ExecutionState> terminationFuture;
     
    +	private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;
    +
     	private volatile ExecutionState state = CREATED;
     
    -	private volatile SimpleSlot assignedResource;     // once assigned, never changes until the execution is archived
    +	private final AtomicReference<SimpleSlot> assignedResource;
    --- End diff --
    
    Good point. I'll change it.


---

[GitHub] flink issue #4916: [FLINK-7153] Re-introduce preferred locations for schedul...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4916
  
    Thanks a lot for your review @StephanEwen and @sihuazhou. I've addressed all your comments @StephanEwen. 
    
    Next thing I'll do is to rebase onto the latest master and if Travis gives green light and you have no further objections, then I would like to merge it.


---

[GitHub] flink issue #4916: [FLINK-7153] Re-introduce preferred locations for schedul...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4916
  
    Thanks for the follow-up!
    
    +1 to merge this


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148437379
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---
    @@ -476,14 +482,13 @@ else if (numSources < parallelism) {
     	 * @return The preferred locations based in input streams, or an empty iterable,
     	 *         if there is no input-based preference.
     	 */
    -	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
    +	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
     		// otherwise, base the preferred locations on the input connections
     		if (inputEdges == null) {
     			return Collections.emptySet();
     		}
     		else {
    -			Set<TaskManagerLocation> locations = new HashSet<>();
    -			Set<TaskManagerLocation> inputLocations = new HashSet<>();
    +			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
    --- End diff --
    
    I think if we weigh inputs in scheduler maybe this problem can be solved. In above example, imagine that, a is "broadcast" and it downstream‘s parallelism is 10, and b is "forward" , then every ExecutionEdge for a's weight is ```a-total-weight / 10``` and ExecutionEdge of b's weight is ```b-total-weight / 1```, we can evaluate total weight according to vertex's throughput, currently, all vertex's total weight are equal, so location of b's input will be picked.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148364165
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	//  Miscellaneous
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    +
    +		switch(locationPreferenceConstraint) {
    +			case ALL:
    +				preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
    +				break;
    +			case ANY:
    +				final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(1);
    --- End diff --
    
    Is the intention to return the first result available here (assuming from the size 1 initialization of the array list)?
    If yes, should the loop below break after the input with a completed location future?


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148216894
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
     		// that way we do not have any operation that can fail between allocating the slots
     		// and adding them to the list. If we had a failure in between there, that would
     		// cause the slots to get lost
    -		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
     		final boolean queued = allowQueuedScheduling;
     
    -		// we use this flag to handle failures in a 'finally' clause
    -		// that allows us to not go through clumsy cast-and-rethrow logic
    -		boolean successful = false;
    +		// collecting all the slots may resize and fail in that operation without slots getting lost
    +		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
     
    -		try {
    -			// collecting all the slots may resize and fail in that operation without slots getting lost
    -			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		// allocate the slots (obtain all their futures
    +		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +			// these calls are not blocking, they only return futures
    --- End diff --
    
    But be aware that the `allocateAndAssign` call is non-blocking and the actual order depends on the preferred locations futures.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148474427
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	//  Miscellaneous
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    +
    +		switch(locationPreferenceConstraint) {
    +			case ALL:
    +				preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
    +				break;
    +			case ANY:
    +				final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(1);
    --- End diff --
    
    No, the intention is to return all currently known locations here. Usually there will only be one input because `ANY` is used by lazy scheduling. Initializing it with size 1 is a compromise between size and resizing costs. We could also initialize it with the number of inputs (since this is a small number) but in most cases not all inputs will be known for `ANY`.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by summerleafs <gi...@git.apache.org>.
Github user summerleafs commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r147891494
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	//  Miscellaneous
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    --- End diff --
    
    Hi Till,`getPreferredLocations()` is not invoked here because flink doesn't yet support reading the checkpoint data locally? I have create a issue for flink reading checkpoint locally [here](https://issues.apache.org/jira/browse/FLINK-7873?filter=-1), when it complete i wonder if we can invoke `getPreferedLocations()` instead of `getPreferredLocationsBasedOnInputs()`.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148367405
  
    --- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
    @@ -16,7 +16,7 @@
     # limitations under the License.
     ################################################################################
     
    -log4j.rootLogger=OFF, console
    +log4j.rootLogger=INFO, console
    --- End diff --
    
    Undo log level setting


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148360937
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -126,9 +134,11 @@
     	/** A future that completes once the Execution reaches a terminal ExecutionState */
     	private final CompletableFuture<ExecutionState> terminationFuture;
     
    +	private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;
    +
     	private volatile ExecutionState state = CREATED;
     
    -	private volatile SimpleSlot assignedResource;     // once assigned, never changes until the execution is archived
    +	private final AtomicReference<SimpleSlot> assignedResource;
    --- End diff --
    
    Minor comment: Since the resource is accessed quite frequently, using a volatile variable with an  `AtomicReferenceFieldUpdater` could be good here.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148477247
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---
    @@ -476,14 +482,13 @@ else if (numSources < parallelism) {
     	 * @return The preferred locations based in input streams, or an empty iterable,
     	 *         if there is no input-based preference.
     	 */
    -	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
    +	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
     		// otherwise, base the preferred locations on the input connections
     		if (inputEdges == null) {
     			return Collections.emptySet();
     		}
     		else {
    -			Set<TaskManagerLocation> locations = new HashSet<>();
    -			Set<TaskManagerLocation> inputLocations = new HashSet<>();
    +			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
    --- End diff --
    
    This change was not entirely intended and the previous code makes totally sense with your explanation. I'm actually in favour of reverting my changes to not change the semantics for the moment.
    
    However, for the future, I'm wondering whether this kind of decision should be made by the `ExecutionVertex` or whether it shouldn't rather be the task of the `Scheduler` to make this kind of decision. 
    
    For example, what if a task has multiple input gates and one of them with exactly one producer. Then it will only return the location of this single producer. Now if this TM has no more slots left, then we would basically randomly pick another slot even though there might be other TMs one which another producer for this task would run.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148474569
  
    --- Diff: flink-runtime/src/test/resources/log4j-test.properties ---
    @@ -16,7 +16,7 @@
     # limitations under the License.
     ################################################################################
     
    -log4j.rootLogger=OFF, console
    +log4j.rootLogger=INFO, console
    --- End diff --
    
    Good catch. Will revert it.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148434074
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
     		// that way we do not have any operation that can fail between allocating the slots
     		// and adding them to the list. If we had a failure in between there, that would
     		// cause the slots to get lost
    -		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
     		final boolean queued = allowQueuedScheduling;
     
    -		// we use this flag to handle failures in a 'finally' clause
    -		// that allows us to not go through clumsy cast-and-rethrow logic
    -		boolean successful = false;
    +		// collecting all the slots may resize and fail in that operation without slots getting lost
    +		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
     
    -		try {
    -			// collecting all the slots may resize and fail in that operation without slots getting lost
    -			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		// allocate the slots (obtain all their futures
    +		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +			// these calls are not blocking, they only return futures
    --- End diff --
    
    Aha, just by the way, scheduling according to the weight maybe not a bad choice. For both state and inputs, we can weigh them (maybe can weigh state according to it's size and weigh inputs according to it's throughput), then schedule according to the weight. 
    
    This method can be easy to extend for other factors that we want to take account in scheduler.


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148526210
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---
    @@ -133,32 +133,33 @@ public void shutdown() {
     
     
     	@Override
    -	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
    +	public CompletableFuture<SimpleSlot> allocateSlot(
    +			ScheduledUnit task,
    +			boolean allowQueued,
    +			Collection<TaskManagerLocation> preferredLocations) {
    +
     		try {
    -			final Object ret = scheduleTask(task, allowQueued);
    +			final Object ret = scheduleTask(task, allowQueued, preferredLocations);
     
     			if (ret instanceof SimpleSlot) {
     				return CompletableFuture.completedFuture((SimpleSlot) ret);
    -			}
    -			else if (ret instanceof CompletableFuture) {
    +			} else if (ret instanceof CompletableFuture) {
    --- End diff --
    
    No worries, just teasing here ;-)


---

[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148473906
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
     	//  Miscellaneous
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Calculates the preferred locations based on the location preference constraint.
    +	 *
    +	 * @param locationPreferenceConstraint constraint for the location preference
    +	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +	 * 		have been a resource assigned.
    +	 */
    +	@VisibleForTesting
    +	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    +
    +		switch(locationPreferenceConstraint) {
    +			case ALL:
    +				preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
    +				break;
    +			case ANY:
    --- End diff --
    
    Yes that is the semantics of `ANY` at the moment. You basically do the scheduling decision based on what you currently know. In the worst case, you don't know any input location yet. I had to introduce this mode for lazy scheduling. In this case, however, you should always know at least one input location.


---