Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/16 13:14:11 UTC

[flink] branch master updated: [FLINK-10945] Add an InputDependencyConstraint to avoid resource deadlocks for finite stream jobs when resources are limited

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 171a3b1  [FLINK-10945] Add an InputDependencyConstraint to avoid resource deadlocks for finite stream jobs when resources are limited
171a3b1 is described below

commit 171a3b143b9d70faa6982878fed936bfee580a8f
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Thu Dec 6 19:37:05 2018 +0800

    [FLINK-10945] Add an InputDependencyConstraint to avoid resource deadlocks for finite stream jobs when resources are limited
    
    This commit adds a job config option, InputDependencyConstraint, which helps to avoid
    resource deadlocks in LAZY_FROM_SOURCES scheduling when resources are limited.
    
    The InputDependencyConstraint controls when a consumer task is scheduled with respect
    to its inputs. Currently it supports ANY and ALL: with ANY, the consumer is scheduled
    as soon as any of its input intermediate results becomes consumable; with ALL, all of
    its inputs need to be consumable before the consumer task is scheduled.
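    
    As a rough sketch, a job could opt into the ALL constraint via the new ExecutionConfig
    setter like this (the example class name is illustrative; the constraint is then copied
    onto each JobVertex by the job graph generators):
    
        import org.apache.flink.api.common.InputDependencyConstraint;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
        public class InputConstraintExample {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                // Only schedule a consumer task once every one of its inputs is consumable.
                env.getConfig().setDefaultInputDependencyConstraint(InputDependencyConstraint.ALL);
                // ... build a finite streaming topology and call env.execute() as usual ...
            }
        }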
    
    This closes #7255.
---
 .../apache/flink/api/common/ExecutionConfig.java   |  33 +++-
 .../api/common/InputDependencyConstraint.java      |  35 ++++
 .../optimizer/plantranslate/JobGraphGenerator.java |   1 +
 .../InputChannelDeploymentDescriptor.java          |   6 +-
 .../flink/runtime/executiongraph/Execution.java    |  46 +++--
 .../runtime/executiongraph/ExecutionJobVertex.java |   5 +
 .../runtime/executiongraph/ExecutionVertex.java    |  37 ++++
 .../runtime/executiongraph/IntermediateResult.java |  12 +-
 .../IntermediateResultPartition.java               |  23 ++-
 .../apache/flink/runtime/jobgraph/JobVertex.java   |  12 ++
 .../InputChannelDeploymentDescriptorTest.java      |   2 +
 .../ExecutionVertexInputConstraintTest.java        | 214 +++++++++++++++++++++
 .../IntermediateResultPartitionTest.java           |  99 ++++++++++
 .../api/graph/StreamingJobGraphGenerator.java      |   3 +
 14 files changed, 499 insertions(+), 29 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 16b2adf..bee67cb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	/** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
 	private boolean failTaskOnCheckpointError = true;
 
+	/** The default input dependency constraint to schedule tasks. */
+	private InputDependencyConstraint defaultInputDependencyConstraint = InputDependencyConstraint.ANY;
+
 	// ------------------------------- User code values --------------------------------------------
 
 	private GlobalJobParameters globalJobParameters;
@@ -519,6 +522,30 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	}
 
 	/**
+	 * Sets the default input dependency constraint for vertex scheduling. It indicates when a task
+	 * should be scheduled considering the status of its inputs.
+	 *
+	 * The default constraint is {@link InputDependencyConstraint#ANY}.
+	 *
+	 * @param inputDependencyConstraint The input dependency constraint.
+	 */
+	public void setDefaultInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+		this.defaultInputDependencyConstraint = inputDependencyConstraint;
+	}
+
+	/**
+	 * Gets the default input dependency constraint for vertex scheduling. It indicates when a task
+	 * should be scheduled considering the status of its inputs.
+	 *
+	 * The default constraint is {@link InputDependencyConstraint#ANY}.
+	 *
+	 * @return The input dependency constraint of this job.
+	 */
+	public InputDependencyConstraint getDefaultInputDependencyConstraint() {
+		return defaultInputDependencyConstraint;
+	}
+
+	/**
 	 * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
 	 * In some cases this might be preferable. For example, when using interfaces
 	 * with subclasses that cannot be analyzed as POJO.
@@ -918,7 +945,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 				registeredKryoTypes.equals(other.registeredKryoTypes) &&
 				registeredPojoTypes.equals(other.registeredPojoTypes) &&
 				taskCancellationIntervalMillis == other.taskCancellationIntervalMillis &&
-				useSnapshotCompression == other.useSnapshotCompression;
+				useSnapshotCompression == other.useSnapshotCompression &&
+				defaultInputDependencyConstraint == other.defaultInputDependencyConstraint;
 
 		} else {
 			return false;
@@ -946,7 +974,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 			registeredKryoTypes,
 			registeredPojoTypes,
 			taskCancellationIntervalMillis,
-			useSnapshotCompression);
+			useSnapshotCompression,
+			defaultInputDependencyConstraint);
 	}
 
 	public boolean canEqual(Object obj) {
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 0000000..c7a1e6a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled considering the status of its inputs.
+ */
+public enum InputDependencyConstraint {
+
+	/**
+	 * Schedule the task if any input is consumable.
+	 */
+	ANY,
+
+	/**
+	 * Schedule the task if all the inputs are consumable.
+	 */
+	ALL
+}
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 18259d3..aded190 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -246,6 +246,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 
 		// add vertices to the graph
 		for (JobVertex vertex : this.vertices.values()) {
+			vertex.setInputDependencyConstraint(program.getOriginalPlan().getExecutionConfig().getDefaultInputDependencyConstraint());
 			graph.addVertex(vertex);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30d..15bdb36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 			final ResultPartitionLocation partitionLocation;
 
 			// The producing task needs to be RUNNING or already FINISHED
-			if (consumedPartition.isConsumable() && producerSlot != null &&
+			if ((consumedPartition.getResultType().isPipelined() || consumedPartition.isConsumable()) &&
+				producerSlot != null &&
 					(producerState == ExecutionState.RUNNING ||
 						producerState == ExecutionState.FINISHED ||
 						producerState == ExecutionState.SCHEDULED ||
@@ -136,7 +137,8 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 			}
 			else {
 				String msg = String.format("Trying to eagerly schedule a task whose inputs " +
-					"are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+					"are not ready (result type: %s, partition consumable: %s, producer state: %s, producer slot: %s).",
+						consumedPartition.getResultType(),
 						consumedPartition.isConsumable(),
 						producerState,
 						producerSlot);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index a48352c..aee9ab1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
@@ -718,6 +719,26 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
+	private void scheduleConsumer(ExecutionVertex consumerVertex) {
+		CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
+					consumerVertex.scheduleForExecution(
+							executionGraph.getSlotProvider(),
+							executionGraph.isQueuedSchedulingAllowed(),
+							LocationPreferenceConstraint.ANY, // there must be at least one known location
+							Collections.emptySet());
+				} catch (Throwable t) {
+					consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
+							"vertex " + consumerVertex, t));
+				}
+
+				return null;
+			},
+			executor);
+	}
+
 	void scheduleOrUpdateConsumers(List<List<ExecutionEdge>> allConsumers) {
 		final int numConsumers = allConsumers.size();
 
@@ -755,23 +776,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				// TODO The current approach may send many update messages even though the consuming
 				// task has already been deployed with all necessary information. We have to check
 				// whether this is a problem and fix it, if it is.
-				CompletableFuture.supplyAsync(
-					() -> {
-						try {
-							final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
-							consumerVertex.scheduleForExecution(
-								executionGraph.getSlotProvider(),
-								executionGraph.isQueuedSchedulingAllowed(),
-								LocationPreferenceConstraint.ANY, // there must be at least one known location
-								Collections.emptySet());
-						} catch (Throwable t) {
-							consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
-									"vertex " + consumerVertex, t));
-						}
 
-						return null;
-					},
-					executor);
+				// Schedule the consumer vertex if its input constraint is satisfied; otherwise skip the scheduling.
+				// A shortcut is taken for InputDependencyConstraint.ANY: at least one of the consumer
+				// vertex's inputs is known to be consumable at this point, so the O(N) input constraint
+				// check can be skipped. This keeps the performance of the default (ANY) scheduling
+				// path unaffected.
+				if (consumerVertex.getInputDependencyConstraint() == InputDependencyConstraint.ANY ||
+						consumerVertex.checkInputDependencyConstraints()) {
+					scheduleConsumer(consumerVertex);
+				}
 
 				// double check to resolve race conditions
 				if (consumerVertex.getExecutionState() == RUNNING) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ee73505..94ee45d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -366,6 +367,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		return inputs;
 	}
 
+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return getJobVertex().getInputDependencyConstraint();
+	}
+
 	public Either<SerializedValue<TaskInformation>, PermanentBlobKey> getTaskInformationOrBlobKey() throws IOException {
 		// only one thread should offload the task information, so let's also let only one thread
 		// serialize the task information!
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a074729..1217217 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -61,6 +62,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -70,6 +72,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
@@ -340,6 +343,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		return resultPartitions;
 	}
 
+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return getJobVertex().getInputDependencyConstraint();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Graph building
 	// --------------------------------------------------------------------------------------------
@@ -684,6 +691,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			throw new IllegalStateException("Unknown partition " + partitionId + ".");
 		}
 
+		partition.markDataProduced();
+
 		if (partition.getIntermediateResult().getResultType().isPipelined()) {
 			// Schedule or update receivers of this partition
 			execution.scheduleOrUpdateConsumers(partition.getConsumers());
@@ -726,6 +735,34 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		}
 	}
 
+	/**
+	 * Check whether the InputDependencyConstraint is satisfied for this vertex.
+	 *
+	 * @return whether the input constraint is satisfied
+	 */
+	boolean checkInputDependencyConstraints() {
+		if (getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
+			// InputDependencyConstraint == ANY
+			return IntStream.range(0, inputEdges.length).anyMatch(this::isInputConsumable);
+		} else {
+			// InputDependencyConstraint == ALL
+			return IntStream.range(0, inputEdges.length).allMatch(this::isInputConsumable);
+		}
+	}
+
+	/**
+	 * Get whether an input of the vertex is consumable.
+	 * An input is consumable when any partition in it is consumable.
+	 *
+	 * Note that a BLOCKING result partition is only consumable when all partitions in the result are FINISHED.
+	 *
+	 * @return whether the input is consumable
+	 */
+	boolean isInputConsumable(int inputNumber) {
+		return Arrays.stream(inputEdges[inputNumber]).map(ExecutionEdge::getSource).anyMatch(
+				IntermediateResultPartition::isConsumable);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Notifications from the Execution Attempt
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 313272c..6e1d9ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -156,19 +156,17 @@ public class IntermediateResult {
 
 	void resetForNewExecution() {
 		this.numberOfRunningProducers.set(numParallelProducers);
+		for (IntermediateResultPartition partition : partitions) {
+			partition.resetForNewExecution();
+		}
 	}
 
 	int decrementNumberOfRunningProducersAndGetRemaining() {
 		return numberOfRunningProducers.decrementAndGet();
 	}
 
-	boolean isConsumable() {
-		if (resultType.isPipelined()) {
-			return true;
-		}
-		else {
-			return numberOfRunningProducers.get() == 0;
-		}
+	boolean areAllPartitionsFinished() {
+		return numberOfRunningProducers.get() == 0;
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 124ceb2..d4e85cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -36,6 +36,11 @@ public class IntermediateResultPartition {
 
 	private List<List<ExecutionEdge>> consumers;
 
+	/**
+	 * Whether this partition has produced some data.
+	 */
+	private boolean hasDataProduced = false;
+
 	public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
 		this.totalResult = totalResult;
 		this.producer = producer;
@@ -60,7 +65,7 @@ public class IntermediateResultPartition {
 		return partitionId;
 	}
 
-	ResultPartitionType getResultType() {
+	public ResultPartitionType getResultType() {
 		return totalResult.getResultType();
 	}
 
@@ -68,8 +73,20 @@ public class IntermediateResultPartition {
 		return consumers;
 	}
 
+	public void markDataProduced() {
+		hasDataProduced = true;
+	}
+
 	public boolean isConsumable() {
-		return totalResult.isConsumable();
+		if (getResultType().isPipelined()) {
+			return hasDataProduced;
+		} else {
+			return totalResult.areAllPartitionsFinished();
+		}
+	}
+
+	void resetForNewExecution() {
+		hasDataProduced = false;
 	}
 
 	int addConsumerGroup() {
@@ -94,6 +111,8 @@ public class IntermediateResultPartition {
 			throw new IllegalStateException("Tried to mark a non-blocking result partition as finished");
 		}
 
+		hasDataProduced = true;
+
 		final int refCnt = totalResult.decrementNumberOfRunningProducersAndGetRemaining();
 
 		if (refCnt == 0) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 1fe95eb..c78d707 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobgraph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
@@ -112,6 +113,9 @@ public class JobVertex implements java.io.Serializable {
 	 * to be included in the JSON plan */
 	private String resultOptimizerProperties;
 
+	/** The input dependency constraint to schedule this vertex. */
+	private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -557,6 +561,14 @@ public class JobVertex implements java.io.Serializable {
 		this.resultOptimizerProperties = resultOptimizerProperties;
 	}
 
+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return inputDependencyConstraint;
+	}
+
+	public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+		this.inputDependencyConstraint = inputDependencyConstraint;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
index 6aa36b7..4f7417f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -192,6 +193,7 @@ public class InputChannelDeploymentDescriptorTest {
 
 	private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
 		IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
+		when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED);
 		when(partition.isConsumable()).thenReturn(true);
 
 		IntermediateResult result = mock(IntermediateResult.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
new file mode 100644
index 0000000..3c7354a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the inputs constraint for {@link ExecutionVertex}.
+ */
+public class ExecutionVertexInputConstraintTest extends TestLogger {
+
+	@Test
+	public void testInputConsumable() throws Exception {
+		List<JobVertex> vertices = createOrderedVertices();
+		ExecutionGraph eg = createExecutionGraph(vertices, InputDependencyConstraint.ALL);
+		ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0];
+		ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0];
+		ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1];
+		ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0];
+		ExecutionVertex ev32 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[1];
+
+		eg.scheduleForExecution();
+
+		// Inputs not consumable on init
+		assertFalse(ev31.isInputConsumable(0));
+		assertFalse(ev31.isInputConsumable(1));
+
+		// One pipelined input becomes consumable once data is produced
+		IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+		ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+			ev11.getCurrentExecutionAttempt().getAttemptId()));
+		assertTrue(ev31.isInputConsumable(0));
+		// Input0 of ev32 is not consumable. It consumes the same PIPELINED result as ev31, but not the same partition
+		assertFalse(ev32.isInputConsumable(0));
+
+		// The blocking input not consumable if only one partition is FINISHED
+		ev21.getCurrentExecutionAttempt().markFinished();
+		assertFalse(ev31.isInputConsumable(1));
+
+		// The blocking input consumable if all partitions are FINISHED
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertTrue(ev31.isInputConsumable(1));
+
+		// Inputs not consumable after failover
+		ev11.fail(new Exception());
+		waitUntilJobRestarted(eg);
+		assertFalse(ev31.isInputConsumable(0));
+		assertFalse(ev31.isInputConsumable(1));
+	}
+
+	@Test
+	public void testInputConstraintANY() throws Exception {
+		List<JobVertex> vertices = createOrderedVertices();
+		ExecutionGraph eg = createExecutionGraph(vertices, InputDependencyConstraint.ANY);
+		ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0];
+		ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0];
+		ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1];
+		ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0];
+
+		eg.scheduleForExecution();
+
+		// Inputs constraint not satisfied on init
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input1 becoming consumable satisfies the constraint
+		IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+		ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+			ev11.getCurrentExecutionAttempt().getAttemptId()));
+		assertTrue(ev31.checkInputDependencyConstraints());
+
+		// Inputs constraint not satisfied after failover
+		ev11.fail(new Exception());
+		waitUntilJobRestarted(eg);
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input2 becoming consumable satisfies the constraint
+		waitUntilExecutionVertexState(ev21, ExecutionState.DEPLOYING, 2000L);
+		waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L);
+		ev21.getCurrentExecutionAttempt().markFinished();
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertTrue(ev31.checkInputDependencyConstraints());
+	}
+
+	@Test
+	public void testInputConstraintALL() throws Exception {
+		List<JobVertex> vertices = createOrderedVertices();
+		ExecutionGraph eg = createExecutionGraph(vertices, InputDependencyConstraint.ALL);
+		ExecutionVertex ev11 = eg.getJobVertex(vertices.get(0).getID()).getTaskVertices()[0];
+		ExecutionVertex ev21 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[0];
+		ExecutionVertex ev22 = eg.getJobVertex(vertices.get(1).getID()).getTaskVertices()[1];
+		ExecutionVertex ev31 = eg.getJobVertex(vertices.get(2).getID()).getTaskVertices()[0];
+
+		eg.scheduleForExecution();
+
+		// Inputs constraint not satisfied on init
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input1 becoming consumable does not satisfy the constraint
+		IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+		ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+			ev11.getCurrentExecutionAttempt().getAttemptId()));
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input2 becoming consumable satisfies the constraint
+		ev21.getCurrentExecutionAttempt().markFinished();
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertTrue(ev31.checkInputDependencyConstraints());
+
+		// Inputs constraint not satisfied after failover
+		ev11.fail(new Exception());
+		waitUntilJobRestarted(eg);
+		assertFalse(ev31.checkInputDependencyConstraints());
+	}
+
+	private static List<JobVertex> createOrderedVertices() {
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+		v3.setParallelism(2);
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+		v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		return Arrays.asList(v1, v2, v3);
+	}
+
+	private static ExecutionGraph createExecutionGraph(
+			List<JobVertex> orderedVertices,
+			InputDependencyConstraint inputDependencyConstraint) throws Exception {
+
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+		final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
+
+		for (JobVertex vertex : orderedVertices) {
+			vertex.setInputDependencyConstraint(inputDependencyConstraint);
+		}
+
+		ExecutionGraph eg = new ExecutionGraph(
+			new DummyJobInformation(
+				jobId,
+				jobName),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			AkkaUtils.getDefaultTimeout(),
+			new FixedDelayRestartStrategy(1, 0),
+			new RestartAllStrategy.Factory(),
+			slotProvider);
+		eg.attachJobGraph(orderedVertices);
+
+		return eg;
+	}
+
+	private void waitUntilJobRestarted(ExecutionGraph eg) throws Exception {
+		waitForAllExecutionsPredicate(eg,
+			isInExecutionState(ExecutionState.CANCELING)
+				.or(isInExecutionState(ExecutionState.CANCELED))
+				.or(isInExecutionState(ExecutionState.FAILED))
+				.or(isInExecutionState(ExecutionState.FINISHED)),
+			2000L);
+
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			if (ev.getCurrentExecutionAttempt().getState() == ExecutionState.CANCELING) {
+				ev.getCurrentExecutionAttempt().cancelingComplete();
+			}
+		}
+		waitUntilJobStatus(eg, JobStatus.RUNNING, 2000L);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
new file mode 100644
index 0000000..e20c0bd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link IntermediateResultPartition}.
+ */
+public class IntermediateResultPartitionTest extends TestLogger {
+
+	@Test
+	public void testPipelinedPartitionConsumable() throws Exception {
+		IntermediateResult result = createResult(ResultPartitionType.PIPELINED, 2);
+		IntermediateResultPartition partition1 = result.getPartitions()[0];
+		IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+		// Not consumable on init
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+
+		// Partition 1 becomes consumable after data is produced
+		partition1.markDataProduced();
+		assertTrue(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+
+		// Not consumable if failover happens
+		result.resetForNewExecution();
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+	}
+
+	@Test
+	public void testBlockingPartitionConsumable() throws Exception {
+		IntermediateResult result = createResult(ResultPartitionType.BLOCKING, 2);
+		IntermediateResultPartition partition1 = result.getPartitions()[0];
+		IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+		// Not consumable on init
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+
+		// Not consumable if only one partition is FINISHED
+		partition1.markFinished();
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+
+		// Consumable after all partitions are FINISHED
+		partition2.markFinished();
+		assertTrue(partition1.isConsumable());
+		assertTrue(partition2.isConsumable());
+
+		// Not consumable if failover happens
+		result.resetForNewExecution();
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+	}
+
+	private static IntermediateResult createResult(
+			ResultPartitionType resultPartitionType,
+			int producerCount) throws Exception {
+
+		ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
+		IntermediateResult result =
+				new IntermediateResult(new IntermediateDataSetID(), jobVertex, producerCount, resultPartitionType);
+		for (int i = 0; i < producerCount; i++) {
+			// Generate result partition in the result
+			new ExecutionVertex(jobVertex, i, new IntermediateResult[]{result}, Time.minutes(1));
+		}
+
+		return result;
+	}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 6921302..bc0377b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -404,6 +404,9 @@ public class StreamingJobGraphGenerator {
 			LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
 		}
 
+		// TODO: inherit InputDependencyConstraint from the head operator
+		jobVertex.setInputDependencyConstraint(streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());
+
 		jobVertices.put(streamNodeId, jobVertex);
 		builtVertices.add(streamNodeId);
 		jobGraph.addVertex(jobVertex);