You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/04/27 10:28:05 UTC

flink git commit: [FLINK-1930] Separate output buffer pool and result partition life cycle

Repository: flink
Updated Branches:
  refs/heads/master 8791d4f02 -> 88638de75


[FLINK-1930] Separate output buffer pool and result partition life cycle

The problem: when a pipelined result is only consumed partially, the buffer pool
associated with the result partition will be destroyed too early. If there is a
pipelined producer online, which is still producing data for this partition, it
will run into an IllegalStateException.

The solution: by separating the life-cycle of the result partition and the
associated buffer pool this cannot happen anymore. The result buffer pool is
only destroyed after the producing task is finished, which is independent of
the state of the result partition.

Furthermore, this commit squashes the following commits:

- [FLINK-1930] [tests] Add test for FLINK-1930
- [tests] Move iterative tests to correct package

This closes #624.

Close the following unrelated PRs:

This closes #112.
This closes #134.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88638de7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88638de7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88638de7

Branch: refs/heads/master
Commit: 88638de75bab461138e74fd2193223c5a3bc768c
Parents: 8791d4f
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Apr 24 16:58:27 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Apr 27 10:25:07 2015 +0200

----------------------------------------------------------------------
 .../runtime/execution/RuntimeEnvironment.java   |   4 +-
 .../runtime/io/network/NetworkEnvironment.java  |  31 +---
 .../api/writer/ResultPartitionWriter.java       |  11 --
 .../io/network/partition/ResultPartition.java   |  40 +++--
 .../partition/consumer/SingleInputGate.java     |  17 +-
 .../PartialConsumePipelinedResultTest.java      | 155 +++++++++++++++++++
 .../consumer/LocalInputChannelTest.java         |   7 +-
 .../partition/consumer/SingleInputGateTest.java |   7 +-
 .../partition/consumer/TestSingleInputGate.java |   5 +-
 .../partition/consumer/UnionInputGateTest.java  |   7 +-
 ...nIncompleteDynamicPathConsumptionITCase.java |  92 +++++++++++
 ...onIncompleteStaticPathConsumptionITCase.java |  92 +++++++++++
 ...nIncompleteDynamicPathConsumptionITCase.java |  92 -----------
 ...onIncompleteStaticPathConsumptionITCase.java |  92 -----------
 14 files changed, 407 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index d567156..081d4bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -130,6 +130,7 @@ public class RuntimeEnvironment implements Environment, Runnable {
 				ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), owner.getExecutionId());
 
 				this.producedPartitions[i] = new ResultPartition(
+						this,
 						owner.getJobID(),
 						partitionId,
 						desc.getPartitionType(),
@@ -148,7 +149,8 @@ public class RuntimeEnvironment implements Environment, Runnable {
 			this.inputGates = new SingleInputGate[consumedPartitions.size()];
 
 			for (int i = 0; i < inputGates.length; i++) {
-				inputGates[i] = SingleInputGate.create(consumedPartitions.get(i), networkEnvironment);
+				inputGates[i] = SingleInputGate.create(
+						this, consumedPartitions.get(i), networkEnvironment);
 
 				// The input gates are organized by key for task updates/channel updates at runtime
 				inputGatesById.put(inputGates[i].getConsumedResultId(), inputGates[i]);

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index dbf1586..af55ebf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -337,6 +337,13 @@ public class NetworkEnvironment {
 				}
 			}
 
+			ResultPartition[] partitions = task.getProducedPartitions();
+			if (partitions != null) {
+				for (ResultPartition partition : partitions) {
+					partition.destroyBufferPool();
+				}
+			}
+
 			final SingleInputGate[] inputGates = task.getInputGates();
 
 			if (inputGates != null) {
@@ -354,30 +361,6 @@ public class NetworkEnvironment {
 		}
 	}
 
-	public boolean hasReleasedAllResources() {
-		String msg = String.format("Network buffer pool: %d missing memory segments. %d registered buffer pools. Connection manager: %d active connections. Task event dispatcher: %d registered writers.",
-				networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments(),
-				networkBufferPool.getNumberOfRegisteredBufferPools(), connectionManager.getNumberOfActiveConnections(),
-				taskEventDispatcher.getNumberOfRegisteredWriters());
-
-		boolean success = networkBufferPool.getTotalNumberOfMemorySegments() == networkBufferPool.getNumberOfAvailableMemorySegments() &&
-				networkBufferPool.getNumberOfRegisteredBufferPools() == 0 &&
-				connectionManager.getNumberOfActiveConnections() == 0 &&
-				taskEventDispatcher.getNumberOfRegisteredWriters() == 0;
-
-		if (success) {
-			String successMsg = "Network environment did release all resources: " + msg;
-			LOG.debug(successMsg);
-		}
-		else {
-			String errMsg = "Network environment did *not* release all resources: " + msg;
-
-			LOG.error(errMsg);
-		}
-
-		return success;
-	}
-
 	/**
 	 * Tries to shut down all network I/O components.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index ac28519..1192dbb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -36,9 +36,6 @@ import java.io.IOException;
  * <p>
  * The {@link ResultPartitionWriter} is the runtime API for producing results. It
  * supports two kinds of data to be sent: buffers and events.
- * <p>
- * <strong>Important</strong>: When working directly with this API, it is
- * necessary to call {@link #finish()} after all data has been produced.
  */
 public final class ResultPartitionWriter implements EventListener<TaskEvent> {
 
@@ -92,18 +89,10 @@ public final class ResultPartitionWriter implements EventListener<TaskEvent> {
 		}
 	}
 
-	public void finish() throws IOException, InterruptedException {
-		partition.finish();
-	}
-
 	// ------------------------------------------------------------------------
 	// Event handling
 	// ------------------------------------------------------------------------
 
-	public TaskEventHandler getTaskEventHandler() {
-		return taskEventHandler;
-	}
-
 	public void subscribeToEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) {
 		taskEventHandler.subscribe(eventListener, eventType);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 88020a4..f06c8fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -76,6 +77,9 @@ public class ResultPartition implements BufferPoolOwner {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ResultPartition.class);
 
+	/** The owning environment. Mainly for debug purposes. */
+	private final Environment owner;
+
 	private final JobID jobId;
 
 	private final ResultPartitionID partitionId;
@@ -116,6 +120,7 @@ public class ResultPartition implements BufferPoolOwner {
 	private long totalNumberOfBytes;
 
 	public ResultPartition(
+			Environment owner,
 			JobID jobId,
 			ResultPartitionID partitionId,
 			ResultPartitionType partitionType,
@@ -125,6 +130,7 @@ public class ResultPartition implements BufferPoolOwner {
 			IOManager ioManager,
 			IOMode defaultIoMode) {
 
+		this.owner = checkNotNull(owner);
 		this.jobId = checkNotNull(jobId);
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
@@ -156,7 +162,7 @@ public class ResultPartition implements BufferPoolOwner {
 		// Initially, partitions should be consumed once before release.
 		pin();
 
-		LOG.debug("Initialized {}", this);
+		LOG.debug("{}: Initialized {}", owner.getTaskNameWithSubtasks(), this);
 	}
 
 	/**
@@ -275,29 +281,29 @@ public class ResultPartition implements BufferPoolOwner {
 	 */
 	public void release() {
 		if (isReleased.compareAndSet(false, true)) {
-			LOG.debug("Releasing {}", this);
-
-			try {
-				for (ResultSubpartition subpartition : subpartitions) {
-					try {
-						synchronized (subpartition) {
-							subpartition.release();
-						}
-					}
-					// Catch this in order to ensure that release is called on all subpartitions
-					catch (Throwable t) {
-						LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
+			LOG.debug("{}: Releasing {}.", owner.getTaskNameWithSubtasks(), this);
+
+			// Release all subpartitions
+			for (ResultSubpartition subpartition : subpartitions) {
+				try {
+					synchronized (subpartition) {
+						subpartition.release();
 					}
 				}
-			}
-			finally {
-				if (bufferPool != null) {
-					bufferPool.lazyDestroy();
+				// Catch this in order to ensure that release is called on all subpartitions
+				catch (Throwable t) {
+					LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
 				}
 			}
 		}
 	}
 
+	public void destroyBufferPool() {
+		if (bufferPool != null) {
+			bufferPool.lazyDestroy();
+		}
+	}
+
 	/**
 	 * Returns the requested subpartition.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 867a4b9..b0d138a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
@@ -100,6 +101,9 @@ public class SingleInputGate implements InputGate {
 	/** Lock object to guard partition requests and runtime channel updates. */
 	private final Object requestLock = new Object();
 
+	/** The owning environment. Mainly for debug purposes. */
+	private final Environment owner;
+
 	/**
 	 * The ID of the consumed intermediate result. Each input gate consumes partitions of the
 	 * intermediate result specified by this ID. This ID also identifies the input gate at the
@@ -148,7 +152,13 @@ public class SingleInputGate implements InputGate {
 
 	private int numberOfUninitializedChannels;
 
-	public SingleInputGate(IntermediateDataSetID consumedResultId, int consumedSubpartitionIndex, int numberOfInputChannels) {
+	public SingleInputGate(
+			Environment owner,
+			IntermediateDataSetID consumedResultId,
+			int consumedSubpartitionIndex,
+			int numberOfInputChannels) {
+
+		this.owner = checkNotNull(owner);
 		this.consumedResultId = checkNotNull(consumedResultId);
 
 		checkArgument(consumedSubpartitionIndex >= 0);
@@ -255,6 +265,8 @@ public class SingleInputGate implements InputGate {
 		synchronized (requestLock) {
 			if (!isReleased) {
 				try {
+					LOG.debug("{}: Releasing {}.", owner.getTaskNameWithSubtasks(), this);
+
 					for (InputChannel inputChannel : inputChannels.values()) {
 						try {
 							inputChannel.releaseAllResources();
@@ -398,6 +410,7 @@ public class SingleInputGate implements InputGate {
 	 * Creates an input gate and all of its input channels.
 	 */
 	public static SingleInputGate create(
+			Environment owner,
 			InputGateDeploymentDescriptor igdd,
 			NetworkEnvironment networkEnvironment) {
 
@@ -409,7 +422,7 @@ public class SingleInputGate implements InputGate {
 		final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
 		final SingleInputGate inputGate = new SingleInputGate(
-				consumedResultId, consumedSubpartitionIndex, icdd.length);
+				owner, consumedResultId, consumedSubpartitionIndex, icdd.length);
 
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
new file mode 100644
index 0000000..821826a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.io.network.api.reader.BufferReader;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class PartialConsumePipelinedResultTest {
+
+	// Test configuration
+	private final static int NUMBER_OF_TMS = 1;
+	private final static int NUMBER_OF_SLOTS_PER_TM = 1;
+	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+	private final static int NUMBER_OF_NETWORK_BUFFERS = 128;
+
+	private static TestingCluster flink;
+	private static ActorSystem jobClient;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		final Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+		config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, NUMBER_OF_NETWORK_BUFFERS);
+
+		flink = new TestingCluster(config, true);
+
+		jobClient = JobClient.startJobClientActorSystem(flink.configuration());
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		flink.stop();
+	}
+
+	/**
+	 * Tests a fix for FLINK-1930.
+	 *
+	 * <p> When consuming a pipelined result only partially, is is possible that local channels
+	 * release the buffer pool, which is associated with the result partition, too early.  If the
+	 * producer is still producing data when this happens, it runs into an IllegalStateException,
+	 * because of the destroyed buffer pool.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-1930">FLINK-1930</a>
+	 */
+	@Test
+	public void testPartialConsumePipelinedResultReceiver() throws Exception {
+		final AbstractJobVertex sender = new AbstractJobVertex("Sender");
+		sender.setInvokableClass(SlowBufferSender.class);
+		sender.setParallelism(PARALLELISM);
+
+		final AbstractJobVertex receiver = new AbstractJobVertex("Receiver");
+		receiver.setInvokableClass(SingleBufferReceiver.class);
+		receiver.setParallelism(PARALLELISM);
+
+		// The partition needs to be pipelined, otherwise the original issue does not occur, because
+		// the sender and receiver are not online at the same time.
+		receiver.connectNewDataSetAsInput(
+				sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+		final JobGraph jobGraph = new JobGraph(
+				"Partial Consume of Pipelined Result", sender, receiver);
+
+		final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(
+				sender.getID(), receiver.getID());
+
+		sender.setSlotSharingGroup(slotSharingGroup);
+		receiver.setSlotSharingGroup(slotSharingGroup);
+
+		JobClient.submitJobAndWait(
+				jobClient,
+				flink.getJobManager(),
+				jobGraph,
+				TestingUtils.TESTING_DURATION(),
+				false);
+	}
+
+	// ---------------------------------------------------------------------------------------------
+
+	/**
+	 * Sends a fixed number of buffers and sleeps in-between sends.
+	 */
+	public static class SlowBufferSender extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {
+			// Nothing to do
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final ResultPartitionWriter writer = getEnvironment().getWriter(0);
+
+			for (int i = 0; i < 8; i++) {
+				final Buffer buffer = writer.getBufferProvider().requestBufferBlocking();
+				writer.writeBuffer(buffer, 0);
+
+				Thread.sleep(50);
+			}
+		}
+	}
+
+	/**
+	 * Reads a single buffer and recycles it.
+	 */
+	public static class SingleBufferReceiver extends AbstractInvokable {
+
+		@Override
+		public void registerInputOutput() {
+			// Nothing to do
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final BufferReader reader = new BufferReader(getEnvironment().getInputGate(0));
+
+			final Buffer buffer = reader.getNextBuffer();
+
+			buffer.recycle();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 32fe1a7..d13db05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Lists;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -92,6 +93,7 @@ public class LocalInputChannelTest {
 			partitionIds[i] = new ResultPartitionID();
 
 			final ResultPartition partition = new ResultPartition(
+					mock(Environment.class),
 					jobId,
 					partitionIds[i],
 					ResultPartitionType.PIPELINED,
@@ -220,7 +222,10 @@ public class LocalInputChannelTest {
 			checkArgument(numberOfExpectedBuffersPerChannel >= 1);
 
 			this.inputGate = new SingleInputGate(
-					new IntermediateDataSetID(), subpartitionIndex, numberOfInputChannels);
+					mock(Environment.class),
+					new IntermediateDataSetID(),
+					subpartitionIndex,
+					numberOfInputChannels);
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 66eeee0..be66aeb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -58,7 +60,8 @@ public class SingleInputGateTest {
 	@Test(timeout = 120 * 1000)
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
-		final SingleInputGate inputGate = new SingleInputGate(new IntermediateDataSetID(), 0, 2);
+		final SingleInputGate inputGate = new SingleInputGate(
+				mock(Environment.class), new IntermediateDataSetID(), 0, 2);
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 				new TestInputChannel(inputGate, 0),
@@ -104,7 +107,7 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate(resultId, 0, 2);
+		final SingleInputGate inputGate = new SingleInputGate(mock(Environment.class), resultId, 0, 2);
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 5033b3d..efc02de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -29,6 +30,7 @@ import java.util.List;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 /**
@@ -47,7 +49,8 @@ public class TestSingleInputGate {
 	public TestSingleInputGate(int numberOfInputChannels, boolean initialize) {
 		checkArgument(numberOfInputChannels >= 1);
 
-		this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
+		this.inputGate = spy(new SingleInputGate(
+				mock(Environment.class), new IntermediateDataSetID(), 0, numberOfInputChannels));
 
 		this.inputChannels = new TestInputChannel[numberOfInputChannels];
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 18e56ba..90132ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class UnionInputGateTest {
 
@@ -37,8 +39,9 @@ public class UnionInputGateTest {
 	@Test(timeout = 120 * 1000)
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
-		final SingleInputGate ig1 = new SingleInputGate(new IntermediateDataSetID(), 0, 3);
-		final SingleInputGate ig2 = new SingleInputGate(new IntermediateDataSetID(), 0, 5);
+		final Environment env = mock(Environment.class);
+		final SingleInputGate ig1 = new SingleInputGate(env, new IntermediateDataSetID(), 0, 3);
+		final SingleInputGate ig2 = new SingleInputGate(env, new IntermediateDataSetID(), 0, 5);
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
new file mode 100644
index 0000000..48788a5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteDynamicPathConsumptionITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgramTestBase {
+	
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// the test data is constructed such that the merge join zig zag
+		// has an early out, leaving elements on the dynamic path input unconsumed
+		
+		DataSet<Path> edges = env.fromElements(
+				new Path(1, 2),
+				new Path(1, 4),
+				new Path(3, 6),
+				new Path(3, 8),
+				new Path(1, 10),
+				new Path(1, 12),
+				new Path(3, 14),
+				new Path(3, 16),
+				new Path(1, 18),
+				new Path(1, 20) );
+		
+		IterativeDataSet<Path> currentPaths = edges.iterate(10);
+		
+		DataSet<Path> newPaths = currentPaths
+				.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
+					.with(new PathConnector())
+				.union(currentPaths).distinct("from", "to");
+		
+		DataSet<Path> result = currentPaths.closeWith(newPaths);
+		
+		result.output(new DiscardingOutputFormat<Path>());
+		
+		env.execute();
+	}
+	
+	private static class PathConnector implements JoinFunction<Path, Path, Path> {
+		
+		@Override
+		public Path join(Path path, Path edge)  {
+			return new Path(path.from, edge.to);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public static class Path {
+		
+		public long from;
+		public long to;
+		
+		public Path() {}
+		
+		public Path(long from, long to) {
+			this.from = from;
+			this.to = to;
+		}
+		
+		@Override
+		public String toString() {
+			return "(" + from + "," + to + ")";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
new file mode 100644
index 0000000..b42e86b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/IterationIncompleteStaticPathConsumptionITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.iterative;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramTestBase {
+	
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+	
+		// the test data is constructed such that the merge join zig zag
+		// has an early out, leaving elements on the static path input unconsumed
+		
+		DataSet<Path> edges = env.fromElements(
+				new Path(2, 1),
+				new Path(4, 1),
+				new Path(6, 3),
+				new Path(8, 3),
+				new Path(10, 1),
+				new Path(12, 1),
+				new Path(14, 3),
+				new Path(16, 3),
+				new Path(18, 1),
+				new Path(20, 1) );
+		
+		IterativeDataSet<Path> currentPaths = edges.iterate(10);
+		
+		DataSet<Path> newPaths = currentPaths
+				.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
+					.with(new PathConnector())
+				.union(currentPaths).distinct("from", "to");
+		
+		DataSet<Path> result = currentPaths.closeWith(newPaths);
+		
+		result.output(new DiscardingOutputFormat<Path>());
+		
+		env.execute();
+	}
+	
+	private static class PathConnector implements JoinFunction<Path, Path, Path> {
+		
+		@Override
+		public Path join(Path path, Path edge)  {
+			return new Path(path.from, edge.to);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public static class Path {
+		
+		public long from;
+		public long to;
+		
+		public Path() {}
+		
+		public Path(long from, long to) {
+			this.from = from;
+			this.to = to;
+		}
+		
+		@Override
+		public String toString() {
+			return "(" + from + "," + to + ")";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java
deleted file mode 100644
index 8fa9a91..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteDynamicPathConsumptionITCase.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.misc;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class IterationIncompleteDynamicPathConsumptionITCase extends JavaProgramTestBase {
-	
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// the test data is constructed such that the merge join zig zag
-		// has an early out, leaving elements on the dynamic path input unconsumed
-		
-		DataSet<Path> edges = env.fromElements(
-				new Path(1, 2),
-				new Path(1, 4),
-				new Path(3, 6),
-				new Path(3, 8),
-				new Path(1, 10),
-				new Path(1, 12),
-				new Path(3, 14),
-				new Path(3, 16),
-				new Path(1, 18),
-				new Path(1, 20) );
-		
-		IterativeDataSet<Path> currentPaths = edges.iterate(10);
-		
-		DataSet<Path> newPaths = currentPaths
-				.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
-					.with(new PathConnector())
-				.union(currentPaths).distinct("from", "to");
-		
-		DataSet<Path> result = currentPaths.closeWith(newPaths);
-		
-		result.output(new DiscardingOutputFormat<Path>());
-		
-		env.execute();
-	}
-	
-	private static class PathConnector implements JoinFunction<Path, Path, Path> {
-		
-		@Override
-		public Path join(Path path, Path edge)  {
-			return new Path(path.from, edge.to);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public static class Path {
-		
-		public long from;
-		public long to;
-		
-		public Path() {}
-		
-		public Path(long from, long to) {
-			this.from = from;
-			this.to = to;
-		}
-		
-		@Override
-		public String toString() {
-			return "(" + from + "," + to + ")";
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/88638de7/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java
deleted file mode 100644
index 24cba20..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/IterationIncompleteStaticPathConsumptionITCase.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.misc;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class IterationIncompleteStaticPathConsumptionITCase extends JavaProgramTestBase {
-	
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-	
-		// the test data is constructed such that the merge join zig zag
-		// has an early out, leaving elements on the static path input unconsumed
-		
-		DataSet<Path> edges = env.fromElements(
-				new Path(2, 1),
-				new Path(4, 1),
-				new Path(6, 3),
-				new Path(8, 3),
-				new Path(10, 1),
-				new Path(12, 1),
-				new Path(14, 3),
-				new Path(16, 3),
-				new Path(18, 1),
-				new Path(20, 1) );
-		
-		IterativeDataSet<Path> currentPaths = edges.iterate(10);
-		
-		DataSet<Path> newPaths = currentPaths
-				.join(edges, JoinHint.REPARTITION_SORT_MERGE).where("to").equalTo("from")
-					.with(new PathConnector())
-				.union(currentPaths).distinct("from", "to");
-		
-		DataSet<Path> result = currentPaths.closeWith(newPaths);
-		
-		result.output(new DiscardingOutputFormat<Path>());
-		
-		env.execute();
-	}
-	
-	private static class PathConnector implements JoinFunction<Path, Path, Path> {
-		
-		@Override
-		public Path join(Path path, Path edge)  {
-			return new Path(path.from, edge.to);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public static class Path {
-		
-		public long from;
-		public long to;
-		
-		public Path() {}
-		
-		public Path(long from, long to) {
-			this.from = from;
-			this.to = to;
-		}
-		
-		@Override
-		public String toString() {
-			return "(" + from + "," + to + ")";
-		}
-	}
-}