Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2017/02/12 23:24:15 UTC

[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/3295

    [FLINK-5747] [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk

    ## Problem Addressed
    
    Currently, eager scheduling immediately triggers the scheduling for all vertices and their subtasks in topological order.
    
    This has two problems:
    
      - This only works as long as resource acquisition is "synchronous". With dynamic resource acquisition in FLIP-6, resources are returned as Futures which may complete out of order. This results in tasks being scheduled out of topological order, which does not work for streaming.
    
      - Deploying tasks that depend on other tasks before it is clear that those other tasks have resources as well leads to situations where many deploy/recovery cycles happen before enough resources are available to get the job fully running.
    
    ## Implemented Change
    
      - The `Execution` has separate methods to allocate a resource and to deploy the task to that resource
      - The **eager** scheduling mode allocates all resources in one chunk and then deploys once all resources are available.
    
    As a utility, this implements the `FutureUtils.combineAll` method, which combines the Futures of the individual resources into a single combined Future.
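
    For illustration, a simplified sketch of the eager path (using the names from this PR; `deployAll(...)` is a placeholder for the actual deploy step, not a real method):

        // sketch only, not the actual implementation
        List<Future<SimpleSlot>> slotFutures = new ArrayList<>();
        for (ExecutionJobVertex ejv : getVerticesTopologically()) {
            // non-blocking: these calls only return futures for the slots
            for (ExecutionAndSlot ens : ejv.allocateResourcesForAll(slotProvider, queued)) {
                slotFutures.add(ens.slotFuture);
            }
        }

        // completes once every slot future completes, fails as soon as one fails
        ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);

        allAllocationsComplete.thenAccept(new AcceptFunction<Void>() {
            @Override
            public void accept(Void ignored) {
                // only now, with all slots secured, deploy the tasks in topological order
                deployAll(/* all ExecutionAndSlot pairs */);  // placeholder for the deployment step
            }
        });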
    
    ## Tests
    
    The main tests are in `ExecutionGraphSchedulingTest`. The utilities used are tested in `FutureUtilsTest` and `ExecutionGraphUtilsTest`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink slot_scheduling

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3295
    
----
commit 1f18cbb0d6d119fa5e5c4803201c28887b90cef5
Author: Stephan Ewen <se...@apache.org>
Date:   2017-02-03T19:26:23Z

    [FLINK-5747] [distributed coordination] Eager scheduling allocates slots and deploys tasks in bulk
    
    That way, strictly topological deployment can be guaranteed.
    
    Also, many quick deploy/not-enough-resources/fail/recover cycles can be
    avoided in the cases where resources need some time to appear.

----



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101753597
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph;
    +
    +import org.apache.flink.runtime.concurrent.BiFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.instance.SimpleSlot;
    +import org.apache.flink.util.ExceptionUtils;
    +
    +import java.util.List;
    +
    +/**
    + * Utilities for dealing with the execution graphs and scheduling.
    + */
    +public class ExecutionGraphUtils {
    +
    +	/**
    +	 * Releases the slot represented by the given future. If the future is complete, the
    +	 * slot is immediately released. Otherwise, the slot is released as soon as the future
    +	 * is completed.
    +	 * 
    +	 * <p>Note that releasing the slot means cancelling any task execution currently
    +	 * associated with that slot.
    +	 * 
    +	 * @param slotFuture The future for the slot to release.
    +	 */
    +	public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) {
    +		slotFuture.handle(ReleaseSlotFunction.INSTANCE);
    +	}
    +
    +	/**
    +	 * Releases the all the slots in the list of arrays of {@code ExecutionAndSlot}.
    +	 * For each future in that collection holds: If the future is complete, its slot is
    +	 * immediately released. Otherwise, the slot is released as soon as the future
    +	 * is completed.
    +	 * 
    +	 * <p>This methods never throws any exceptions (subclasses of {@code Exception})
    +	 * and continues to release the remaining slots if one slot release failed. We only
    +	 * catch Exceptions here (and not other throwables) because the code executed while
    +	 * releasing slot does not involve any dynamic 
    --- End diff --
    
    true, will fix that



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101829019
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
     			super(cause);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  composing futures
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates a future that is complete once multiple other futures completed. 
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 *
    +	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
    +	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
    +	 * 
    +	 * @param futures The futures that make up the conjunction. No null entries are allowed.
    +	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
    +	 */
    +	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
    +		checkNotNull(futures, "futures");
    +		checkArgument(!futures.isEmpty(), "futures is empty");
    +
    +		final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
    +
    +		for (Future<?> future : futures) {
    +			future.handle(conjunct.completionHandler);
    +		}
    +
    +		return conjunct;
    +	}
    +
    +	/**
    +	 * A future that is complete once multiple other futures completed. The futures are not
    +	 * necessarily of the same type, which is why the type of this Future is {@code Void}.
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 * 
    +	 * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
    +	 * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
    +	 * many of the Futures are already complete.
    +	 */
    +	public interface ConjunctFuture extends CompletableFuture<Void> {
    +
    +		/**
    +		 * Gets the total number of Futures in the conjunction.
    +		 * @return The total number of Futures in the conjunction.
    +		 */
    +		int getNumFuturesTotal();
    +
    +		/**
    +		 * Gets the number of Futures in the conjunction that are already complete.
    +		 * @return The number of Futures in the conjunction that are already complete
    +		 */
    +		int getNumFuturesCompleted();
    +	}
    +
    +	/**
    +	 * The implementation of the {@link ConjunctFuture}.
    +	 * 
    +	 * <p>Implementation notice: The member fields all have package-private access, because they are
    +	 * either accessed by an inner subclass or by the enclosing class.
    +	 */
    +	private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
    --- End diff --
    
    Yes, with set rather than add it should work. Since the list gets initialized with an array, I would actually just use an array in the first place.
    
    Followup ;-)



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101747876
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java ---
    @@ -41,7 +41,7 @@
      * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
      * parallelism can be set via {@link #setParallelism(int)}.
      */
    -@Public
    +@Internal
    --- End diff --
    
    :+1:



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101753433
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
    +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
    +
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.concurrent.ExecutionException;
    +
    +import static org.junit.Assert.*;
    +
    +/**
    + * Tests for the utility methods in {@link FutureUtils}
    + */
    +public class FutureUtilsTest {
    +
    +	@Test
    +	public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
    +		try {
    +			FutureUtils.combineAll(null);
    +			fail();
    +		} catch (NullPointerException ignored) {}
    +
    +		try {
    +			FutureUtils.combineAll(Collections.<Future<?>>emptyList());
    +			fail();
    +		} catch (IllegalArgumentException ignored) {}
    +
    +		try {
    +			FutureUtils.combineAll(Arrays.asList(
    +					new FlinkCompletableFuture<Object>(),
    +					null,
    +					new FlinkCompletableFuture<Object>()));
    +			fail();
    +		} catch (NullPointerException ignored) {}
    +	}
    +
    +	@Test
    +	public void testConjunctFutureCompletion() throws Exception {
    +		// some futures that we combine
    +		CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
    +		CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
    +		CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
    +		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
    +
    +		// some future is initially completed
    +		future2.complete(new Object());
    +
    +		// build the conjunct future
    +		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
    +
    +		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
    +			@Override
    +			public void accept(Void value) {}
    +		});
    --- End diff --
    
    This is simply added as a test that applied functions (and their result futures) are not called before completion.



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101829659
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
     		}
     	}
     
    +	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
    +		// simply take the vertices without inputs.
    +		for (ExecutionJobVertex ejv : this.tasks.values()) {
    +			if (ejv.getJobVertex().isInputVertex()) {
    +				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * 
    +	 * 
    +	 * @param slotProvider  The resource provider from which the slots are allocated
    +	 * @param timeout       The maximum time that the deployment may take, before a
    +	 *                      TimeoutException is thrown.
    +	 */
    +	private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
    +		checkState(state == JobStatus.RUNNING, "job is not running currently");
    +
    +		// Important: reserve all the space we need up front.
    +		// that way we do not have any operation that can fail between allocating the slots
    +		// and adding them to the list. If we had a failure in between there, that would
    +		// cause the slots to get lost
    +		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		final boolean queued = allowQueuedScheduling;
    +
    +		// we use this flag to handle failures in a 'finally' clause
    +		// that allows us to not go through clumsy cast-and-rethrow logic
    +		boolean successful = false;
    +
    +		try {
    +			// collecting all the slots may resize and fail in that operation without slots getting lost
    +			final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +
    +			// allocate the slots (obtain all their futures
    +			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +				// these calls are not blocking, they only return futures
    +				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
    +
    +				// we need to first add the slots to this list, to be safe on release
    +				resources.add(slots);
    +
    +				for (ExecutionAndSlot ens : slots) {
    +					slotFutures.add(ens.slotFuture);
    +				}
    +			}
    +
    +			// this future is complete once all slot futures are complete.
    +			// the future fails once one slot future fails.
    +			final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
    --- End diff --
    
    True, it is not incorrect. But some tasks would already be deployed if we started as soon as some futures are ready. They would need to be canceled again, which gives these not-so-nice fast deploy/out-of-resource/cancel/wait-for-cancellation/retry loops.



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101737070
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
     			super(cause);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  composing futures
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates a future that is complete once multiple other futures completed. 
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 *
    +	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
    +	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
    +	 * 
    +	 * @param futures The futures that make up the conjunction. No null entries are allowed.
    +	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
    +	 */
    +	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
    +		checkNotNull(futures, "futures");
    +		checkArgument(!futures.isEmpty(), "futures is empty");
    +
    +		final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
    +
    +		for (Future<?> future : futures) {
    +			future.handle(conjunct.completionHandler);
    +		}
    +
    +		return conjunct;
    +	}
    +
    +	/**
    +	 * A future that is complete once multiple other futures completed. The futures are not
    +	 * necessarily of the same type, which is why the type of this Future is {@code Void}.
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 * 
    +	 * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
    +	 * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
    +	 * many of the Futures are already complete.
    +	 */
    +	public interface ConjunctFuture extends CompletableFuture<Void> {
    +
    +		/**
    +		 * Gets the total number of Futures in the conjunction.
    +		 * @return The total number of Futures in the conjunction.
    +		 */
    +		int getNumFuturesTotal();
    +
    +		/**
    +		 * Gets the number of Futures in the conjunction that are already complete.
    +		 * @return The number of Futures in the conjunction that are already complete
    +		 */
    +		int getNumFuturesCompleted();
    +	}
    +
    +	/**
    +	 * The implementation of the {@link ConjunctFuture}.
    +	 * 
    +	 * <p>Implementation notice: The member fields all have package-private access, because they are
    +	 * either accessed by an inner subclass or by the enclosing class.
    +	 */
    +	private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
    --- End diff --
    
    I like the idea of the ConjunctFuture :-) I was wondering whether we can generalize it a little bit more by also collecting the actual values. Then the ConjunctFuture would effectively return a collection of the common base type of all registered futures. Here is a commit where I tried it out: https://github.com/tillrohrmann/flink/commit/f1f5ab63bfe75d629230e0fc2cf37d2499d85548. What do you think?



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101766842
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
     		}
     	}
     
    +	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
    +		// simply take the vertices without inputs.
    +		for (ExecutionJobVertex ejv : this.tasks.values()) {
    +			if (ejv.getJobVertex().isInputVertex()) {
    +				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * 
    +	 * 
    +	 * @param slotProvider  The resource provider from which the slots are allocated
    +	 * @param timeout       The maximum time that the deployment may take, before a
    +	 *                      TimeoutException is thrown.
    +	 */
    +	private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
    +		checkState(state == JobStatus.RUNNING, "job is not running currently");
    +
    +		// Important: reserve all the space we need up front.
    +		// that way we do not have any operation that can fail between allocating the slots
    +		// and adding them to the list. If we had a failure in between there, that would
    +		// cause the slots to get lost
    +		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		final boolean queued = allowQueuedScheduling;
    +
    +		// we use this flag to handle failures in a 'finally' clause
    +		// that allows us to not go through clumsy cast-and-rethrow logic
    +		boolean successful = false;
    +
    +		try {
    +			// collecting all the slots may resize and fail in that operation without slots getting lost
    +			final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +
    +			// allocate the slots (obtain all their futures
    +			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +				// these calls are not blocking, they only return futures
    +				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
    +
    +				// we need to first add the slots to this list, to be safe on release
    +				resources.add(slots);
    +
    +				for (ExecutionAndSlot ens : slots) {
    +					slotFutures.add(ens.slotFuture);
    +				}
    +			}
    +
    +			// this future is complete once all slot futures are complete.
    +			// the future fails once one slot future fails.
    +			final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
    --- End diff --
    
    Shouldn't the `fail` operations be idempotent and only take effect for the first failure?



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101746547
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
     		}
     	}
     
    +	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
    +		// simply take the vertices without inputs.
    +		for (ExecutionJobVertex ejv : this.tasks.values()) {
    +			if (ejv.getJobVertex().isInputVertex()) {
    +				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * 
    +	 * 
    +	 * @param slotProvider  The resource provider from which the slots are allocated
    +	 * @param timeout       The maximum time that the deployment may take, before a
    +	 *                      TimeoutException is thrown.
    +	 */
    +	private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
    +		checkState(state == JobStatus.RUNNING, "job is not running currently");
    +
    +		// Important: reserve all the space we need up front.
    +		// that way we do not have any operation that can fail between allocating the slots
    +		// and adding them to the list. If we had a failure in between there, that would
    +		// cause the slots to get lost
    +		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		final boolean queued = allowQueuedScheduling;
    +
    +		// we use this flag to handle failures in a 'finally' clause
    +		// that allows us to not go through clumsy cast-and-rethrow logic
    +		boolean successful = false;
    +
    +		try {
    +			// collecting all the slots may resize and fail in that operation without slots getting lost
    +			final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +
    +			// allocate the slots (obtain all their futures
    +			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +				// these calls are not blocking, they only return futures
    +				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
    +
    +				// we need to first add the slots to this list, to be safe on release
    +				resources.add(slots);
    +
    +				for (ExecutionAndSlot ens : slots) {
    +					slotFutures.add(ens.slotFuture);
    +				}
    +			}
    +
    +			// this future is complete once all slot futures are complete.
    +			// the future fails once one slot future fails.
    +			final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
    --- End diff --
    
    Just a thought: Right now we're waiting for all futures to complete. But couldn't we also create a graph of dependencies mirroring the topological ordering by combining multiple `ConjunctFutures`, where each `ConjunctFuture` represents the inputs of a given vertex? That way, we could speed up the deployment a little bit.
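
    Roughly, the idea sketched out (with hypothetical helpers `inputSlotFutures(...)` and `deploy(...)` that do not exist in this PR):

        // sketch only: one ConjunctFuture per vertex, built from the slot futures
        // of that vertex's inputs, so a vertex deploys as soon as its own inputs
        // (rather than the whole job) have their slots
        for (final ExecutionJobVertex ejv : getVerticesTopologically()) {
            List<Future<SimpleSlot>> inputSlots = inputSlotFutures(ejv);  // hypothetical helper
            if (inputSlots.isEmpty()) {
                deploy(ejv);  // source vertices have no inputs to wait for
            } else {
                FutureUtils.combineAll(inputSlots).thenAccept(new AcceptFunction<Void>() {
                    @Override
                    public void accept(Void ignored) {
                        deploy(ejv);  // hypothetical deploy step for this vertex
                    }
                });
            }
        }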



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101739380
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
     		}
     	}
     
    +	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
    +		// simply take the vertices without inputs.
    +		for (ExecutionJobVertex ejv : this.tasks.values()) {
    +			if (ejv.getJobVertex().isInputVertex()) {
    +				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * 
    +	 * 
    +	 * @param slotProvider  The resource provider from which the slots are allocated
    +	 * @param timeout       The maximum time that the deployment may take, before a
    +	 *                      TimeoutException is thrown.
    +	 */
    +	private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
    +		checkState(state == JobStatus.RUNNING, "job is not running currently");
    +
    +		// Important: reserve all the space we need up front.
    +		// that way we do not have any operation that can fail between allocating the slots
    +		// and adding them to the list. If we had a failure in between there, that would
    +		// cause the slots to get lost
    +		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		final boolean queued = allowQueuedScheduling;
    +
    +		// we use this flag to handle failures in a 'finally' clause
    +		// that allows us to not go through clumsy cast-and-rethrow logic
    +		boolean successful = false;
    +
    +		try {
    +			// collecting all the slots may resize and fail in that operation without slots getting lost
    +			final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +
    +			// allocate the slots (obtain all their futures
    +			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +				// these calls are not blocking, they only return futures
    +				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
    --- End diff --
    
    However, as I see it, we would have to reflect the same order in the collection, because it requires a topological order.



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101827724
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
     			super(cause);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  composing futures
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates a future that is complete once multiple other futures completed. 
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 *
    +	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
    +	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
    +	 * 
    +	 * @param futures The futures that make up the conjunction. No null entries are allowed.
    +	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
    +	 */
    +	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
    +		checkNotNull(futures, "futures");
    +		checkArgument(!futures.isEmpty(), "futures is empty");
    --- End diff --
    
    Yes, will change that...



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101753992
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -292,7 +305,8 @@ public ExecutionGraph(
     		this.stateTimestamps = new long[JobStatus.values().length];
     		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
     
    -		this.timeout = timeout;
    +		this.rpcCallTimeout = timeout;
    --- End diff --
    
    will do



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101731399
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
     			super(cause);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  composing futures
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates a future that is complete once multiple other futures completed. 
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 *
    +	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
    +	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
    +	 * 
    +	 * @param futures The futures that make up the conjunction. No null entries are allowed.
    +	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
    +	 */
    +	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
    +		checkNotNull(futures, "futures");
    +		checkArgument(!futures.isEmpty(), "futures is empty");
    --- End diff --
    
    Couldn't an empty futures list return a completed `ConjunctFuture`? This would resemble the ∀ (for-all) semantics a little bit more.
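
    A minimal sketch of that relaxation (assuming the `ConjunctFutureImpl` constructor and `complete(...)` behave as in the quoted code):

        if (futures.isEmpty()) {
            // the conjunction over the empty set is vacuously true, so hand back
            // an already-completed future instead of rejecting the input
            ConjunctFutureImpl done = new ConjunctFutureImpl(0);
            done.complete(null);
            return done;
        }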



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101753888
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
     		}
     	}
     
    +	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
    +		// simply take the vertices without inputs.
    +		for (ExecutionJobVertex ejv : this.tasks.values()) {
    +			if (ejv.getJobVertex().isInputVertex()) {
    +				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * 
    +	 * 
    +	 * @param slotProvider  The resource provider from which the slots are allocated
    +	 * @param timeout       The maximum time that the deployment may take, before a
    +	 *                      TimeoutException is thrown.
    +	 */
    +	private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
    +		checkState(state == JobStatus.RUNNING, "job is not running currently");
    +
    +		// Important: reserve all the space we need up front.
    +		// that way we do not have any operation that can fail between allocating the slots
    +		// and adding them to the list. If we had a failure in between there, that would
    +		// cause the slots to get lost
    +		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		final boolean queued = allowQueuedScheduling;
    +
    +		// we use this flag to handle failures in a 'finally' clause
    +		// that allows us to not go through clumsy cast-and-rethrow logic
    +		boolean successful = false;
    +
    +		try {
    +			// collecting all the slots may resize and fail in that operation without slots getting lost
    +			final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +
    +			// allocate the slots (obtain all their futures
    +			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +				// these calls are not blocking, they only return futures
    +				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
    +
    +				// we need to first add the slots to this list, to be safe on release
    +				resources.add(slots);
    +
    +				for (ExecutionAndSlot ens : slots) {
    +					slotFutures.add(ens.slotFuture);
    +				}
    +			}
    +
    +			// this future is complete once all slot futures are complete.
    +			// the future fails once one slot future fails.
    +			final ConjunctFuture allAllocationsComplete = FutureUtils.combineAll(slotFutures);
    --- End diff --
    
    I thought about that, and one purpose of this change is to avoid many partial deployments / failures when not all resources are available.
    
    In the "FLIP-1" work, we would introduce something like "domains of tasks that schedule and fail together". Those domains could then be scheduled topologically, independently of each other.



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101766916
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
    +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
    +
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.concurrent.ExecutionException;
    +
    +import static org.junit.Assert.*;
    +
    +/**
    + * Tests for the utility methods in {@link FutureUtils}
    + */
    +public class FutureUtilsTest {
    +
    +	@Test
    +	public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
    +		try {
    +			FutureUtils.combineAll(null);
    +			fail();
    +		} catch (NullPointerException ignored) {}
    +
    +		try {
    +			FutureUtils.combineAll(Collections.<Future<?>>emptyList());
    +			fail();
    +		} catch (IllegalArgumentException ignored) {}
    +
    +		try {
    +			FutureUtils.combineAll(Arrays.asList(
    +					new FlinkCompletableFuture<Object>(),
    +					null,
    +					new FlinkCompletableFuture<Object>()));
    +			fail();
    +		} catch (NullPointerException ignored) {}
    +	}
    +
    +	@Test
    +	public void testConjunctFutureCompletion() throws Exception {
    +		// some futures that we combine
    +		CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
    +		CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
    +		CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
    +		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
    +
    +		// some future is initially completed
    +		future2.complete(new Object());
    +
    +		// build the conjunct future
    +		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
    +
    +		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
    +			@Override
    +			public void accept(Void value) {}
    +		});
    --- End diff --
    
    ah makes sense :-)



[GitHub] flink issue #3295: [FLINK-5747] [distributed coordination] Eager scheduling ...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3295
  
    +1 for merging



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101737967
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -292,7 +305,8 @@ public ExecutionGraph(
     		this.stateTimestamps = new long[JobStatus.values().length];
     		this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
     
    -		this.timeout = timeout;
    +		this.rpcCallTimeout = timeout;
    --- End diff --
    
    Maybe we could add a null check here. I think I forgot it initially.



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101765953
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
     			super(cause);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  composing futures
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates a future that is complete once multiple other futures completed. 
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 *
    +	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
    +	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
    +	 * 
    +	 * @param futures The futures that make up the conjunction. No null entries are allowed.
    +	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
    +	 */
    +	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
    +		checkNotNull(futures, "futures");
    +		checkArgument(!futures.isEmpty(), "futures is empty");
    +
    +		final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
    +
    +		for (Future<?> future : futures) {
    +			future.handle(conjunct.completionHandler);
    +		}
    +
    +		return conjunct;
    +	}
    +
    +	/**
    +	 * A future that is complete once multiple other futures completed. The futures are not
    +	 * necessarily of the same type, which is why the type of this Future is {@code Void}.
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 * 
    +	 * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
    +	 * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
    +	 * many of the Futures are already complete.
    +	 */
    +	public interface ConjunctFuture extends CompletableFuture<Void> {
    +
    +		/**
    +		 * Gets the total number of Futures in the conjunction.
    +		 * @return The total number of Futures in the conjunction.
    +		 */
    +		int getNumFuturesTotal();
    +
    +		/**
    +		 * Gets the number of Futures in the conjunction that are already complete.
    +		 * @return The number of Futures in the conjunction that are already complete
    +		 */
    +		int getNumFuturesCompleted();
    +	}
    +
    +	/**
    +	 * The implementation of the {@link ConjunctFuture}.
    +	 * 
    +	 * <p>Implementation notice: The member fields all have package-private access, because they are
    +	 * either accessed by an inner subclass or by the enclosing class.
    +	 */
    +	private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
    --- End diff --
    
    Doesn't the `AtomicInteger` make it thread safe? Every call to the BiFunction will manipulate a distinct array field. And the array is fixed, so no resizing operation can take place.
    
    Btw: I think it should be `set(index, element)` instead of `add(index, element)` because of the fixed nature of the array list.
    
    Yes, please do it as a follow-up.



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101747520
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.concurrent;
    +
    +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
    +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
    +
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.concurrent.ExecutionException;
    +
    +import static org.junit.Assert.*;
    +
    +/**
    + * Tests for the utility methods in {@link FutureUtils}
    + */
    +public class FutureUtilsTest {
    +
    +	@Test
    +	public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
    +		try {
    +			FutureUtils.combineAll(null);
    +			fail();
    +		} catch (NullPointerException ignored) {}
    +
    +		try {
    +			FutureUtils.combineAll(Collections.<Future<?>>emptyList());
    +			fail();
    +		} catch (IllegalArgumentException ignored) {}
    +
    +		try {
    +			FutureUtils.combineAll(Arrays.asList(
    +					new FlinkCompletableFuture<Object>(),
    +					null,
    +					new FlinkCompletableFuture<Object>()));
    +			fail();
    +		} catch (NullPointerException ignored) {}
    +	}
    +
    +	@Test
    +	public void testConjunctFutureCompletion() throws Exception {
    +		// some futures that we combine
    +		CompletableFuture<Object> future1 = new FlinkCompletableFuture<>();
    +		CompletableFuture<Object> future2 = new FlinkCompletableFuture<>();
    +		CompletableFuture<Object> future3 = new FlinkCompletableFuture<>();
    +		CompletableFuture<Object> future4 = new FlinkCompletableFuture<>();
    +
    +		// some future is initially completed
    +		future2.complete(new Object());
    +
    +		// build the conjunct future
    +		ConjunctFuture result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3, future4));
    +
    +		Future<Void> resultMapped = result.thenAccept(new AcceptFunction<Void>() {
    +			@Override
    +			public void accept(Void value) {}
    +		});
    --- End diff --
    
    Why do we add this `AcceptFunction`?



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3295



[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101754578
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ---
    @@ -88,4 +100,104 @@ public RetryException(Throwable cause) {
     			super(cause);
     		}
     	}
    +
    +	// ------------------------------------------------------------------------
    +	//  composing futures
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Creates a future that is complete once multiple other futures completed. 
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 *
    +	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
    +	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
    +	 * 
    +	 * @param futures The futures that make up the conjunction. No null entries are allowed.
    +	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
    +	 */
    +	public static ConjunctFuture combineAll(Collection<? extends Future<?>> futures) {
    +		checkNotNull(futures, "futures");
    +		checkArgument(!futures.isEmpty(), "futures is empty");
    +
    +		final ConjunctFutureImpl conjunct = new ConjunctFutureImpl(futures.size());
    +
    +		for (Future<?> future : futures) {
    +			future.handle(conjunct.completionHandler);
    +		}
    +
    +		return conjunct;
    +	}
    +
    +	/**
    +	 * A future that is complete once multiple other futures completed. The futures are not
    +	 * necessarily of the same type, which is why the type of this Future is {@code Void}.
    +	 * The ConjunctFuture fails (completes exceptionally) once one of the Futures in the
    +	 * conjunction fails.
    +	 * 
    +	 * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
    +	 * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how
    +	 * many of the Futures are already complete.
    +	 */
    +	public interface ConjunctFuture extends CompletableFuture<Void> {
    +
    +		/**
    +		 * Gets the total number of Futures in the conjunction.
    +		 * @return The total number of Futures in the conjunction.
    +		 */
    +		int getNumFuturesTotal();
    +
    +		/**
    +		 * Gets the number of Futures in the conjunction that are already complete.
    +		 * @return The number of Futures in the conjunction that are already complete
    +		 */
    +		int getNumFuturesCompleted();
    +	}
    +
    +	/**
    +	 * The implementation of the {@link ConjunctFuture}.
    +	 * 
    +	 * <p>Implementation notice: The member fields all have package-private access, because they are
    +	 * either accessed by an inner subclass or by the enclosing class.
    +	 */
    +	private static class ConjunctFutureImpl extends FlinkCompletableFuture<Void> implements ConjunctFuture {
    --- End diff --
    
    Interesting idea. I think the linked implementation is not yet thread-safe, because the BiFunction that adds the results to the collection is called concurrently as the different original futures complete. For this particular use case, we would also need to preserve the order. That is easy to achieve by pre-allocating a target array and writing each result to its fixed position (the completion function would then need to know its target index).
    
    I would actually like to do that as a separate follow-up, unless you object.
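
A minimal, self-contained sketch of the pre-allocated-target-array idea described above. This is not the actual Flink change: it uses the JDK's `CompletableFuture` instead of Flink's own `Future`/`FlinkCompletableFuture` abstraction, and the class and method names are made up for illustration.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * Sketch of an order-preserving, value-collecting conjunct future. Each input
     * future writes its result into a fixed index of a pre-allocated array, so
     * concurrent completions can neither reorder nor corrupt the collected results.
     */
    public final class OrderPreservingConjunct {

        public static <T> CompletableFuture<List<T>> combineAll(
                Collection<? extends CompletableFuture<? extends T>> futures) {

            if (futures.isEmpty()) {
                throw new IllegalArgumentException("futures is empty");
            }

            final Object[] results = new Object[futures.size()];   // one fixed slot per input future
            final AtomicInteger remaining = new AtomicInteger(futures.size());
            final CompletableFuture<List<T>> conjunct = new CompletableFuture<>();

            int index = 0;
            for (CompletableFuture<? extends T> future : futures) {
                final int pos = index++;                            // each handler knows its target index
                future.whenComplete((value, failure) -> {
                    if (failure != null) {
                        conjunct.completeExceptionally(failure);    // first failure fails the conjunction
                    } else {
                        results[pos] = value;
                        if (remaining.decrementAndGet() == 0) {
                            final List<T> collected = new ArrayList<>(results.length);
                            for (Object r : results) {
                                @SuppressWarnings("unchecked")
                                final T typed = (T) r;
                                collected.add(typed);
                            }
                            conjunct.complete(collected);           // results come back in input order
                        }
                    }
                });
            }
            return conjunct;
        }

        private OrderPreservingConjunct() {}
    }

Because every result lands at a fixed index, the collected list comes back in exactly the order in which the futures were passed in, which is what strictly topological deployment needs; a counter like `remaining` is also what a `getNumFuturesCompleted()`-style method could be derived from.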


---

[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101746831
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph;
    +
    +import org.apache.flink.runtime.concurrent.BiFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.instance.SimpleSlot;
    +import org.apache.flink.util.ExceptionUtils;
    +
    +import java.util.List;
    +
    +/**
    + * Utilities for dealing with the execution graphs and scheduling.
    + */
    +public class ExecutionGraphUtils {
    +
    +	/**
    +	 * Releases the slot represented by the given future. If the future is complete, the
    +	 * slot is immediately released. Otherwise, the slot is released as soon as the future
    +	 * is completed.
    +	 * 
    +	 * <p>Note that releasing the slot means cancelling any task execution currently
    +	 * associated with that slot.
    +	 * 
    +	 * @param slotFuture The future for the slot to release.
    +	 */
    +	public static void releaseSlotFuture(Future<SimpleSlot> slotFuture) {
    +		slotFuture.handle(ReleaseSlotFunction.INSTANCE);
    +	}
    +
    +	/**
    +	 * Releases the all the slots in the list of arrays of {@code ExecutionAndSlot}.
    +	 * For each future in that collection holds: If the future is complete, its slot is
    +	 * immediately released. Otherwise, the slot is released as soon as the future
    +	 * is completed.
    +	 * 
    +	 * <p>This methods never throws any exceptions (subclasses of {@code Exception})
    +	 * and continues to release the remaining slots if one slot release failed. We only
    +	 * catch Exceptions here (and not other throwables) because the code executed while
    +	 * releasing slot does not involve any dynamic 
    --- End diff --
    
    sentence incomplete
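
The pattern described in the javadoc above — attach the release as a completion handler, so that it covers both already-completed and still-pending futures, and never let one failing release prevent the remaining ones — might be sketched as follows. This is a standalone illustration with the JDK's `CompletableFuture`, not the `ExecutionGraphUtils` code itself; `releaseWhenDone` and `releaseAll` are made-up names.

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;

    /** Sketch of the "release via completion handler" pattern. */
    public final class ReleaseOnCompletion {

        /**
         * Attaches a release action to the slot future. If the future is already
         * complete, the action runs immediately; otherwise it runs once the future
         * completes. A future that failed simply has nothing to release.
         */
        public static <S> void releaseWhenDone(CompletableFuture<S> slotFuture, Consumer<S> release) {
            slotFuture.whenComplete((slot, failure) -> {
                if (slot != null) {
                    try {
                        release.accept(slot);
                    } catch (Exception e) {
                        // swallowed on purpose: failing to release one slot must not
                        // prevent the release of the remaining slots
                    }
                }
            });
        }

        /** Attaches the release action to every future, continuing past individual failures. */
        public static <S> void releaseAll(List<CompletableFuture<S>> slotFutures, Consumer<S> release) {
            for (CompletableFuture<S> f : slotFutures) {
                releaseWhenDone(f, release);
            }
        }

        private ReleaseOnCompletion() {}
    }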


---

[GitHub] flink pull request #3295: [FLINK-5747] [distributed coordination] Eager sche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3295#discussion_r101739062
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -754,6 +759,139 @@ public void scheduleForExecution(SlotProvider slotProvider) throws JobException
     		}
     	}
     
    +	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
    +		// simply take the vertices without inputs.
    +		for (ExecutionJobVertex ejv : this.tasks.values()) {
    +			if (ejv.getJobVertex().isInputVertex()) {
    +				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * 
    +	 * 
    +	 * @param slotProvider  The resource provider from which the slots are allocated
    +	 * @param timeout       The maximum time that the deployment may take, before a
    +	 *                      TimeoutException is thrown.
    +	 */
    +	private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
    +		checkState(state == JobStatus.RUNNING, "job is not running currently");
    +
    +		// Important: reserve all the space we need up front.
    +		// that way we do not have any operation that can fail between allocating the slots
    +		// and adding them to the list. If we had a failure in between there, that would
    +		// cause the slots to get lost
    +		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
    +		final boolean queued = allowQueuedScheduling;
    +
    +		// we use this flag to handle failures in a 'finally' clause
    +		// that allows us to not go through clumsy cast-and-rethrow logic
    +		boolean successful = false;
    +
    +		try {
    +			// collecting all the slots may resize and fail in that operation without slots getting lost
    +			final ArrayList<Future<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +
    +			// allocate the slots (obtain all their futures
    +			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +				// these calls are not blocking, they only return futures
    +				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
    --- End diff --
    
    With the generalized `ConjunctFuture` we could return a collection of `Future<ExecutionAndSlot>` which could then be combined into a `ConjunctFuture<ExecutionAndSlot>`. When completed, it would pass a `Collection<ExecutionAndSlot>` to the handle method.
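
As a rough illustration of how such a value-collecting conjunction could be used, the toy example below deploys two tasks only after both slot futures have completed, and in the order in which the futures were collected. `TaskAndSlot` is a stand-in for the runtime's `ExecutionAndSlot`, and `OrderPreservingConjunct.combineAll` refers to the sketch earlier in this thread; none of this is the actual scheduler code.

    import java.util.Arrays;
    import java.util.concurrent.CompletableFuture;

    // Toy stand-in for the (Execution, slot) pairing used by the scheduler.
    final class TaskAndSlot {
        final String taskName;
        final String slotId;

        TaskAndSlot(String taskName, String slotId) {
            this.taskName = taskName;
            this.slotId = slotId;
        }
    }

    public class EagerDeployDemo {
        public static void main(String[] args) {
            CompletableFuture<TaskAndSlot> source = new CompletableFuture<>();
            CompletableFuture<TaskAndSlot> sink = new CompletableFuture<>();

            // Combine first, deploy later: the action runs only once *all* slot
            // futures are complete, and sees the results in collection order.
            OrderPreservingConjunct.combineAll(Arrays.asList(source, sink))
                .thenAccept(all -> all.forEach(
                    t -> System.out.println("deploy " + t.taskName + " -> " + t.slotId)));

            // The individual futures may complete out of order ...
            sink.complete(new TaskAndSlot("sink", "slot-2"));
            source.complete(new TaskAndSlot("source", "slot-1"));
            // ... yet the printed deployment order stays topological: source, then sink.
        }
    }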


---