You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/11/10 20:53:40 UTC

[1/3] flink git commit: [FLINK-5040] [jobmanager] Set correct input channel types with eager scheduling

Repository: flink
Updated Branches:
  refs/heads/release-1.1 9a19ca115 -> 0bd8e0279


[FLINK-5040] [jobmanager] Set correct input channel types with eager scheduling

This closes #2784.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55c506f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55c506f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55c506f2

Branch: refs/heads/release-1.1
Commit: 55c506f2ee58f70d9220d507256146df2a434381
Parents: b5a4cb6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Nov 9 18:25:06 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 10 21:53:30 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java       |  12 +-
 .../ResultPartitionDeploymentDescriptor.java    |  24 ++-
 .../runtime/executiongraph/ExecutionVertex.java |  14 +-
 .../io/network/partition/ResultPartition.java   |  10 +-
 .../flink/runtime/jobgraph/ScheduleMode.java    |  10 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../InputChannelDeploymentDescriptorTest.java   | 202 +++++++++++++++++++
 ...ResultPartitionDeploymentDescriptorTest.java |   6 +-
 .../ExecutionVertexDeploymentTest.java          |  60 +++++-
 .../network/partition/ResultPartitionTest.java  |  90 +++++++++
 .../consumer/LocalInputChannelTest.java         |   5 +-
 .../runtime/jobgraph/ScheduleModeTest.java      |  37 ++++
 .../runtime/taskmanager/TaskManagerTest.java    |   9 +-
 .../flink/runtime/taskmanager/TaskTest.java     |   5 +-
 15 files changed, 454 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index e00a480..6b87e69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -85,7 +85,7 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 	 * Creates an input channel deployment descriptor for each partition.
 	 */
 	public static InputChannelDeploymentDescriptor[] fromEdges(
-			ExecutionEdge[] edges, SimpleSlot consumerSlot) {
+			ExecutionEdge[] edges, SimpleSlot consumerSlot, boolean allowLazyDeployment) {
 
 		final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length];
 
@@ -101,8 +101,10 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 
 			// The producing task needs to be RUNNING or already FINISHED
 			if (consumedPartition.isConsumable() && producerSlot != null &&
-					(producerState == ExecutionState.RUNNING
-							|| producerState == ExecutionState.FINISHED)) {
+					(producerState == ExecutionState.RUNNING ||
+						producerState == ExecutionState.FINISHED ||
+						producerState == ExecutionState.SCHEDULED ||
+						producerState == ExecutionState.DEPLOYING)) {
 
 				final Instance partitionInstance = producerSlot.getInstance();
 
@@ -119,9 +121,11 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 					partitionLocation = ResultPartitionLocation.createRemote(connectionId);
 				}
 			}
-			else {
+			else if (allowLazyDeployment) {
 				// The producing task might not have registered the partition yet
 				partitionLocation = ResultPartitionLocation.createUnknown();
+			} else {
+				throw new IllegalStateException("Trying to eagerly schedule a task whose inputs are not ready.");
 			}
 
 			final ResultPartitionID consumedPartitionId = new ResultPartitionID(

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index ecdacbb..2ecde80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -47,12 +47,16 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
 	/** The number of subpartitions. */
 	private final int numberOfSubpartitions;
+	
+	/** Flag indicating whether the result partition should send ScheduleOrUpdateConsumers messages. */
+	private final boolean lazyScheduling;
 
 	public ResultPartitionDeploymentDescriptor(
-		IntermediateDataSetID resultId,
-		IntermediateResultPartitionID partitionId,
-		ResultPartitionType partitionType,
-		int numberOfSubpartitions) {
+			IntermediateDataSetID resultId,
+			IntermediateResultPartitionID partitionId,
+			ResultPartitionType partitionType,
+			int numberOfSubpartitions,
+			boolean lazyScheduling) {
 
 		this.resultId = checkNotNull(resultId);
 		this.partitionId = checkNotNull(partitionId);
@@ -60,6 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
 		checkArgument(numberOfSubpartitions >= 1);
 		this.numberOfSubpartitions = numberOfSubpartitions;
+		this.lazyScheduling = lazyScheduling;
 	}
 
 	public IntermediateDataSetID getResultId() {
@@ -78,6 +83,10 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		return numberOfSubpartitions;
 	}
 
+	public boolean allowLazyScheduling() {
+		return lazyScheduling;
+	}
+
 	@Override
 	public String toString() {
 		return String.format("ResultPartitionDeploymentDescriptor [result id: %s, "
@@ -87,7 +96,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
 	// ------------------------------------------------------------------------
 
-	public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition) {
+	public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition, boolean lazyScheduling) {
 
 		final IntermediateDataSetID resultId = partition.getIntermediateResult().getId();
 		final IntermediateResultPartitionID partitionId = partition.getPartitionId();
@@ -102,14 +111,13 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) {
 
 			if (partition.getConsumers().size() > 1) {
-				new IllegalStateException("Currently, only a single consumer group per partition is supported.");
+				throw new IllegalStateException("Currently, only a single consumer group per partition is supported.");
 			}
 
 			numberOfSubpartitions = partition.getConsumers().get(0).size();
 		}
 
 		return new ResultPartitionDeploymentDescriptor(
-				resultId, partitionId, partitionType, numberOfSubpartitions
-		);
+				resultId, partitionId, partitionType, numberOfSubpartitions, lazyScheduling);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 309548d..c101548 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -638,18 +638,20 @@ public class ExecutionVertex implements Serializable {
 			int attemptNumber) {
 
 		// Produced intermediate results
-		List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
+		List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<>(resultPartitions.size());
+		
+		// Consumed intermediate results
+		List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<>(inputEdges.length);
+		
+		boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
 		for (IntermediateResultPartition partition : resultPartitions.values()) {
-			producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition));
+			producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling));
 		}
 
