You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/06/16 23:37:28 UTC

flink git commit: [FLINK-2225] [scheduler] Excludes static code paths from co-location constraint to avoid scheduling problems

Repository: flink
Updated Branches:
  refs/heads/master d38154129 -> 5374ba97c


[FLINK-2225] [scheduler] Excludes static code paths from co-location constraint to avoid scheduling problems

This closes #843.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5374ba97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5374ba97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5374ba97

Branch: refs/heads/master
Commit: 5374ba97cd863af3062c9b6fa9f96c4b4ebe9a0a
Parents: d381541
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 16 12:27:02 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 16 23:36:48 2015 +0200

----------------------------------------------------------------------
 .../optimizer/plantranslate/JobGraphGenerator.java  |  5 +++--
 .../org/apache/flink/runtime/instance/Instance.java |  5 +++++
 .../org/apache/flink/runtime/instance/Slot.java     |  4 ++--
 .../instance/SlotSharingGroupAssignment.java        |  7 +++++++
 .../runtime/jobmanager/scheduler/Scheduler.java     | 16 +++++++++-------
 5 files changed, 26 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 109be20..281e425 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -527,8 +527,9 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			
 			if (this.currentIteration != null) {
 				JobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
-				// the head may still be null if we descend into the static parts first
-				if (head != null) {
+				// Exclude static code paths from the co-location constraint, because otherwise
+				// their execution determines the deployment slots of the co-location group
+				if (node.isOnDynamicPath()) {
 					targetVertex.setStrictlyCoLocatedWith(head);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 7b48693..39caf08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -29,6 +29,8 @@ import akka.actor.ActorRef;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
@@ -36,6 +38,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
  */
 public class Instance {
 
+	private final static Logger LOG = LoggerFactory.getLogger(Instance.class);
+
 	/** The lock on which to synchronize allocations and failure state changes */
 	private final Object instanceLock = new Object();
 
@@ -286,6 +290,7 @@ public class Instance {
 		}
 
 		if (slot.markReleased()) {
+			LOG.debug("Return allocated slot {}.", slot);
 			synchronized (instanceLock) {
 				if (isDead) {
 					return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 730e08a..341ef95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -244,11 +244,11 @@ public abstract class Slot {
 
 	@Override
 	public String toString() {
-		return hierarchy() + " - " + instance.getId() + " - " + getStateName(status);
+		return hierarchy() + " - " + instance + " - " + getStateName(status);
 	}
 
 	protected String hierarchy() {
-		return "(" + slotNumber + ")" + (getParent() != null ? getParent().hierarchy() : "");
+		return (getParent() != null ? getParent().hierarchy() : "") + "(" + slotNumber + ")";
 	}
 
 	private static String getStateName(int state) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 801e9ca..94249de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -83,6 +85,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
  */
 public class SlotSharingGroupAssignment {
 
+	private final static Logger LOG = LoggerFactory.getLogger(SlotSharingGroupAssignment.class);
+
 	/** The lock globally guards against concurrent modifications in the data structures */
 	private final Object lock = new Object();
 	
@@ -485,6 +489,7 @@ public class SlotSharingGroupAssignment {
 
 				// check whether the slot is already released
 				if (simpleSlot.markReleased()) {
+					LOG.debug("Release simple slot {}.", simpleSlot);
 
 					AbstractID groupID = simpleSlot.getGroupID();
 					SharedSlot parent = simpleSlot.getParent();
@@ -581,6 +586,8 @@ public class SlotSharingGroupAssignment {
 			// we remove ourselves from our parent slot
 
 			if (sharedSlot.markReleased()) {
+				LOG.debug("Internally dispose empty shared slot {}.", sharedSlot);
+
 				int parentRemaining = parent.removeDisposedChildSlot(sharedSlot);
 				
 				if (parentRemaining > 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 579a6b4..940082e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -209,7 +209,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 							constraint.lockLocation();
 						}
 						
-						updateLocalityCounters(slotFromGroup.getLocality(), vertex, slotFromGroup.getInstance());
+						updateLocalityCounters(slotFromGroup, vertex);
 						return slotFromGroup;
 					}
 					
@@ -279,7 +279,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 						constraint.lockLocation();
 					}
 					
-					updateLocalityCounters(toUse.getLocality(), vertex, toUse.getInstance());
+					updateLocalityCounters(toUse, vertex);
 				}
 				catch (NoResourceAvailableException e) {
 					throw e;
@@ -303,7 +303,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				
 				SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
 				if (slot != null) {
-					updateLocalityCounters(slot.getLocality(), vertex, slot.getInstance());
+					updateLocalityCounters(slot, vertex);
 					return slot;
 				}
 				else {
@@ -570,7 +570,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		}
 	}
 	
-	private void updateLocalityCounters(Locality locality, ExecutionVertex vertex, Instance location) {
+	private void updateLocalityCounters(SimpleSlot slot, ExecutionVertex vertex) {
+		Locality locality = slot.getLocality();
+
 		switch (locality) {
 		case UNCONSTRAINED:
 			this.unconstrainedAssignments++;
@@ -588,13 +590,13 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		if (LOG.isDebugEnabled()) {
 			switch (locality) {
 				case UNCONSTRAINED:
-					LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+					LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
 					break;
 				case LOCAL:
-					LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+					LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
 					break;
 				case NON_LOCAL:
-					LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+					LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
 					break;
 			}
 		}