Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/06 15:35:24 UTC

[GitHub] zhuzhurk closed pull request #7252: [FLINK-10945] Add an InputDependencyConstraint to avoid resource dead…

URL: https://github.com/apache/flink/pull/7252
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..6def1dfa488 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -159,6 +159,9 @@
 	/** Determines if a task fails or not if there is an error in writing its checkpoint data. Default: true */
 	private boolean failTaskOnCheckpointError = true;
 
+	/** The input dependency constraint for scheduling tasks. */
+	private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
 	// ------------------------------- User code values --------------------------------------------
 
 	private GlobalJobParameters globalJobParameters;
@@ -518,6 +521,30 @@ public ExecutionMode getExecutionMode() {
 		return executionMode;
 	}
 
+	/**
+	 * Sets the input dependency constraint for vertex scheduling. It indicates when a task
+	 * should be scheduled, considering the status of its inputs.
+	 *
+	 * The default constraint is {@link InputDependencyConstraint#ANY}.
+	 *
+	 * @param inputDependencyConstraint The input dependency constraint.
+	 */
+	public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+		this.inputDependencyConstraint = inputDependencyConstraint;
+	}
+
+	/**
+	 * Gets the input dependency constraint for vertex scheduling. It indicates when a task
+	 * should be scheduled, considering the status of its inputs.
+	 *
+	 * The default constraint is {@link InputDependencyConstraint#ANY}.
+	 *
+	 * @return The input dependency constraint of this job.
+	 */
+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return inputDependencyConstraint;
+	}
+
 	/**
 	 * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
 	 * In some cases this might be preferable. For example, when using interfaces
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
new file mode 100644
index 00000000000..c7a1e6a3519
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/InputDependencyConstraint.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * This constraint indicates when a task should be scheduled, considering the status of its inputs.
+ */
+public enum InputDependencyConstraint {
+
+	/**
+	 * Schedule the task if any input is consumable.
+	 */
+	ANY,
+
+	/**
+	 * Schedule the task if all the inputs are consumable.
+	 */
+	ALL
+}
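
A usage note on the new constraint: below is a minimal sketch of how a job
could opt into ALL-input scheduling through the ExecutionConfig accessors
added above (illustrative only, not part of this diff; it assumes a batch
ExecutionEnvironment named "env" and elides the actual job definition):

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.api.java.ExecutionEnvironment;

public class InputConstraintExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Schedule each task only once ALL of its inputs are consumable,
		// instead of the default ANY behavior.
		env.getConfig().setInputDependencyConstraint(InputDependencyConstraint.ALL);

		// ... define and execute the batch job as usual ...
	}
}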
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 7c2b30db32a..15bdb36f1d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -101,7 +101,8 @@ public String toString() {
 			final ResultPartitionLocation partitionLocation;
 
 			// The producing task needs to be RUNNING or already FINISHED
-			if (consumedPartition.isConsumable() && producerSlot != null &&
+			if ((consumedPartition.getResultType().isPipelined() || consumedPartition.isConsumable()) &&
+				producerSlot != null &&
 					(producerState == ExecutionState.RUNNING ||
 						producerState == ExecutionState.FINISHED ||
 						producerState == ExecutionState.SCHEDULED ||
@@ -136,7 +137,8 @@ else if (producerState == ExecutionState.CANCELING
 			}
 			else {
 				String msg = String.format("Trying to eagerly schedule a task whose inputs " +
-					"are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+					"are not ready (result type: %s, partition consumable: %s, producer state: %s, producer slot: %s).",
+						consumedPartition.getResultType(),
 						consumedPartition.isConsumable(),
 						producerState,
 						producerSlot);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e3b501e52e8..59f76502ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -747,31 +747,34 @@ else if (numConsumers == 0) {
 				consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(
 						partition, partitionExecution));
 
-				// When deploying a consuming task, its task deployment descriptor will contain all
-				// deployment information available at the respective time. It is possible that some
-				// of the partitions to be consumed have not been created yet. These are updated
-				// runtime via the update messages.
-				//
-				// TODO The current approach may send many update messages even though the consuming
-				// task has already been deployed with all necessary information. We have to check
-				// whether this is a problem and fix it, if it is.
-				CompletableFuture.supplyAsync(
-					() -> {
-						try {
-							final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
-							consumerVertex.scheduleForExecution(
-								executionGraph.getSlotProvider(),
-								executionGraph.isQueuedSchedulingAllowed(),
-								LocationPreferenceConstraint.ANY, // there must be at least one known location
-								Collections.emptySet());
-						} catch (Throwable t) {
-							consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
+				// Schedule the consumer vertex if its input constraint is satisfied; otherwise postpone the scheduling
+				if (consumerVertex.checkInputDependencyConstraints()) {
+					// When deploying a consuming task, its task deployment descriptor will contain all
+					// deployment information available at the respective time. It is possible that some
+					// of the partitions to be consumed have not been created yet. These are updated
+					// at runtime via the update messages.
+					//
+					// TODO The current approach may send many update messages even though the consuming
+					// task has already been deployed with all necessary information. We have to check
+					// whether this is a problem and fix it, if it is.
+					CompletableFuture.supplyAsync(
+						() -> {
+							try {
+								final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph();
+								consumerVertex.scheduleForExecution(
+									executionGraph.getSlotProvider(),
+									executionGraph.isQueuedSchedulingAllowed(),
+									LocationPreferenceConstraint.ANY, // there must be at least one known location
+									Collections.emptySet());
+							} catch (Throwable t) {
+								consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
 									"vertex " + consumerVertex, t));
-						}
+							}
 
-						return null;
-					},
-					executor);
+							return null;
+						},
+						executor);
+				}
 
 				// double check to resolve race conditions
 				if (consumerVertex.getExecutionState() == RUNNING) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 56315e07146..e89409f032c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -255,6 +256,9 @@
 	 * from results than need to be materialized. */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
+	/** The input dependency constraint for scheduling tasks. */
+	private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
+
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
 	private final AtomicInteger verticesFinished;
@@ -456,6 +460,14 @@ public ScheduleMode getScheduleMode() {
 		return scheduleMode;
 	}
 
+	public void setInputDependencyConstraint(InputDependencyConstraint inputDependencyConstraint) {
+		this.inputDependencyConstraint = inputDependencyConstraint;
+	}
+
+	public InputDependencyConstraint getInputDependencyConstraint() {
+		return inputDependencyConstraint;
+	}
+
 	public Time getAllocationTimeout() {
 		return allocationTimeout;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f1a861d2ca1..c0d877dcb28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -179,6 +179,13 @@ public static ExecutionGraph buildGraph(
 		executionGraph.setScheduleMode(jobGraph.getScheduleMode());
 		executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
 
+		try {
+			executionGraph.setInputDependencyConstraint(
+				jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getInputDependencyConstraint());
+		} catch (IOException | ClassNotFoundException e) {
+			throw new JobException("Failed to deserialize the execution config.", e);
+		}
+
 		try {
 			executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a0747296c53..22e02e2b13d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.InputDependencyConstraint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -686,6 +687,7 @@ void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
 
 		if (partition.getIntermediateResult().getResultType().isPipelined()) {
 			// Schedule or update receivers of this partition
+			partition.markDataProduced();
 			execution.scheduleOrUpdateConsumers(partition.getConsumers());
 		}
 		else {
@@ -726,6 +728,56 @@ void sendPartitionInfos() {
 		}
 	}
 
+	/**
+	 * Checks whether the InputDependencyConstraint is satisfied for this vertex.
+	 *
+	 * @return whether the input constraint is satisfied
+	 */
+	public boolean checkInputDependencyConstraints() {
+		if (getExecutionGraph().getInputDependencyConstraint() == InputDependencyConstraint.ANY) {
+			// InputDependencyConstraint == ANY
+			for (int i = 0; i < inputEdges.length; i++) {
+				if (isInputConsumable(i)) {
+					return true;
+				}
+			}
+			return false;
+		} else {
+			// InputDependencyConstraint == ALL
+			for (int i = 0; i < inputEdges.length; i++) {
+				if (!isInputConsumable(i)) {
+					return false;
+				}
+			}
+			return true;
+		}
+	}
+
+	/**
+	 * An input is consumable when:
+	 * 1. the source result is PIPELINED and one of its partitions has produced data, or
+	 * 2. the source result is BLOCKING and is FINISHED (all partitions are FINISHED).
+	 *
+	 * @return whether the input is consumable
+	 */
+	public boolean isInputConsumable(int inputNumber) {
+		IntermediateResult result = jobVertex.getInputs().get(inputNumber);
+
+		if (result.getResultType().isPipelined()) {
+			// For a PIPELINED result, the input is consumable as soon as any result partition has produced data
+			for (ExecutionEdge edge : inputEdges[inputNumber]) {
+				if (edge.getSource().hasDataProduced()) {
+					return true;
+				}
+			}
+		} else {
+			// For a BLOCKING result, the input is consumable only if all partitions in the result are finished
+			return result.areAllPartitionsFinished();
+		}
+
+		return false;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Notifications from the Execution Attempt
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 313272cf86b..6e1d9ba69fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -156,19 +156,17 @@ public int getConnectionIndex() {
 
 	void resetForNewExecution() {
 		this.numberOfRunningProducers.set(numParallelProducers);
+		for (IntermediateResultPartition partition : partitions) {
+			partition.resetForNewExecution();
+		}
 	}
 
 	int decrementNumberOfRunningProducersAndGetRemaining() {
 		return numberOfRunningProducers.decrementAndGet();
 	}
 
-	boolean isConsumable() {
-		if (resultType.isPipelined()) {
-			return true;
-		}
-		else {
-			return numberOfRunningProducers.get() == 0;
-		}
+	boolean areAllPartitionsFinished() {
+		return numberOfRunningProducers.get() == 0;
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 124ceb2b6bc..56fb137eb59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -36,6 +36,11 @@
 
 	private List<List<ExecutionEdge>> consumers;
 
+	/**
+	 * Whether this partition has produced any data. Only relevant for pipelined results.
+	 */
+	private boolean dataProduced = false;
+
 	public IntermediateResultPartition(IntermediateResult totalResult, ExecutionVertex producer, int partitionNumber) {
 		this.totalResult = totalResult;
 		this.producer = producer;
@@ -60,7 +65,7 @@ public IntermediateResultPartitionID getPartitionId() {
 		return partitionId;
 	}
 
-	ResultPartitionType getResultType() {
+	public ResultPartitionType getResultType() {
 		return totalResult.getResultType();
 	}
 
@@ -68,8 +73,24 @@ ResultPartitionType getResultType() {
 		return consumers;
 	}
 
+	public void markDataProduced() {
+		dataProduced = true;
+	}
+
+	public boolean hasDataProduced() {
+		return dataProduced;
+	}
+
 	public boolean isConsumable() {
-		return totalResult.isConsumable();
+		if (getResultType().isPipelined()) {
+			return dataProduced;
+		} else {
+			return totalResult.areAllPartitionsFinished();
+		}
+	}
+
+	void resetForNewExecution() {
+		dataProduced = false;
 	}
 
 	int addConsumerGroup() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
index 6aa36b70ab8..4f7417f27fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -29,6 +29,7 @@
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -192,6 +193,7 @@ private static ExecutionVertex mockExecutionVertex(ExecutionState state, Resourc
 
 	private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
 		IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
+		when(partition.getResultType()).thenReturn(ResultPartitionType.PIPELINED);
 		when(partition.isConsumable()).thenReturn(true);
 
 		IntermediateResult result = mock(IntermediateResult.class);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
new file mode 100644
index 00000000000..8d131a0cb30
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the inputs constraint for {@link ExecutionVertex}.
+ */
+public class ExecutionVertexInputConstraintTest extends TestLogger {
+
+	@Test
+	public void testInputConsumable() throws Exception {
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+		v3.setParallelism(2);
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+		v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+		ExecutionGraph eg = createExecutionGraph(ordered);
+
+		ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+		ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+		ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+		ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+		ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+		ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+		eg.scheduleForExecution();
+
+		// Inputs not consumable on init
+		assertFalse(ev31.isInputConsumable(0));
+		assertFalse(ev31.isInputConsumable(1));
+
+		// One pipelined input becomes consumable once data is produced
+		IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+		ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+			ev11.getCurrentExecutionAttempt().getAttemptId()));
+		assertTrue(ev31.isInputConsumable(0));
+
+		// The blocking input is not consumable if only one partition is FINISHED
+		ev21.getCurrentExecutionAttempt().markFinished();
+		assertFalse(ev31.isInputConsumable(1));
+
+		// The blocking input is consumable once all partitions are FINISHED
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertTrue(ev31.isInputConsumable(1));
+
+		// Inputs not consumable after failover
+		ev11.fail(new Exception());
+		waitUntilJobRestarted(eg);
+		assertFalse(ev31.isInputConsumable(0));
+		assertFalse(ev31.isInputConsumable(1));
+	}
+
+	@Test
+	public void testInputConstraintANY() throws Exception {
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+		v3.setParallelism(2);
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+		v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+		ExecutionGraph eg = createExecutionGraph(ordered);
+		eg.setInputDependencyConstraint(InputDependencyConstraint.ANY);
+
+		ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+		ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+		ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+		ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+		ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+		ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+		eg.scheduleForExecution();
+
+		// Input constraint not satisfied on init
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input1 being consumable satisfies the ANY constraint
+		IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+		ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+			ev11.getCurrentExecutionAttempt().getAttemptId()));
+		assertTrue(ev31.checkInputDependencyConstraints());
+
+		// Input constraint not satisfied after failover
+		ev11.fail(new Exception());
+		waitUntilJobRestarted(eg);
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input2 being consumable satisfies the constraint
+		waitUntilExecutionVertexState(ev21, ExecutionState.DEPLOYING, 2000L);
+		waitUntilExecutionVertexState(ev22, ExecutionState.DEPLOYING, 2000L);
+		ev21.getCurrentExecutionAttempt().markFinished();
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertTrue(ev31.isInputConsumable(1));
+	}
+
+	@Test
+	public void testInputConstraintALL() throws Exception {
+		JobVertex v1 = new JobVertex("vertex1");
+		JobVertex v2 = new JobVertex("vertex2");
+		JobVertex v3 = new JobVertex("vertex3");
+		v1.setParallelism(2);
+		v2.setParallelism(2);
+		v3.setParallelism(2);
+		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
+		v3.setInvokableClass(AbstractInvokable.class);
+		v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+		v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+		List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+		ExecutionGraph eg = createExecutionGraph(ordered);
+		eg.setInputDependencyConstraint(InputDependencyConstraint.ALL);
+
+		ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+		ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+		ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+		ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+		ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+		ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+		eg.scheduleForExecution();
+
+		// Input constraint not satisfied on init
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input1 being consumable does not satisfy the ALL constraint
+		IntermediateResultPartition partition11 = ev11.getProducedPartitions().values().iterator().next();
+		ev11.scheduleOrUpdateConsumers(new ResultPartitionID(partition11.getPartitionId(),
+			ev11.getCurrentExecutionAttempt().getAttemptId()));
+		assertFalse(ev31.checkInputDependencyConstraints());
+
+		// Input2 being consumable satisfies the constraint
+		ev21.getCurrentExecutionAttempt().markFinished();
+		ev22.getCurrentExecutionAttempt().markFinished();
+		assertTrue(ev31.isInputConsumable(1));
+
+		// Input constraint not satisfied after failover
+		ev11.fail(new Exception());
+		waitUntilJobRestarted(eg);
+		assertFalse(ev31.checkInputDependencyConstraints());
+	}
+
+	private static ExecutionGraph createExecutionGraph(List<JobVertex> ordered) throws Exception {
+		final JobID jobId = new JobID();
+		final String jobName = "Test Job Sample Name";
+
+		final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20);
+
+		ExecutionGraph eg = new ExecutionGraph(
+			new DummyJobInformation(
+				jobId,
+				jobName),
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			AkkaUtils.getDefaultTimeout(),
+			new FixedDelayRestartStrategy(1, 0),
+			new RestartAllStrategy.Factory(),
+			slotProvider);
+
+		eg.attachJobGraph(ordered);
+
+		return eg;
+	}
+
+	private void waitUntilJobRestarted(ExecutionGraph eg) throws Exception {
+		waitForAllExecutionsPredicate(eg,
+			isInExecutionState(ExecutionState.CANCELING)
+				.or(isInExecutionState(ExecutionState.CANCELED))
+				.or(isInExecutionState(ExecutionState.FAILED))
+				.or(isInExecutionState(ExecutionState.FINISHED)),
+			2000L);
+
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			if (ev.getCurrentExecutionAttempt().getState() == ExecutionState.CANCELING) {
+				ev.getCurrentExecutionAttempt().cancelingComplete();
+			}
+		}
+		waitUntilJobStatus(eg, JobStatus.RUNNING, 2000L);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
new file mode 100644
index 00000000000..2cd00617d90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link IntermediateResultPartition}.
+ */
+public class IntermediateResultPartitionTest extends TestLogger {
+
+	@Test
+	public void testPipelinedPartitionConsumable() throws Exception {
+		ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
+		IntermediateResult result =
+			new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.PIPELINED);
+		ExecutionVertex vertex1 =
+			new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+		ExecutionVertex vertex2 =
+			new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1));
+
+		IntermediateResultPartition partition1 = result.getPartitions()[0];
+		IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+		// Not consumable on init
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+
+		// Partition 1 becomes consumable after data is produced
+		partition1.markDataProduced();
+		assertTrue(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+
+		// Not consumable if failover happens
+		result.resetForNewExecution();
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+	}
+
+	@Test
+	public void testBlockingPartitionConsumable() throws Exception {
+		ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
+		IntermediateResult result =
+			new IntermediateResult(new IntermediateDataSetID(), jobVertex, 2, ResultPartitionType.BLOCKING);
+		ExecutionVertex vertex1 =
+			new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+		ExecutionVertex vertex2 =
+			new ExecutionVertex(jobVertex, 1, new IntermediateResult[]{result}, Time.minutes(1));
+
+		IntermediateResultPartition partition1 = result.getPartitions()[0];
+		IntermediateResultPartition partition2 = result.getPartitions()[1];
+
+		// Not consumable on init
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+
+		// Not consumable if only one partition is FINISHED
+		partition1.markFinished();
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+
+		// Consumable after all partitions are FINISHED
+		partition2.markFinished();
+		assertTrue(partition1.isConsumable());
+		assertTrue(partition2.isConsumable());
+
+		// Not consumable if failover happens
+		result.resetForNewExecution();
+		assertFalse(partition1.isConsumable());
+		assertFalse(partition2.isConsumable());
+	}
+}
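
Taken together, the scheduling decision introduced by this diff reduces to:
an input is consumable when its result is PIPELINED and some partition has
produced data, or when it is BLOCKING and all of its partitions have
finished; under ANY a single consumable input triggers scheduling, while
under ALL every input must be consumable. The standalone sketch below models
that logic (illustrative only, plain Java with hypothetical names, not
Flink API):

import java.util.Arrays;
import java.util.List;

public class InputConstraintModel {

	enum ResultType { PIPELINED, BLOCKING }
	enum Constraint { ANY, ALL }

	static final class Input {
		final ResultType type;
		final boolean anyPartitionHasData;   // tracked per partition for PIPELINED results
		final boolean allPartitionsFinished; // tracked per result for BLOCKING results

		Input(ResultType type, boolean anyPartitionHasData, boolean allPartitionsFinished) {
			this.type = type;
			this.anyPartitionHasData = anyPartitionHasData;
			this.allPartitionsFinished = allPartitionsFinished;
		}

		// Mirrors ExecutionVertex#isInputConsumable above.
		boolean isConsumable() {
			return type == ResultType.PIPELINED ? anyPartitionHasData : allPartitionsFinished;
		}
	}

	// Mirrors ExecutionVertex#checkInputDependencyConstraints above: under ANY a
	// single consumable input suffices; under ALL every input must be consumable.
	static boolean shouldSchedule(Constraint constraint, List<Input> inputs) {
		return constraint == Constraint.ANY
			? inputs.stream().anyMatch(Input::isConsumable)
			: inputs.stream().allMatch(Input::isConsumable);
	}

	public static void main(String[] args) {
		Input pipelined = new Input(ResultType.PIPELINED, true, false);
		Input blocking = new Input(ResultType.BLOCKING, false, false);

		// ANY: the pipelined input already has data, so the consumer may be scheduled.
		System.out.println(shouldSchedule(Constraint.ANY, Arrays.asList(pipelined, blocking))); // true

		// ALL: the blocking input still has unfinished partitions, so scheduling is postponed.
		System.out.println(shouldSchedule(Constraint.ALL, Arrays.asList(pipelined, blocking))); // false
	}
}

Under ANY a consumer can thus be scheduled as soon as its first pipelined
partition reports data; under ALL it waits for every blocking input to
finish, which is what avoids the resource deadlock targeted by FLINK-10945.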


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services