Posted to issues@flink.apache.org by "Sihua Zhou (JIRA)" <ji...@apache.org> on 2017/07/14 06:09:00 UTC

[jira] [Comment Edited] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly

    [ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16086915#comment-16086915 ] 

Sihua Zhou edited comment on FLINK-7153 at 7/14/17 6:08 AM:
------------------------------------------------------------

Hi [~StephanEwen],

Here is my solution.

For problem 1, we need to record the `Future<SimpleSlot>` in `Execution` as soon as `Execution.allocateSlotForExecution()` obtains a SimpleSlot successfully. So I added a private field `Future<SimpleSlot> assignedFutureResource`, with a getter, to the `Execution` class; the getter is called by `ExecutionVertex.getPreferredLocationsBasedOnInputs()`. With that, problem 1 is solved. This requires modifying `Execution.class` and `ExecutionVertex.class`.
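
A minimal, self-contained sketch of the idea behind problem 1, for illustration only. It is a toy model, not Flink code: `ProducerAttempt`, `preferredLocation`, and the use of `java.util.concurrent.CompletableFuture` with a plain `String` location are all assumptions made to keep the example runnable. It shows a consumer finding its producer's location from the recorded future before the producer has been deployed.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Toy model, not Flink code: all names here are hypothetical stand-ins. */
class FutureSlotLookupSketch {

    /** Stand-in for Execution: holds the deployed location and the pending slot future. */
    static class ProducerAttempt {
        volatile String assignedLocation;                          // set only at deployment
        volatile CompletableFuture<String> assignedFutureLocation; // recorded at allocation

        void allocate(CompletableFuture<String> slotFuture) {
            // Record the future as soon as allocation has been requested.
            this.assignedFutureLocation = slotFuture;
        }
    }

    /** Preferred location for a consumer: the deployed slot first, else the completed future. */
    static Optional<String> preferredLocation(ProducerAttempt producer) {
        if (producer.assignedLocation != null) {
            return Optional.of(producer.assignedLocation);
        }
        CompletableFuture<String> future = producer.assignedFutureLocation;
        if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
            // getNow() never blocks; the future is already complete here.
            return Optional.ofNullable(future.getNow(null));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        ProducerAttempt producer = new ProducerAttempt();
        System.out.println(preferredLocation(producer).isPresent()); // prints "false"
        producer.allocate(CompletableFuture.completedFuture("taskmanager-1"));
        System.out.println(preferredLocation(producer).get());       // prints "taskmanager-1"
    }
}
```

Unlike the `futureSlot.get()` call in the proposal, this sketch only reads futures that are already complete, so the lookup never blocks; whether blocking is acceptable in the real scheduler is a separate design question.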

For problem 2, I think we need to change the allocation strategy into two steps:
*  Step 1: allocate only from the SlotSharingGroup, based on inputs. In this step, every ExecutionVertex that can be placed with a local partition is allocated successfully.
*  Step 2: do normal allocation for the remaining ExecutionVertex instances.
With these two steps, problem 2 is solved. This requires modifying `ExecutionJobVertex.class`, `Scheduler.class`, `SlotSharingGroupAssignment`, and `ScheduledUnit.class`, which needs to carry the `onlyAllocateBasePreferLocation` flag.
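
The effect of the two steps can be illustrated with a small simulation of the test case from the issue description (v1 with parallelism 2, v2 with parallelism 4, POINTWISE). This is a toy model under stated assumptions, not Flink code: every shared slot is assumed to sit on its own TaskManager, each shared slot accepts one v2 subtask, and the class and method names are hypothetical.

```java
/** Toy model, not Flink code: each shared slot is assumed to be on its own TaskManager. */
class TwoPassAllocationSketch {

    /**
     * One-by-one strategy: take the preferred shared slot if it is free,
     * otherwise take any free shared slot, otherwise request a fresh (remote) slot.
     * preferred[i] is the index of the shared slot that holds consumer i's producer.
     */
    static int oneByOneLocalCount(int[] preferred, int sharedSlots) {
        boolean[] taken = new boolean[sharedSlots];
        int local = 0;
        for (int pref : preferred) {
            if (!taken[pref]) {
                taken[pref] = true;
                local++;
            } else {
                // Fallback: occupy another shared slot, which may be some other consumer's local slot.
                for (int s = 0; s < sharedSlots; s++) {
                    if (!taken[s]) {
                        taken[s] = true;
                        break;
                    }
                }
            }
        }
        return local;
    }

    /** Two-step strategy: step 1 hands out only local slots; step 2 places the rest. */
    static int twoStepLocalCount(int[] preferred, int sharedSlots) {
        boolean[] taken = new boolean[sharedSlots];
        int local = 0;
        // Step 1: allocate only where the preferred (local) shared slot is still free.
        for (int pref : preferred) {
            if (!taken[pref]) {
                taken[pref] = true;
                local++;
            }
        }
        // Step 2: the remaining consumers get other slots; none of them is local in this model.
        return local;
    }

    public static void main(String[] args) {
        // v2 subtasks 0 and 1 read from v1[0] (slot 0); subtasks 2 and 3 read from v1[1] (slot 1).
        int[] preferred = {0, 0, 1, 1};
        System.out.println("one-by-one local: " + oneByOneLocalCount(preferred, 2)); // prints 1
        System.out.println("two-step local: " + twoStepLocalCount(preferred, 2));    // prints 2
    }
}
```

In this model the one-by-one strategy loses a local placement because the second v2 subtask falls back to the slot that the third subtask would have used locally.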

In total, 7 classes need to be modified.

The code, in brief, looks like this:

# *Execution.class*
{code:java}
class Execution {

	private volatile Future<SimpleSlot> assignedFutureResource;

	// The onlyAllocateBasePreferLocation parameter is used for problem 2.
	public Future<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued, boolean onlyAllocateBasePreferLocation)
			throws IllegalExecutionStateException {
		/*
		*/
		if (transitionState(CREATED, SCHEDULED)) {

			ScheduledUnit toSchedule = locationConstraint == null ?
					new ScheduledUnit(this, sharingGroup) :
					new ScheduledUnit(this, sharingGroup, locationConstraint);

			toSchedule.setOnlyAllocateBasePreferInputs(onlyAllocateBasePreferLocation);

			// Record the assignment by updating assignedFutureResource.
			assignedFutureResource = slotProvider.allocateSlot(toSchedule, queued);
			if (assignedFutureResource == null) {
				// No slot was allocated: go back to CREATED so a later attempt can retry.
				transitionState(SCHEDULED, CREATED);
			}
			return assignedFutureResource;
		}
		/*
		*/
	}

	public Future<SimpleSlot> getAllocateFutureSlot() {
		return assignedFutureResource;
	}
}
{code}


# *ExecutionVertex.class*
{code:java}
class ExecutionVertex {
	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
		/*
		*/
		// Fall back to the future slot if getCurrentAssignedResource() returns null.
		SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();

		if (sourceSlot == null) {
			Future<SimpleSlot> futureSlot = sources[k].getSource().getProducer().getCurrentExecutionAttempt().getAllocateFutureSlot();
			if (futureSlot != null) {
				sourceSlot = futureSlot.get();
			}
		}
		/*
		*/
	}
}
{code}

# *ExecutionJobVertex.class*
{code:java}
class ExecutionJobVertex {
	public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
		final List<ExecutionAndSlot> slots = new ArrayList<>(this.taskVertices.length);
		// Step 1: allocate only from the sharing group, based on inputs.
		slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, true));
		// Step 2: normal allocation for the remaining executions.
		slots.addAll(allocateResourcesForAllHelper(resourceProvider, queued, false));
		return slots.toArray(new ExecutionAndSlot[this.taskVertices.length]);
	}

	// Apart from the added onlyAllocateBaseOnInputs parameter, this is similar to the old allocateResourcesForAll function.
	public List<ExecutionAndSlot> allocateResourcesForAllHelper(SlotProvider resourceProvider, boolean queued, boolean onlyAllocateBaseOnInputs) {

		final List<ExecutionAndSlot> slots = new ArrayList<>(taskVertices.length);
		final List<ExecutionVertex> verticeList = Lists.newArrayList(this.taskVertices);

		final SlotSharingGroup sharingGroup = this.getSlotSharingGroup();

		if (sharingGroup == null) {
			// Without a sharing group there is nothing to do in step 1.
			if (onlyAllocateBaseOnInputs) {
				return slots;
			}
		} else {
			if (queued) {
				throw new IllegalArgumentException(
					"A task with a vertex sharing group was scheduled in a queued fashion.");
			}
		}

		for (int i = 0; i < verticeList.size(); ++i) {
			boolean successful = false;
			try {
				final ExecutionVertex vertex = verticeList.get(i);
				final Execution exec = vertex.getCurrentExecutionAttempt();
				// Skip executions that already got a slot in step 1.
				if (exec.hasPreAllocateSlot()) {
					successful = true;
					continue;
				}
				Future<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued, onlyAllocateBaseOnInputs);
				if (future != null) {
					slots.add(new ExecutionAndSlot(exec, future));
				}
				successful = true;
			} finally {
				if (!successful) {
					// this is the case if an exception was thrown
					for (ExecutionAndSlot slot : slots) {
						ExecutionGraphUtils.releaseSlotFuture(slot.slotFuture);
					}
				}
			}
		}

		return slots;
	}
}
{code}

# *Scheduler.class*
{code:java}
class Scheduler {	
	// scheduleTask() needs to return null when allocation based on inputs fails.
	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {

		synchronized (globalLock) {
			.................
			SlotSharingGroup sharingUnit = task.getSlotSharingGroup();			
			if (sharingUnit != null) {
				//
				boolean isOnlyAllocateByPreferInputs = task.getOnlyAllocateByPreferInputs();
				if (isOnlyAllocateByPreferInputs && slotFromGroup == null) {
					return null;
				}
				//
			}
			.......
		}
	}
}
{code}

# *SlotSharingGroupAssignment.class*
{code:java}
class SlotSharingGroupAssignment {

	private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(AbstractID groupId, Iterable<TaskManagerLocation> preferredLocations, boolean localOnly, boolean onlyAllocateBaseOnInputs) {
		/*
		*/
		if (preferredLocations != null) {
			for (TaskManagerLocation location : preferredLocations) {
				// Set the flag that we failed a preferred location. If one is found,
				// we return early anyway and skip the flag evaluation.
				didNotGetPreferred = true;
				SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
				if (slot != null && slot.isAlive()) {
					return new Tuple2<>(slot, Locality.LOCAL);
				}
			}
		}

		// In step 1, give up here rather than falling back to a non-local slot.
		if (onlyAllocateBaseOnInputs) {
			return null;
		}
	}
	/*
	*/
}
{code}

Thank you very much for taking the time to review my solution. Could you assign this issue to me? I would very much like to contribute to Flink.

Thanks,
Sihua Zhou


> Eager Scheduling can't allocate source for ExecutionGraph correctly
> -------------------------------------------------------------------
>
>                 Key: FLINK-7153
>                 URL: https://issues.apache.org/jira/browse/FLINK-7153
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> The ExecutionGraph.scheduleEager() function allocates resources for each ExecutionJobVertex one by one by calling ExecutionJobVertex.allocateResourcesForAll(). There are two problems with this:
> 1. ExecutionVertex.getPreferredLocationsBasedOnInputs() always returns an empty result, because `sourceSlot` is always null until the `ExecutionVertex` has been deployed via `Execution.deployToSlot()`. So allocation based on preferred locations cannot work correctly. Should we set the slot info for the `Execution` as soon as Execution.allocateSlotForExecution() is called successfully?
> 2. The current allocation strategy does not allocate slots optimally. Here is the test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and three remote partitions. But it should be two local partitions and two remote partitions.
> The cause of this problem is that the current strategy allocates resources for executions one by one (if an execution can get a slot from the SlotGroup, it takes it; otherwise it asks for a new one).
> Changing the allocation strategy to two steps solves this problem. Below is the pseudocode:
> {code}
> for (ExecutionJobVertex ejv : getVerticesTopologically()) {
> 	// step 1: try to allocate from the SlotGroup based on inputs, one by one (allocating only by location).
> 	// step 2: allocate for the remaining executions.
> }
> {code}


