You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/09/01 01:24:22 UTC

[jira] [Commented] (FLINK-4490) Decouple Slot and Instance

    [ https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453958#comment-15453958 ] 

ASF GitHub Bot commented on FLINK-4490:
---------------------------------------

Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2447#discussion_r77101490
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java ---
    @@ -20,73 +20,125 @@
     
     import org.apache.flink.runtime.instance.SimpleSlot;
     
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * 
    + */
     public class SlotAllocationFuture {
    -	
    +
     	private final Object monitor = new Object();
    -	
    +
     	private volatile SimpleSlot slot;
    -	
    +
     	private volatile SlotAllocationFutureAction action;
    -	
    +
     	// --------------------------------------------------------------------------------------------
     
    +	/**
    +	 * Creates a future that is uncompleted.
    +	 */
     	public SlotAllocationFuture() {}
    -	
    +
    +	/**
    +	 * Creates a future that is immediately completed.
    +	 * 
    +	 * @param slot The task slot that completes the future.
    +	 */
     	public SlotAllocationFuture(SimpleSlot slot) {
     		this.slot = slot;
     	}
    -	
    +
     	// --------------------------------------------------------------------------------------------
    -	
    -	public SimpleSlot waitTillAllocated() throws InterruptedException {
    -		return waitTillAllocated(0);
    -	}
    -	
    -	public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
    +
    +	public SimpleSlot waitTillCompleted() throws InterruptedException {
     		synchronized (monitor) {
     			while (slot == null) {
    -				monitor.wait(timeout);
    +				monitor.wait();
    +			}
    +			return slot;
    +		}
    +	}
    +
    +	public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
    +		checkArgument(timeout >= 0, "timeout may not be negative");
    +		checkNotNull(timeUnit, "timeUnit");
    +
    +		if (timeout == 0) {
    +			return waitTillCompleted();
    +		} else {
    +			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
    +			long millisToWait;
    +
    +			synchronized (monitor) {
    +				while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
    +					monitor.wait(millisToWait);
    +				}
    +
    +				if (slot != null) {
    +					return slot;
    +				} else {
    +					throw new TimeoutException();
    +				}
     			}
    -			
    +		}
    +	}
    +
    +	/**
    +	 * Gets the slot from this future. This method throws an exception, if the future has not been completed.
    +	 * This method never blocks.
    +	 * 
    +	 * @return The slot with which this future was completed.
    +	 * @throws IllegalStateException Thrown, if this method is called before the future is completed.
    +	 */
    +	public SimpleSlot get() {
    +		final SimpleSlot slot = this.slot;
    +		if (slot != null) {
     			return slot;
    +		} else {
    +			throw new IllegalStateException("The future is not complete - not slot available");
    --- End diff --
    
    Can we throw a explicitly exception like SlotNotReadyException instead of RuntimeException? 


> Decouple Slot and Instance
> --------------------------
>
>                 Key: FLINK-4490
>                 URL: https://issues.apache.org/jira/browse/FLINK-4490
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>            Reporter: Kurt Young
>             Fix For: 1.2.0
>
>
> Currently, {{Slot}} and {{Instance}} holds each other. For {{Instance}} holding {{Slot}}, it makes sense because it reflects how many resources it can provide and how many are using. 
> But it's not very necessary for {{Slot}} to hold {{Instance}} which it belongs to. It only needs to hold some connection information and gateway to talk to. Another downside for {{Slot}} holding {{Instance}} is that {{Instance}} actually contains some allocate/de-allocation logicals, it will be difficult if we want to do some allocation refactor without letting {{Slot}} noticed. 
> We should abstract the connection information of {{Instance}} to let {{Slot}} holds. (Actually we have {{InstanceConnectionInfo}} now, but lacks of instance's akka gateway, maybe we can just adding the akka gateway to the {{InstanceConnectionInfo}})



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)