-		// Consumed intermediate results
-		List<InputGateDeploymentDescriptor> consumedPartitions = new ArrayList<InputGateDeploymentDescriptor>();
-
 		for (ExecutionEdge[] edges : inputEdges) {
 			InputChannelDeploymentDescriptor[] partitions = InputChannelDeploymentDescriptor
-					.fromEdges(edges, targetSlot);
+					.fromEdges(edges, targetSlot, lazyScheduling);
 
 			// If the produced partition has multiple consumers registered, we
 			// need to request the one matching our sub task index.

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index a60f95d..c30f333 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -93,6 +93,8 @@ public class ResultPartition implements BufferPoolOwner {
 
 	private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
 
+	private final boolean sendScheduleOrUpdateConsumersMessage;
+
 	// - Runtime state --------------------------------------------------------
 
 	private final AtomicBoolean isReleased = new AtomicBoolean();
@@ -129,7 +131,8 @@ public class ResultPartition implements BufferPoolOwner {
 		ResultPartitionManager partitionManager,
 		ResultPartitionConsumableNotifier partitionConsumableNotifier,
 		IOManager ioManager,
-		IOMode defaultIoMode) {
+		IOMode defaultIoMode,
+		boolean sendScheduleOrUpdateConsumersMessage) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
 		this.jobId = checkNotNull(jobId);
@@ -138,6 +141,7 @@ public class ResultPartition implements BufferPoolOwner {
 		this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
 		this.partitionManager = checkNotNull(partitionManager);
 		this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
+		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
 
 		// Create the subpartitions.
 		switch (partitionType) {
@@ -417,8 +421,8 @@ public class ResultPartition implements BufferPoolOwner {
 	/**
 	 * Notifies pipelined consumers of this result partition once.
 	 */
-	private void notifyPipelinedConsumers() throws IOException {
-		if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) {
+	private void notifyPipelinedConsumers() {
+		if (sendScheduleOrUpdateConsumersMessage && !hasNotifiedPipelinedConsumers && partitionType.isPipelined()) {
 			partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId);
 
 			hasNotifiedPipelinedConsumers = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
index 330519d..78b7b45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/ScheduleMode.java
@@ -30,6 +30,14 @@ public enum ScheduleMode {
 	/**
 	 * Schedule tasks all at once instead of lazy deployment of receiving tasks.
 	 */
-	ALL
+	ALL;
+
+	/**
+	 * Returns whether we are allowed to deploy consumers lazily.
+	 */
+	public boolean allowLazyDeployment() {
+		return this != ALL;
+	}
 
 }
+

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dd14aaf..2179fc1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -336,7 +336,8 @@ public class Task implements Runnable {
 					networkEnvironment.getPartitionManager(),
 					networkEnvironment.getPartitionConsumableNotifier(),
 					ioManager,
-					networkEnvironment.getDefaultIOMode());
+					networkEnvironment.getDefaultIOMode(),
+					desc.allowLazyScheduling());
 
 			writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
 
@@ -1088,7 +1089,11 @@ public class Task implements Runnable {
 			final SingleInputGate inputGate = inputGatesById.get(resultId);
 
 			if (inputGate != null) {
-				if (partitionState == ExecutionState.RUNNING) {
+				if (partitionState == ExecutionState.RUNNING ||
+					partitionState == ExecutionState.FINISHED ||
+					partitionState == ExecutionState.SCHEDULED ||
+					partitionState == ExecutionState.DEPLOYING) {
+
 					// Retrigger the partition request
 					inputGate.retriggerPartitionRequest(partitionId);
 				}
@@ -1245,7 +1250,6 @@ public class Task implements Runnable {
 			try {
 				if (watchDogThread != null) {
 					watchDogThread.start();
-					logger.info("Started cancellation watch dog");
 				}
 
 				// the user-defined cancel method may throw errors.

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 106ffb6..41218c9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -855,7 +855,7 @@ class JobManager(
           if (execution != null) execution.getState else null
         case None =>
           // Nothing to do. This is not an error, because the request is received when a sending
-          // task fails during a remote partition request.
+          // task fails or is not yet available during a remote partition request.
           log.debug(s"Cannot find execution graph for job $jobId.")
 
           null

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
new file mode 100644
index 0000000..cda0f4d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptorTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.junit.Test;
+
+import java.net.InetAddress;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class InputChannelDeploymentDescriptorTest {
+
+	/**
+	 * Tests the deployment descriptors for local, remote, and unknown partition
+	 * locations (with lazy deployment allowed and all execution states for the
+	 * producers).
+	 */
+	@Test
+	public void testMixedLocalRemoteUnknownDeployment() throws Exception {
+		boolean allowLazyDeployment = true;
+
+		ExecutionVertex consumer = mock(ExecutionVertex.class);
+		SimpleSlot consumerSlot = mockSlot(createConnInfo(5000));
+
+		// Local and remote channel are only allowed for certain execution
+		// states.
+		for (ExecutionState state : ExecutionState.values()) {
+			// Local partition
+			ExecutionVertex localProducer = mockExecutionVertex(state, consumerSlot);
+			IntermediateResultPartition localPartition = mockPartition(localProducer);
+			ResultPartitionID localPartitionId = new ResultPartitionID(localPartition.getPartitionId(), localProducer.getCurrentExecutionAttempt().getAttemptId());
+			ExecutionEdge localEdge = new ExecutionEdge(localPartition, consumer, 0);
+
+			// Remote partition
+			InstanceConnectionInfo connInfo = createConnInfo(6000);
+			ExecutionVertex remoteProducer = mockExecutionVertex(state, mockSlot(connInfo)); // new slot
+			IntermediateResultPartition remotePartition = mockPartition(remoteProducer);
+			ResultPartitionID remotePartitionId = new ResultPartitionID(remotePartition.getPartitionId(), remoteProducer.getCurrentExecutionAttempt().getAttemptId());
+			ConnectionID remoteConnectionId = new ConnectionID(connInfo, 0);
+			ExecutionEdge remoteEdge = new ExecutionEdge(remotePartition, consumer, 1);
+
+			// Unknown partition
+			ExecutionVertex unknownProducer = mockExecutionVertex(state, null); // no assigned resource
+			IntermediateResultPartition unknownPartition = mockPartition(unknownProducer);
+			ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId());
+			ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, consumer, 2);
+
+			InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
+				new ExecutionEdge[]{localEdge, remoteEdge, unknownEdge},
+				consumerSlot,
+				allowLazyDeployment);
+
+			assertEquals(3, desc.length);
+
+			// These states are allowed
+			if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHED ||
+				state == ExecutionState.SCHEDULED || state == ExecutionState.DEPLOYING) {
+
+				// Create local or remote channels
+				assertEquals(localPartitionId, desc[0].getConsumedPartitionId());
+				assertTrue(desc[0].getConsumedPartitionLocation().isLocal());
+				assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+				assertEquals(remotePartitionId, desc[1].getConsumedPartitionId());
+				assertTrue(desc[1].getConsumedPartitionLocation().isRemote());
+				assertEquals(remoteConnectionId, desc[1].getConsumedPartitionLocation().getConnectionId());
+			} else {
+				// Unknown (lazy deployment allowed)
+				assertEquals(localPartitionId, desc[0].getConsumedPartitionId());
+				assertTrue(desc[0].getConsumedPartitionLocation().isUnknown());
+				assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+				assertEquals(remotePartitionId, desc[1].getConsumedPartitionId());
+				assertTrue(desc[1].getConsumedPartitionLocation().isUnknown());
+				assertNull(desc[1].getConsumedPartitionLocation().getConnectionId());
+			}
+
+			assertEquals(unknownPartitionId, desc[2].getConsumedPartitionId());
+			assertTrue(desc[2].getConsumedPartitionLocation().isUnknown());
+			assertNull(desc[2].getConsumedPartitionLocation().getConnectionId());
+		}
+	}
+
+	@Test
+	public void testUnknownChannelWithoutLazyDeploymentThrows() throws Exception {
+		ExecutionVertex consumer = mock(ExecutionVertex.class);
+		SimpleSlot consumerSlot = mock(SimpleSlot.class);
+
+		// Unknown partition
+		ExecutionVertex unknownProducer = mockExecutionVertex(ExecutionState.CREATED, null); // no assigned resource
+		IntermediateResultPartition unknownPartition = mockPartition(unknownProducer);
+		ResultPartitionID unknownPartitionId = new ResultPartitionID(unknownPartition.getPartitionId(), unknownProducer.getCurrentExecutionAttempt().getAttemptId());
+		ExecutionEdge unknownEdge = new ExecutionEdge(unknownPartition, consumer, 2);
+
+		// This should work if lazy deployment is allowed
+		boolean allowLazyDeployment = true;
+
+		InputChannelDeploymentDescriptor[] desc = InputChannelDeploymentDescriptor.fromEdges(
+			new ExecutionEdge[]{unknownEdge},
+			consumerSlot,
+			allowLazyDeployment);
+
+		assertEquals(1, desc.length);
+
+		assertEquals(unknownPartitionId, desc[0].getConsumedPartitionId());
+		assertTrue(desc[0].getConsumedPartitionLocation().isUnknown());
+		assertNull(desc[0].getConsumedPartitionLocation().getConnectionId());
+
+
+		try {
+			// Fail if lazy deployment is *not* allowed
+			allowLazyDeployment = false;
+
+			InputChannelDeploymentDescriptor.fromEdges(
+				new ExecutionEdge[]{unknownEdge},
+				consumerSlot,
+				allowLazyDeployment);
+
+			fail("Did not throw expected IllegalStateException");
+		} catch (IllegalStateException ignored) {
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static InstanceConnectionInfo createConnInfo(int port) {
+		return new InstanceConnectionInfo(InetAddress.getLoopbackAddress(), port);
+	}
+
+	private static SimpleSlot mockSlot(InstanceConnectionInfo connInfo) {
+		SimpleSlot slot = mock(SimpleSlot.class);
+		Instance instance = mock(Instance.class);
+		when(slot.getInstance()).thenReturn(instance);
+		when(instance.getInstanceConnectionInfo()).thenReturn(connInfo);
+
+		return slot;
+	}
+
+	private static ExecutionVertex mockExecutionVertex(ExecutionState state, SimpleSlot slot) {
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
+
+		Execution exec = mock(Execution.class);
+		when(exec.getState()).thenReturn(state);
+		when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
+
+		when(exec.getAssignedResource()).thenReturn(slot);
+		when(vertex.getCurrentAssignedResource()).thenReturn(slot);
+
+		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+
+		return vertex;
+	}
+
+	private static IntermediateResultPartition mockPartition(ExecutionVertex producer) {
+		IntermediateResultPartition partition = mock(IntermediateResultPartition.class);
+		when(partition.isConsumable()).thenReturn(true);
+
+		IntermediateResult result = mock(IntermediateResult.class);
+		when(result.getConnectionIndex()).thenReturn(0);
+
+		when(partition.getIntermediateResult()).thenReturn(result);
+		when(partition.getPartitionId()).thenReturn(new IntermediateResultPartitionID());
+
+		when(partition.getProducer()).thenReturn(producer);
+
+		return partition;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 1986eae..4223b49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ResultPartitionDeploymentDescriptorTest {
 
@@ -44,8 +45,8 @@ public class ResultPartitionDeploymentDescriptorTest {
 						resultId,
 						partitionId,
 						partitionType,
-						numberOfSubpartitions
-				);
+						numberOfSubpartitions,
+						true);
 
 		ResultPartitionDeploymentDescriptor copy =
 				CommonTestUtils.createCopySerializable(orig);
@@ -54,5 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
 		assertEquals(partitionId, copy.getPartitionId());
 		assertEquals(partitionType, copy.getPartitionType());
 		assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
+		assertTrue(copy.allowLazyScheduling());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 81ec6c9..1f5c915 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -18,19 +18,37 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.*;
-
-import static org.junit.Assert.*;
-
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
 import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ERROR_MESSAGE;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleFailingActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ExecutionVertexDeploymentTest {
 
@@ -331,4 +349,34 @@ public class ExecutionVertexDeploymentTest {
 			fail(e.getMessage());
 		}
 	}
-}
\ No newline at end of file
+
+	/**
+	 * Tests that the lazy scheduling flag is correctly forwarded to the produced partition descriptors.
+	 */
+	@Test
+	public void testTddProducedPartitionsLazyScheduling() throws Exception {
+		TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext();
+		ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), context);
+		IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED);
+		ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, new FiniteDuration(1, TimeUnit.MINUTES));
+
+		Slot root = mock(Slot.class);
+		when(root.getSlotNumber()).thenReturn(1);
+		SimpleSlot slot = mock(SimpleSlot.class);
+		when(slot.getRoot()).thenReturn(root);
+
+		for (ScheduleMode mode : ScheduleMode.values()) {
+			vertex.getExecutionGraph().setScheduleMode(mode);
+
+			TaskDeploymentDescriptor tdd = vertex.createDeploymentDescriptor(new ExecutionAttemptID(), slot, null, null, 1);
+
+			List<ResultPartitionDeploymentDescriptor> producedPartitions = tdd.getProducedPartitions();
+
+			assertEquals(1, producedPartitions.size());
+			ResultPartitionDeploymentDescriptor desc = producedPartitions.get(0);
+			assertEquals(mode.allowLazyDeployment(), desc.allowLazyScheduling());
+		}
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
new file mode 100644
index 0000000..302b667
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class ResultPartitionTest {
+
+	/**
+	 * Tests the schedule or update consumers message sending behaviour depending on the relevant flags.
+	 */
+	@Test
+	public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
+		{
+			// Pipelined, send message => notify
+			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+			ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true);
+			partition.add(TestBufferFactory.createBuffer(), 0);
+			verify(notifier, times(1)).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class));
+		}
+
+		{
+			// Pipelined, don't send message => don't notify
+			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+			ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, false);
+			partition.add(TestBufferFactory.createBuffer(), 0);
+			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class));
+		}
+
+		{
+			// Blocking, send message => don't notify
+			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+			ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, true);
+			partition.add(TestBufferFactory.createBuffer(), 0);
+			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class));
+		}
+
+		{
+			// Blocking, don't send message => don't notify
+			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+			ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, false);
+			partition.add(TestBufferFactory.createBuffer(), 0);
+			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static ResultPartition createPartition(
+		ResultPartitionConsumableNotifier notifier,
+		ResultPartitionType type,
+		boolean sendScheduleOrUpdateConsumersMessage) {
+		return new ResultPartition(
+			"TestTask",
+			new JobID(),
+			new ResultPartitionID(),
+			type,
+			1,
+			mock(ResultPartitionManager.class),
+			notifier,
+			mock(IOManager.class),
+			IOManager.IOMode.SYNC,
+			sendScheduleOrUpdateConsumersMessage);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 88a3ff5..ee28b5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -119,11 +119,12 @@ public class LocalInputChannelTest {
 					jobId,
 					partitionIds[i],
 					ResultPartitionType.PIPELINED,
-				parallelism,
+					parallelism,
 					partitionManager,
 					partitionConsumableNotifier,
 					ioManager,
-					ASYNC);
+					ASYNC,
+					true);
 
 			// Create a buffer pool for this partition
 			partition.registerBufferPool(

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
new file mode 100644
index 0000000..aa5d12c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/ScheduleModeTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ScheduleModeTest {
+
+	/**
+	 * Test that schedule modes set the lazy deployment flag correctly.
+	 */
+	@Test
+	public void testAllowLazyDeployment() throws Exception {
+		assertTrue(ScheduleMode.FROM_SOURCES.allowLazyDeployment());
+		assertTrue(ScheduleMode.BACKTRACKING.allowLazyDeployment());
+		assertFalse(ScheduleMode.ALL.allowLazyDeployment());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 779a17d..431cbb8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -630,7 +630,7 @@ public class TaskManagerTest extends TestLogger {
 				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
 				List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
@@ -775,7 +775,7 @@ public class TaskManagerTest extends TestLogger {
 				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
 				List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
+				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
@@ -1430,9 +1430,8 @@ public class TaskManagerTest extends TestLogger {
 				new IntermediateDataSetID(),
 				new IntermediateResultPartitionID(),
 				ResultPartitionType.PIPELINED,
-				1
-				// don't deploy eagerly but with the first completed memory buffer
-			);
+				1,
+				true);
 
 			final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
 				"TestTask", 0, 1, 0, new Configuration(), new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/55c506f2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 403836c..b5056ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -542,6 +542,9 @@ public class TaskTest extends TestLogger {
 		}
 
 		expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
+		expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
+		expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
+		expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
 
 		expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
 		expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
@@ -557,7 +560,7 @@ public class TaskTest extends TestLogger {
 			assertEquals(expected.get(state), newTaskState);
 		}
 
-		verify(inputGate, times(1)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
+		verify(inputGate, times(4)).retriggerPartitionRequest(eq(partitionId.getPartitionId()));
 	}
 
 	/**


[3/3] flink git commit: [FLINK-5040] [taskmanager] Adjust partition request backoffs

Posted by uc...@apache.org.
[FLINK-5040] [taskmanager] Adjust partition request backoffs

The backoffs were hard-coded before, which would have made it
impossible to react to any potential problems with them.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bd8e027
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bd8e027
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bd8e027

Branch: refs/heads/release-1.1
Commit: 0bd8e027934fc34302c5ddb48f9e9aa448a58721
Parents: 55c506f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Nov 10 11:15:47 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 10 21:53:31 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    | 11 +++
 .../ResultPartitionDeploymentDescriptor.java    | 10 +--
 .../runtime/io/network/NetworkEnvironment.java  |  2 +-
 .../partition/consumer/SingleInputGate.java     |  8 ++
 .../apache/flink/runtime/taskmanager/Task.java  |  2 +-
 .../NetworkEnvironmentConfiguration.scala       | 12 +--
 .../flink/runtime/taskmanager/TaskManager.scala |  8 ++
 ...ResultPartitionDeploymentDescriptorTest.java |  2 +-
 .../ExecutionVertexDeploymentTest.java          |  2 +-
 .../io/network/NetworkEnvironmentTest.java      |  3 +-
 .../partition/consumer/SingleInputGateTest.java | 83 ++++++++++++++++++++
 ...askManagerComponentsStartupShutdownTest.java | 10 +--
 .../runtime/taskmanager/TaskManagerTest.java    |  4 +
 13 files changed, 135 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d9ccb35..1431eae 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -216,6 +216,11 @@ public final class ConfigConstants {
 	 */
 	public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
 
+	/** Initial backoff for partition requests of input channels. */
+	public static final String NETWORK_REQUEST_BACKOFF_INITIAL_KEY = "taskmanager.net.request-backoff.initial";
+
+	public static final String NETWORK_REQUEST_BACKOFF_MAX_KEY = "taskmanager.net.request-backoff.max";
+
 	/**
 	 * Config parameter defining the size of memory buffers used by the network stack and the memory manager.
 	 */
@@ -823,6 +828,12 @@ public final class ConfigConstants {
 	 */
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
 
+	/** Initial backoff for partition requests of input channels. */
+	public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL = 100;
+
+	/** Maximum backoff for partition requests of input channels. */
+	public static final int DEFAULT_NETWORK_REQUEST_BACKOFF_MAX = 10000;
+
 	/**
 	 * Default size of memory segments in the network stack and the memory manager.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2ecde80..3edd279 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -49,14 +49,14 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 	private final int numberOfSubpartitions;
 	
 	/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
-	private final boolean lazyScheduling;
+	private final boolean sendScheduleOrUpdateConsumersMessage;
 
 	public ResultPartitionDeploymentDescriptor(
 			IntermediateDataSetID resultId,
 			IntermediateResultPartitionID partitionId,
 			ResultPartitionType partitionType,
 			int numberOfSubpartitions,
-			boolean lazyScheduling) {
+			boolean sendScheduleOrUpdateConsumersMessage) {
 
 		this.resultId = checkNotNull(resultId);
 		this.partitionId = checkNotNull(partitionId);
@@ -64,7 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
 		checkArgument(numberOfSubpartitions >= 1);
 		this.numberOfSubpartitions = numberOfSubpartitions;
-		this.lazyScheduling = lazyScheduling;
+		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
 	}
 
 	public IntermediateDataSetID getResultId() {
@@ -83,8 +83,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		return numberOfSubpartitions;
 	}
 
-	public boolean allowLazyScheduling() {
-		return lazyScheduling;
+	public boolean sendScheduleOrUpdateConsumersMessage() {
+		return sendScheduleOrUpdateConsumersMessage;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 11661cc..d3715ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -148,7 +148,7 @@ public class NetworkEnvironment {
 	}
 
 	public Tuple2<Integer, Integer> getPartitionRequestInitialAndMaxBackoff() {
-		return configuration.partitionRequestInitialAndMaxBackoff();
+		return configuration.partitionRequestInitialMaxBackoff();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 351181a..212aade 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
@@ -496,6 +497,13 @@ public class SingleInputGate implements InputGate {
 
 	// ------------------------------------------------------------------------
 
+	@VisibleForTesting
+	Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+		return inputChannels;
+	}
+
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Creates an input gate and all of its input channels.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 2179fc1..56aea1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -337,7 +337,7 @@ public class Task implements Runnable {
 					networkEnvironment.getPartitionConsumableNotifier(),
 					ioManager,
 					networkEnvironment.getDefaultIOMode(),
-					desc.allowLazyScheduling());
+					desc.sendScheduleOrUpdateConsumersMessage());
 
 			writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 619da96..b7fa140 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -24,9 +24,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(
-  numNetworkBuffers: Int,
-  networkBufferSize: Int,
-  memoryType: MemoryType,
-  ioMode: IOMode,
-  nettyConfig: Option[NettyConfig] = None,
-  partitionRequestInitialAndMaxBackoff: (Integer, Integer) = (500, 3000))
+    numNetworkBuffers: Int,
+    networkBufferSize: Int,
+    memoryType: MemoryType,
+    ioMode: IOMode,
+    partitionRequestInitialMaxBackoff : (Integer, Integer),
+    nettyConfig: Option[NettyConfig] = None)

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c6759c1..40ae234 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -2137,11 +2137,19 @@ object TaskManager {
 
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
 
+    val initialRequestBackoff = configuration.getInteger(
+      ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY,
+      ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_INITIAL)
+    val maxRequestBackoff = configuration.getInteger(
+      ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY,
+      ConfigConstants.DEFAULT_NETWORK_REQUEST_BACKOFF_MAX)
+
     val networkConfig = NetworkEnvironmentConfiguration(
       numNetworkBuffers,
       pageSize,
       memType,
       ioMode,
+      (initialRequestBackoff, maxRequestBackoff),
       nettyConfig)
 
     // ----> timeouts, library caching, profiling

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 4223b49..3ed8236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -55,6 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
 		assertEquals(partitionId, copy.getPartitionId());
 		assertEquals(partitionType, copy.getPartitionType());
 		assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
-		assertTrue(copy.allowLazyScheduling());
+		assertTrue(copy.sendScheduleOrUpdateConsumersMessage());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 1f5c915..b3e6b63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -374,7 +374,7 @@ public class ExecutionVertexDeploymentTest {
 
 			assertEquals(1, producedPartitions.size());
 			ResultPartitionDeploymentDescriptor desc = producedPartitions.get(0);
-			assertEquals(mode.allowLazyDeployment(), desc.allowLazyScheduling());
+			assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index a659be3..ca4b7fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -62,8 +62,7 @@ public class NetworkEnvironmentTest {
 			NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new Configuration());
 			NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
 					NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP,
-					IOManager.IOMode.SYNC, new Some<>(nettyConf),
-					new Tuple2<>(0, 0));
+					IOManager.IOMode.SYNC, new Tuple2<>(0, 0), new Some<>(nettyConf));
 
 			NetworkEnvironment env = new NetworkEnvironment(
 				TestingUtils.defaultExecutionContext(),

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 05427a1..9c8be81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -21,11 +21,14 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -43,9 +46,12 @@ import org.junit.Test;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -269,6 +275,83 @@ public class SingleInputGateTest {
 	}
 
 	/**
+	 * Tests that the request backoff configuration is correctly forwarded to the channels.
+	 */
+	@Test
+	public void testRequestBackoffConfiguration() throws Exception {
+		ResultPartitionID[] partitionIds = new ResultPartitionID[] {
+			new ResultPartitionID(),
+			new ResultPartitionID(),
+			new ResultPartitionID()
+		};
+
+		InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
+			// Local
+			new InputChannelDeploymentDescriptor(
+				partitionIds[0],
+				ResultPartitionLocation.createLocal()),
+			// Remote
+			new InputChannelDeploymentDescriptor(
+				partitionIds[1],
+				ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
+			// Unknown
+			new InputChannelDeploymentDescriptor(
+				partitionIds[2],
+				ResultPartitionLocation.createUnknown())};
+
+		InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);
+
+		int initialBackoff = 137;
+		int maxBackoff = 1001;
+
+		NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
+		when(netEnv.getPartitionManager()).thenReturn(new ResultPartitionManager());
+		when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
+		when(netEnv.getPartitionStateChecker()).thenReturn(mock(PartitionStateChecker.class));
+		when(netEnv.getPartitionRequestInitialAndMaxBackoff()).thenReturn(new Tuple2<>(initialBackoff, maxBackoff));
+		when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager());
+
+		SingleInputGate gate = SingleInputGate.create(
+			"TestTask",
+			new JobID(),
+			new ExecutionAttemptID(),
+			gateDesc,
+			netEnv,
+			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+
+		Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
+
+		assertEquals(3, channelMap.size());
+		InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
+		assertEquals(LocalInputChannel.class, localChannel.getClass());
+
+		InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
+		assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
+
+		InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
+		assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
+
+		InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel};
+		for (InputChannel ch : channels) {
+			assertEquals(0, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(initialBackoff, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(initialBackoff * 2, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(maxBackoff, ch.getCurrentBackoff());
+
+			assertFalse(ch.increaseBackoff());
+		}
+	}
+
+	/**
 	 * Returns whether the stack trace represents a Thread in a blocking queue
 	 * poll call.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 60bf8e7..b4c456c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import static org.junit.Assert.*;
-
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
@@ -42,7 +40,6 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
-
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -54,6 +51,10 @@ import scala.concurrent.duration.FiniteDuration;
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class TaskManagerComponentsStartupShutdownTest {
 
 	/**
@@ -98,8 +99,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 					config);
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(),
-					new Tuple2<Integer, Integer>(0, 0));
+					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, new Tuple2<>(0, 0), Option.<NettyConfig>empty());
 
 			final InstanceConnectionInfo connectionInfo = new InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bd8e027/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 431cbb8..f2fd859 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -903,6 +903,8 @@ public class TaskManagerTest extends TestLogger {
 				final int dataPort = NetUtils.getAvailablePort();
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100);
+				config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200);
 
 				taskManager = TestingUtils.createTaskManager(
 						system,
@@ -999,6 +1001,8 @@ public class TaskManagerTest extends TestLogger {
 
 				final int dataPort = NetUtils.getAvailablePort();
 				final Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_INITIAL_KEY, 100);
+				config.setInteger(ConfigConstants.NETWORK_REQUEST_BACKOFF_MAX_KEY, 200);
 
 				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
 


[2/3] flink git commit: Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels"

Posted by uc...@apache.org.
Revert "[FLINK-3232] [runtime] Add option to eagerly deploy channels"

The reverted commit did not really fix anything; it only hid the problem by
brute force, sending many more ScheduleOrUpdateConsumers messages.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b5a4cb6c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b5a4cb6c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b5a4cb6c

Branch: refs/heads/release-1.1
Commit: b5a4cb6cc9ba7099045f145a2fe3c58567253b4f
Parents: 9a19ca1
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Nov 10 14:01:22 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Nov 10 21:53:30 2016 +0100

----------------------------------------------------------------------
 .../ResultPartitionDeploymentDescriptor.java    | 32 ++-----
 .../executiongraph/ExecutionJobVertex.java      |  3 +-
 .../executiongraph/IntermediateResult.java      | 17 +---
 .../runtime/io/network/NetworkEnvironment.java  | 13 ---
 .../io/network/partition/ResultPartition.java   | 40 ++------
 .../runtime/jobgraph/IntermediateDataSet.java   | 23 -----
 .../flink/runtime/jobgraph/JobVertex.java       | 12 +--
 .../apache/flink/runtime/taskmanager/Task.java  |  1 -
 .../NetworkEnvironmentConfiguration.scala       |  1 +
 ...ResultPartitionDeploymentDescriptorTest.java |  6 +-
 .../ExecutionGraphDeploymentTest.java           |  2 +-
 .../io/network/NetworkEnvironmentTest.java      | 98 --------------------
 .../consumer/LocalInputChannelTest.java         |  3 +-
 .../runtime/jobgraph/JobTaskVertexTest.java     | 40 ++------
 .../runtime/taskmanager/TaskManagerTest.java    |  8 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  9 +-
 16 files changed, 41 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index e72d468..ecdacbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -48,20 +48,11 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 	/** The number of subpartitions. */
 	private final int numberOfSubpartitions;
 
-	/**
-	 * Flag indicating whether to eagerly deploy consumers.
-	 *
-	 * <p>If <code>true</code>, the consumers are deployed as soon as the
-	 * runtime result is registered at the result manager of the task manager.
-	 */
-	private final boolean eagerlyDeployConsumers;
-
 	public ResultPartitionDeploymentDescriptor(
-			IntermediateDataSetID resultId,
-			IntermediateResultPartitionID partitionId,
-			ResultPartitionType partitionType,
-			int numberOfSubpartitions,
-			boolean eagerlyDeployConsumers) {
+		IntermediateDataSetID resultId,
+		IntermediateResultPartitionID partitionId,
+		ResultPartitionType partitionType,
+		int numberOfSubpartitions) {
 
 		this.resultId = checkNotNull(resultId);
 		this.partitionId = checkNotNull(partitionId);
@@ -69,7 +60,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
 		checkArgument(numberOfSubpartitions >= 1);
 		this.numberOfSubpartitions = numberOfSubpartitions;
-		this.eagerlyDeployConsumers = eagerlyDeployConsumers;
 	}
 
 	public IntermediateDataSetID getResultId() {
@@ -88,16 +78,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		return numberOfSubpartitions;
 	}
 
-	/**
-	 * Returns whether consumers should be deployed eagerly (as soon as they
-	 * are registered at the result manager of the task manager).
-	 *
-	 * @return Whether consumers should be deployed eagerly
-	 */
-	public boolean getEagerlyDeployConsumers() {
-		return eagerlyDeployConsumers;
-	}
-
 	@Override
 	public String toString() {
 		return String.format("ResultPartitionDeploymentDescriptor [result id: %s, "
@@ -129,7 +109,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		}
 
 		return new ResultPartitionDeploymentDescriptor(
-				resultId, partitionId, partitionType, numberOfSubpartitions,
-				partition.getIntermediateResult().getEagerlyDeployConsumers());
+				resultId, partitionId, partitionType, numberOfSubpartitions
+		);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 9e175f1..65259ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -147,8 +147,7 @@ public class ExecutionJobVertex implements Serializable {
 					result.getId(),
 					this,
 					numTaskVertices,
-					result.getResultType(),
-					result.getEagerlyDeployConsumers());
+					result.getResultType());
 		}
 
 		// create all task vertices

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 9d57014..67b1fe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -46,14 +46,11 @@ public class IntermediateResult {
 
 	private final ResultPartitionType resultType;
 
-	private final boolean eagerlyDeployConsumers;
-
 	public IntermediateResult(
-			IntermediateDataSetID id,
-			ExecutionJobVertex producer,
-			int numParallelProducers,
-			ResultPartitionType resultType,
-			boolean eagerlyDeployConsumers) {
+		IntermediateDataSetID id,
+		ExecutionJobVertex producer,
+		int numParallelProducers,
+		ResultPartitionType resultType) {
 
 		this.id = checkNotNull(id);
 		this.producer = checkNotNull(producer);
@@ -71,8 +68,6 @@ public class IntermediateResult {
 
 		// The runtime type for this produced result
 		this.resultType = checkNotNull(resultType);
-
-		this.eagerlyDeployConsumers = eagerlyDeployConsumers;
 	}
 
 	public void setPartition(int partitionNumber, IntermediateResultPartition partition) {
@@ -108,10 +103,6 @@ public class IntermediateResult {
 		return resultType;
 	}
 
-	public boolean getEagerlyDeployConsumers() {
-		return eagerlyDeployConsumers;
-	}
-
 	public int registerConsumer() {
 		final int index = numConsumers;
 		numConsumers++;

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 30d2e38..11661cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -277,8 +277,6 @@ public class NetworkEnvironment {
 			throw new IllegalStateException("Unequal number of writers and partitions.");
 		}
 
-		ResultPartitionConsumableNotifier jobManagerNotifier;
-
 		synchronized (lock) {
 			if (isShutdown) {
 				throw new IllegalStateException("NetworkEnvironment is shut down");
@@ -340,17 +338,6 @@ public class NetworkEnvironment {
 					}
 				}
 			}
-
-			// Copy the reference to prevent races with concurrent shut downs
-			jobManagerNotifier = partitionConsumableNotifier;
-		}
-
-		for (ResultPartition partition : producedPartitions) {
-			// Eagerly notify consumers if required.
-			if (partition.getEagerlyDeployConsumers()) {
-				jobManagerNotifier.notifyPartitionConsumable(
-						partition.getJobId(), partition.getPartitionId());
-			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 7c109f3..a60f95d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -28,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,14 +86,6 @@ public class ResultPartition implements BufferPoolOwner {
 	/** Type of this partition. Defines the concrete subpartition implementation to use. */
 	private final ResultPartitionType partitionType;
 
-	/**
-	 * Flag indicating whether to eagerly deploy consumers.
-	 *
-	 * <p>If <code>true</code>, the consumers are deployed as soon as the
-	 * runtime result is registered at the result manager of the task manager.
-	 */
-	private final boolean eagerlyDeployConsumers;
-
 	/** The subpartitions of this partition. At least one. */
 	private final ResultSubpartition[] subpartitions;
 
@@ -129,22 +121,20 @@ public class ResultPartition implements BufferPoolOwner {
 	private long totalNumberOfBytes;
 
 	public ResultPartition(
-			String owningTaskName,
-			JobID jobId,
-			ResultPartitionID partitionId,
-			ResultPartitionType partitionType,
-			boolean eagerlyDeployConsumers,
-			int numberOfSubpartitions,
-			ResultPartitionManager partitionManager,
-			ResultPartitionConsumableNotifier partitionConsumableNotifier,
-			IOManager ioManager,
-			IOMode defaultIoMode) {
+		String owningTaskName,
+		JobID jobId,
+		ResultPartitionID partitionId,
+		ResultPartitionType partitionType,
+		int numberOfSubpartitions,
+		ResultPartitionManager partitionManager,
+		ResultPartitionConsumableNotifier partitionConsumableNotifier,
+		IOManager ioManager,
+		IOMode defaultIoMode) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
 		this.jobId = checkNotNull(jobId);
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
-		this.eagerlyDeployConsumers = eagerlyDeployConsumers;
 		this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
 		this.partitionManager = checkNotNull(partitionManager);
 		this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
@@ -211,16 +201,6 @@ public class ResultPartition implements BufferPoolOwner {
 		return subpartitions.length;
 	}
 
-	/**
-	 * Returns whether consumers should be deployed eagerly (as soon as they
-	 * are registered at the result manager of the task manager).
-	 *
-	 * @return Whether consumers should be deployed eagerly
-	 */
-	public boolean getEagerlyDeployConsumers() {
-		return eagerlyDeployConsumers;
-	}
-
 	public BufferProvider getBufferProvider() {
 		return bufferPool;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
index c30c78e..fdc5d1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
@@ -88,29 +88,6 @@ public class IntermediateDataSet implements java.io.Serializable {
 		return resultType;
 	}
 
-	/**
-	 * Sets the flag indicating whether to eagerly deploy consumers (default:
-	 * <code>false</code>).
-	 *
-	 * @param eagerlyDeployConsumers If <code>true</code>, the consumers are
-	 *                               deployed as soon as the runtime result is
-	 *                               registered at the result manager of the
-	 *                               task manager. Default is <code>false</code>.
-	 */
-	public void setEagerlyDeployConsumers(boolean eagerlyDeployConsumers) {
-		this.eagerlyDeployConsumers = eagerlyDeployConsumers;
-	}
-
-	/**
-	 * Returns whether consumers should be deployed eagerly (as soon as they
-	 * are registered at the result manager of the task manager).
-	 *
-	 * @return Whether consumers should be deployed eagerly
-	 */
-	public boolean getEagerlyDeployConsumers() {
-		return eagerlyDeployConsumers;
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	
 	public void addConsumer(JobEdge edge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 379a42a..47dcb36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -357,7 +357,7 @@ public class JobVertex implements java.io.Serializable {
 	}
 
 	public JobEdge connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern) {
-		return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED, false);
+		return connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
 	}
 
 	public JobEdge connectNewDataSetAsInput(
@@ -365,17 +365,7 @@ public class JobVertex implements java.io.Serializable {
 			DistributionPattern distPattern,
 			ResultPartitionType partitionType) {
 
-		return connectNewDataSetAsInput(input, distPattern, partitionType, false);
-	}
-
-	public JobEdge connectNewDataSetAsInput(
-			JobVertex input,
-			DistributionPattern distPattern,
-			ResultPartitionType partitionType,
-			boolean eagerlyDeployConsumers) {
-
 		IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
-		dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers);
 
 		JobEdge edge = new JobEdge(dataSet, this, distPattern);
 		this.inputs.add(edge);

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 00046b7..dd14aaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -332,7 +332,6 @@ public class Task implements Runnable {
 					jobId,
 					partitionId,
 					desc.getPartitionType(),
-					desc.getEagerlyDeployConsumers(),
 					desc.getNumberOfSubpartitions(),
 					networkEnvironment.getPartitionManager(),
 					networkEnvironment.getPartitionConsumableNotifier(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 065211c..619da96 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager
 
 import org.apache.flink.core.memory.MemoryType
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
+
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index d2fcc7b..1986eae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -38,15 +38,14 @@ public class ResultPartitionDeploymentDescriptorTest {
 		IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 		ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
 		int numberOfSubpartitions = 24;
-		boolean eagerlyDeployConsumers = true;
 
 		ResultPartitionDeploymentDescriptor orig =
 				new ResultPartitionDeploymentDescriptor(
 						resultId,
 						partitionId,
 						partitionType,
-						numberOfSubpartitions,
-						eagerlyDeployConsumers);
+						numberOfSubpartitions
+				);
 
 		ResultPartitionDeploymentDescriptor copy =
 				CommonTestUtils.createCopySerializable(orig);
@@ -55,6 +54,5 @@ public class ResultPartitionDeploymentDescriptorTest {
 		assertEquals(partitionId, copy.getPartitionId());
 		assertEquals(partitionType, copy.getPartitionType());
 		assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
-		assertEquals(eagerlyDeployConsumers, copy.getEagerlyDeployConsumers());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 332c8cd..cfbde6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -312,7 +312,7 @@ public class ExecutionGraphDeploymentTest {
 		v1.setInvokableClass(BatchTask.class);
 		v2.setInvokableClass(BatchTask.class);
 
-		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false);
+		v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
 
 		// execution graph that executes actions synchronously
 		ExecutionGraph eg = new ExecutionGraph(

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index fca3ceb..a659be3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -18,29 +18,19 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
 import org.junit.Test;
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
-import scala.concurrent.impl.Promise;
 
 import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
@@ -51,13 +41,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 public class NetworkEnvironmentTest {
 
@@ -151,85 +134,4 @@ public class NetworkEnvironmentTest {
 			fail(e.getMessage());
 		}
 	}
-
-
-	/**
-	 * Registers a task with an eager and non-eager partition at the network
-	 * environment and verifies that there is exactly on schedule or update
-	 * message to the job manager for the eager partition.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testEagerlyDeployConsumers() throws Exception {
-		// Mock job manager => expected interactions will be verified
-		ActorGateway jobManager = mock(ActorGateway.class);
-		when(jobManager.ask(anyObject(), any(FiniteDuration.class)))
-				.thenReturn(new Promise.DefaultPromise<>().future());
-
-		// Network environment setup
-		NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration(
-				20,
-				1024,
-				MemoryType.HEAP,
-				IOManager.IOMode.SYNC,
-				Some.<NettyConfig>empty(),
-				new Tuple2<>(0, 0));
-
-		NetworkEnvironment env = new NetworkEnvironment(
-				TestingUtils.defaultExecutionContext(),
-				new FiniteDuration(30, TimeUnit.SECONDS),
-				config);
-
-		// Associate the environment with the mock actors
-		env.associateWithTaskManagerAndJobManager(
-				jobManager,
-				DummyActorGateway.INSTANCE);
-
-		// Register mock task
-		JobID jobId = new JobID();
-
-		ResultPartition[] partitions = new ResultPartition[2];
-		partitions[0] = createPartition("p1", jobId, true, env);
-		partitions[1] = createPartition("p2", jobId, false, env);
-
-		ResultPartitionWriter[] writers = new ResultPartitionWriter[2];
-		writers[0] = new ResultPartitionWriter(partitions[0]);
-		writers[1] = new ResultPartitionWriter(partitions[1]);
-
-		Task mockTask = mock(Task.class);
-		when(mockTask.getAllInputGates()).thenReturn(new SingleInputGate[0]);
-		when(mockTask.getAllWriters()).thenReturn(writers);
-		when(mockTask.getProducedPartitions()).thenReturn(partitions);
-
-		env.registerTask(mockTask);
-
-		// Verify
-		ResultPartitionID eagerPartitionId = partitions[0].getPartitionId();
-
-		verify(jobManager, times(1)).ask(
-				eq(new ScheduleOrUpdateConsumers(jobId, eagerPartitionId)),
-				any(FiniteDuration.class));
-	}
-
-	/**
-	 * Helper to create a mock result partition.
-	 */
-	private static ResultPartition createPartition(
-			String name,
-			JobID jobId,
-			boolean eagerlyDeployConsumers,
-			NetworkEnvironment env) {
-
-		return new ResultPartition(
-				name,
-				jobId,
-				new ResultPartitionID(),
-				ResultPartitionType.PIPELINED,
-				eagerlyDeployConsumers,
-				1,
-				env.getPartitionManager(),
-				env.getPartitionConsumableNotifier(),
-				mock(IOManager.class),
-				env.getDefaultIOMode());
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index f91a4ba..88a3ff5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -119,8 +119,7 @@ public class LocalInputChannelTest {
 					jobId,
 					partitionIds[i],
 					ResultPartitionType.PIPELINED,
-					false,
-					parallelism,
+				parallelism,
 					partitionManager,
 					partitionConsumableNotifier,
 					ioManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index c3ba909..4f2d807 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import java.io.IOException;
-
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.InitializeOnMaster;
@@ -29,11 +27,16 @@ import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class JobTaskVertexTest {
@@ -131,35 +134,6 @@ public class JobTaskVertexTest {
 		}
 	}
 
-	/**
-	 * Verifies correct setting of eager deploy settings.
-	 */
-	@Test
-	public void testEagerlyDeployConsumers() throws Exception {
-		JobVertex producer = new JobVertex("producer");
-
-		{
-			JobVertex consumer = new JobVertex("consumer");
-			JobEdge edge = consumer.connectNewDataSetAsInput(
-					producer, DistributionPattern.ALL_TO_ALL);
-			assertFalse(edge.getSource().getEagerlyDeployConsumers());
-		}
-
-		{
-			JobVertex consumer = new JobVertex("consumer");
-			JobEdge edge = consumer.connectNewDataSetAsInput(
-					producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-			assertFalse(edge.getSource().getEagerlyDeployConsumers());
-		}
-
-		{
-			JobVertex consumer = new JobVertex("consumer");
-			JobEdge edge = consumer.connectNewDataSetAsInput(
-					producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED, true);
-			assertTrue(edge.getSource().getEagerlyDeployConsumers());
-		}
-	}
-
 	// --------------------------------------------------------------------------------------------
 	
 	private static final class TestingOutputFormat extends DiscardingOutputFormat<Object> implements InitializeOnMaster {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 53117d0..779a17d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -630,7 +630,7 @@ public class TaskManagerTest extends TestLogger {
 				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
 				List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false));
+				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
@@ -775,7 +775,7 @@ public class TaskManagerTest extends TestLogger {
 				IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
 
 				List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
-				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, false));
+				irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1));
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
@@ -1430,8 +1430,8 @@ public class TaskManagerTest extends TestLogger {
 				new IntermediateDataSetID(),
 				new IntermediateResultPartitionID(),
 				ResultPartitionType.PIPELINED,
-				1,
-				false // don't deploy eagerly but with the first completed memory buffer
+				1
+				// don't deploy eagerly but with the first completed memory buffer
 			);
 
 			final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/b5a4cb6c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a63b089..20f5981 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -371,20 +371,17 @@ public class StreamingJobGraphGenerator {
 			downStreamVertex.connectNewDataSetAsInput(
 				headVertex,
 				DistributionPattern.POINTWISE,
-				ResultPartitionType.PIPELINED,
-				true);
+				ResultPartitionType.PIPELINED);
 		} else if (partitioner instanceof RescalePartitioner){
 			downStreamVertex.connectNewDataSetAsInput(
 				headVertex,
 				DistributionPattern.POINTWISE,
-				ResultPartitionType.PIPELINED,
-				true);
+				ResultPartitionType.PIPELINED);
 		} else {
 			downStreamVertex.connectNewDataSetAsInput(
 					headVertex,
 					DistributionPattern.ALL_TO_ALL,
-					ResultPartitionType.PIPELINED,
-					true);
+					ResultPartitionType.PIPELINED);
 		}
 
 		if (LOG.isDebugEnabled()) {