Posted to issues@flink.apache.org by KurtYoung <gi...@git.apache.org> on 2016/09/01 01:24:02 UTC

[GitHub] flink pull request #2447: [FLINK-4490] [distributed coordination] Decouple t...

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2447#discussion_r77101490
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---
    @@ -20,73 +20,125 @@
     
     import org.apache.flink.runtime.instance.SimpleSlot;
     
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * 
    + */
     public class SlotAllocationFuture {
    -	
    +
     	private final Object monitor = new Object();
    -	
    +
     	private volatile SimpleSlot slot;
    -	
    +
     	private volatile SlotAllocationFutureAction action;
    -	
    +
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Creates a future that is uncompleted.
    +	 */
     	public SlotAllocationFuture() {}
    -	
    +
    +	/**
    +	 * Creates a future that is immediately completed.
    +	 * 
    +	 * @param slot The task slot that completes the future.
    +	 */
     	public SlotAllocationFuture(SimpleSlot slot) {
     		this.slot = slot;
     	}
    -	
    +
     	// --------------------------------------------------------------------------------------------
    -	
    -	public SimpleSlot waitTillAllocated() throws InterruptedException {
    -		return waitTillAllocated(0);
    -	}
    -	
    -	public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
    +
    +	public SimpleSlot waitTillCompleted() throws InterruptedException {
     		synchronized (monitor) {
     			while (slot == null) {
    -				monitor.wait(timeout);
    +				monitor.wait();
    +			}
    +			return slot;
    +		}
    +	}
    +
    +	public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
    +		checkArgument(timeout >= 0, "timeout may not be negative");
    +		checkNotNull(timeUnit, "timeUnit");
    +
    +		if (timeout == 0) {
    +			return waitTillCompleted();
    +		} else {
    +			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
    +			long millisToWait;
    +
    +			synchronized (monitor) {
    +				while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
    +					monitor.wait(millisToWait);
    +				}
    +
    +				if (slot != null) {
    +					return slot;
    +				} else {
    +					throw new TimeoutException();
    +				}
     			}
    -			
    +		}
    +	}
    +
    +	/**
    +	 * Gets the slot from this future. This method throws an exception, if the future has not been completed.
    +	 * This method never blocks.
    +	 * 
    +	 * @return The slot with which this future was completed.
    +	 * @throws IllegalStateException Thrown, if this method is called before the future is completed.
    +	 */
    +	public SimpleSlot get() {
    +		final SimpleSlot slot = this.slot;
    +		if (slot != null) {
     			return slot;
    +		} else {
    +			throw new IllegalStateException("The future is not complete - not slot available");
    --- End diff --
    
    Can we throw an explicit exception such as SlotNotReadyException here, instead of a generic RuntimeException such as IllegalStateException?
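
    A minimal sketch of what I have in mind. SlotNotReadyException is a
    hypothetical name (it does not exist in Flink today), and SlotFutureSketch
    is a simplified stand-in for SlotAllocationFuture that uses a String in
    place of SimpleSlot so the example compiles on its own. Extending
    IllegalStateException is only one possible choice; it would keep the
    documented @throws contract of get() and any existing catch blocks valid.

```java
// Hypothetical exception type; the name is taken from this review comment, not from Flink.
class SlotNotReadyException extends IllegalStateException {
    private static final long serialVersionUID = 1L;

    SlotNotReadyException(String message) {
        super(message);
    }
}

// Simplified stand-in for SlotAllocationFuture (assumption: String replaces SimpleSlot).
class SlotFutureSketch {

    private volatile String slot;

    // Creates a future that is uncompleted.
    SlotFutureSketch() {}

    // Creates a future that is immediately completed.
    SlotFutureSketch(String slot) {
        this.slot = slot;
    }

    // Non-blocking accessor: returns the slot, or fails with the dedicated exception.
    String get() {
        final String slot = this.slot;
        if (slot != null) {
            return slot;
        } else {
            throw new SlotNotReadyException("The future is not complete - no slot available");
        }
    }

    public static void main(String[] args) {
        System.out.println(new SlotFutureSketch("slot-1").get());
        try {
            new SlotFutureSketch().get();
        } catch (SlotNotReadyException e) {
            // Callers can react to this specific condition without catching every RuntimeException.
            System.out.println("not ready: " + e.getMessage());
        }
    }
}
```

    Since the exception stays unchecked, the signature of get() would not
    need to change; callers that care can catch the specific type.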

