You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/06/16 23:37:28 UTC
flink git commit: [FLINK-2225] [scheduler] Excludes static code paths
from co-location constraint to avoid scheduling problems
Repository: flink
Updated Branches:
refs/heads/master d38154129 -> 5374ba97c
[FLINK-2225] [scheduler] Excludes static code paths from co-location constraint to avoid scheduling problems
This closes #843.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5374ba97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5374ba97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5374ba97
Branch: refs/heads/master
Commit: 5374ba97cd863af3062c9b6fa9f96c4b4ebe9a0a
Parents: d381541
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 16 12:27:02 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 16 23:36:48 2015 +0200
----------------------------------------------------------------------
.../optimizer/plantranslate/JobGraphGenerator.java | 5 +++--
.../org/apache/flink/runtime/instance/Instance.java | 5 +++++
.../org/apache/flink/runtime/instance/Slot.java | 4 ++--
.../instance/SlotSharingGroupAssignment.java | 7 +++++++
.../runtime/jobmanager/scheduler/Scheduler.java | 16 +++++++++-------
5 files changed, 26 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 109be20..281e425 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -527,8 +527,9 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
if (this.currentIteration != null) {
JobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
- // the head may still be null if we descend into the static parts first
- if (head != null) {
+ // Exclude static code paths from the co-location constraint, because otherwise
+ // their execution determines the deployment slots of the co-location group
+ if (node.isOnDynamicPath()) {
targetVertex.setStrictlyCoLocatedWith(head);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 7b48693..39caf08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -29,6 +29,8 @@ import akka.actor.ActorRef;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
@@ -36,6 +38,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
*/
public class Instance {
+ private final static Logger LOG = LoggerFactory.getLogger(Instance.class);
+
/** The lock on which to synchronize allocations and failure state changes */
private final Object instanceLock = new Object();
@@ -286,6 +290,7 @@ public class Instance {
}
if (slot.markReleased()) {
+ LOG.debug("Return allocated slot {}.", slot);
synchronized (instanceLock) {
if (isDead) {
return false;
http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 730e08a..341ef95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -244,11 +244,11 @@ public abstract class Slot {
@Override
public String toString() {
- return hierarchy() + " - " + instance.getId() + " - " + getStateName(status);
+ return hierarchy() + " - " + instance + " - " + getStateName(status);
}
protected String hierarchy() {
- return "(" + slotNumber + ")" + (getParent() != null ? getParent().hierarchy() : "");
+ return (getParent() != null ? getParent().hierarchy() : "") + "(" + slotNumber + ")";
}
private static String getStateName(int state) {
http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 801e9ca..94249de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.util.AbstractID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -83,6 +85,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
*/
public class SlotSharingGroupAssignment {
+ private final static Logger LOG = LoggerFactory.getLogger(SlotSharingGroupAssignment.class);
+
/** The lock globally guards against concurrent modifications in the data structures */
private final Object lock = new Object();
@@ -485,6 +489,7 @@ public class SlotSharingGroupAssignment {
// check whether the slot is already released
if (simpleSlot.markReleased()) {
+ LOG.debug("Release simple slot {}.", simpleSlot);
AbstractID groupID = simpleSlot.getGroupID();
SharedSlot parent = simpleSlot.getParent();
@@ -581,6 +586,8 @@ public class SlotSharingGroupAssignment {
// we remove ourselves from our parent slot
if (sharedSlot.markReleased()) {
+ LOG.debug("Internally dispose empty shared slot {}.", sharedSlot);
+
int parentRemaining = parent.removeDisposedChildSlot(sharedSlot);
if (parentRemaining > 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/5374ba97/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 579a6b4..940082e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -209,7 +209,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
constraint.lockLocation();
}
- updateLocalityCounters(slotFromGroup.getLocality(), vertex, slotFromGroup.getInstance());
+ updateLocalityCounters(slotFromGroup, vertex);
return slotFromGroup;
}
@@ -279,7 +279,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
constraint.lockLocation();
}
- updateLocalityCounters(toUse.getLocality(), vertex, toUse.getInstance());
+ updateLocalityCounters(toUse, vertex);
}
catch (NoResourceAvailableException e) {
throw e;
@@ -303,7 +303,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
if (slot != null) {
- updateLocalityCounters(slot.getLocality(), vertex, slot.getInstance());
+ updateLocalityCounters(slot, vertex);
return slot;
}
else {
@@ -570,7 +570,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
}
}
- private void updateLocalityCounters(Locality locality, ExecutionVertex vertex, Instance location) {
+ private void updateLocalityCounters(SimpleSlot slot, ExecutionVertex vertex) {
+ Locality locality = slot.getLocality();
+
switch (locality) {
case UNCONSTRAINED:
this.unconstrainedAssignments++;
@@ -588,13 +590,13 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
if (LOG.isDebugEnabled()) {
switch (locality) {
case UNCONSTRAINED:
- LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+ LOG.debug("Unconstrained assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
break;
case LOCAL:
- LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+ LOG.debug("Local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
break;
case NON_LOCAL:
- LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + location);
+ LOG.debug("Non-local assignment: " + vertex.getTaskNameWithSubtaskIndex() + " --> " + slot);
break;
}
}