You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 19:28:44 UTC

git commit: [FLINK-953] Remove fake tail from iterations

Repository: incubator-flink
Updated Branches:
  refs/heads/master 91cfbc5aa -> 786184917


[FLINK-953] Remove fake tail from iterations

This closes #124


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/78618491
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/78618491
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/78618491

Branch: refs/heads/master
Commit: 786184917d460fad014bb9125a07d7b07957e445
Parents: 91cfbc5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 17 14:55:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Sep 21 18:41:49 2014 +0200

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java | 42 +----------------
 .../runtime/iterative/io/FakeOutputTask.java    | 47 --------------------
 .../runtime/operators/RegularPactTask.java      |  6 +--
 .../KMeansIterativeNepheleITCase.java           |  7 ---
 .../ConnectedComponentsNepheleITCase.java       | 32 -------------
 .../IterationWithChainingNepheleITCase.java     |  8 ----
 .../test/iterative/nephele/JobGraphUtils.java   | 10 -----
 .../CustomCompensatableDanglingPageRank.java    |  6 ---
 ...mpensatableDanglingPageRankWithCombiner.java |  9 +---
 .../CompensatableDanglingPageRank.java          |  6 ---
 10 files changed, 5 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index a224324..5ed3432 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -56,7 +56,6 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
-import org.apache.flink.runtime.iterative.io.FakeOutputTask;
 import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
@@ -1179,21 +1178,11 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		tailConfig.setIsWorksetUpdate();
 		
 		// No following termination criterion
-		if(rootOfStepFunction.getOutgoingChannels().isEmpty()) {
+		if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {
 			
 			rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
 			
 			tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
-			tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			
-			// create the fake output task
-			AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
-			fakeTail.setInvokableClass(FakeOutputTask.class);
-			fakeTail.setParallelism(headVertex.getParallelism());
-			this.auxVertices.add(fakeTail);
-			
-			// connect the fake tail
-			fakeTail.connectNewDataSetAsInput(rootOfStepFunctionVertex, DistributionPattern.POINTWISE);
 		}
 		
 		
@@ -1222,15 +1211,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			// Hack
 			tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
 			tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
-			tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			
-			AbstractJobVertex fakeTailTerminationCriterion = new AbstractJobVertex("Fake Tail for Termination Criterion");
-			fakeTailTerminationCriterion.setInvokableClass(FakeOutputTask.class);
-			fakeTailTerminationCriterion.setParallelism(headVertex.getParallelism());
-			this.auxVertices.add(fakeTailTerminationCriterion);
-		
-			// connect the fake tail
-			fakeTailTerminationCriterion.connectNewDataSetAsInput(rootOfTerminationCriterionVertex, DistributionPattern.POINTWISE);
 			
 			// tell the head that it needs to wait for the solution set updates
 			headConfig.setWaitForSolutionSetUpdate();
@@ -1345,16 +1325,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
 					
 					worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
-					worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-					
-					// create the fake output task
-					AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
-					fakeTail.setInvokableClass(FakeOutputTask.class);
-					fakeTail.setParallelism(headVertex.getParallelism());
-					this.auxVertices.add(fakeTail);
-					
-					// connect the fake tail
-					fakeTail.connectNewDataSetAsInput(nextWorksetVertex, DistributionPattern.POINTWISE);
 				}
 			}
 			{
@@ -1379,16 +1349,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 					solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
 					
 					solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
-					solutionDeltaConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-	
-					// create the fake output task
-					AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
-					fakeTail.setInvokableClass(FakeOutputTask.class);
-					fakeTail.setParallelism(headVertex.getParallelism());
-					this.auxVertices.add(fakeTail);
-					
-					// connect the fake tail
-					fakeTail.connectNewDataSetAsInput(solutionDeltaVertex, DistributionPattern.POINTWISE);
 					
 					// tell the head that it needs to wait for the solution set updates
 					headConfig.setWaitForSolutionSetUpdate();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/FakeOutputTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/FakeOutputTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/FakeOutputTask.java
deleted file mode 100644
index 6fff2d4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/FakeOutputTask.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.iterative.io;
-
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.types.Record;
-
-/**
- * Output task for the iteration tail
- */
-public class FakeOutputTask extends AbstractInvokable {
-
-	private MutableRecordReader<Record> reader;
-
-	private final Record record = new Record();
-
-	@Override
-	public void registerInputOutput() {
-		reader = new MutableRecordReader<Record>(this);
-	}
-
-	@Override
-	public void invoke() throws Exception {
-		// ensure that input is consumed, although this task should never see any records
-		while (reader.next(record)) {
-			throw new IllegalStateException("This task should not receive any data");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 62eafa4..aa48b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -1241,10 +1241,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 * @return The OutputCollector that data produced in this task is submitted to.
 	 */
 	public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, List<BufferWriter> eventualOutputs, int numOutputs)
-	throws Exception
+			throws Exception
 	{
-		if (numOutputs <= 0) {
-			throw new Exception("BUG: The task must have at least one output");
+		if (numOutputs == 0) {
+			return null;
 		}
 
 		// get the factory for the serializer

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index edc6467..3408557 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -245,7 +245,6 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		tailConfig.setSpillingThresholdInput(0, 0.9f);
 		
 		// output
-		tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 		tailConfig.setOutputSerializer(outputSerializer);
 		
 		// the udf
@@ -284,8 +283,6 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		
 		AbstractJobVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
 		
-		AbstractJobVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
-		
 		AbstractJobVertex sync = createSync(jobGraph, numIterations, numSubTasks);
 		
 		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
@@ -304,8 +301,6 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		JobGraphUtils.connect(mapper, reducer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
 		new TaskConfig(reducer.getConfiguration()).setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
 		
-		JobGraphUtils.connect(reducer, fakeTailOutput, ChannelType.NETWORK, DistributionPattern.POINTWISE);
-		
 		JobGraphUtils.connect(head, output, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -319,13 +314,11 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		head.setSlotSharingGroup(sharingGroup);
 		mapper.setSlotSharingGroup(sharingGroup);
 		reducer.setSlotSharingGroup(sharingGroup);
-		fakeTailOutput.setSlotSharingGroup(sharingGroup);
 		sync.setSlotSharingGroup(sharingGroup);
 		output.setSlotSharingGroup(sharingGroup);
 		
 		mapper.setStrictlyCoLocatedWith(head);
 		reducer.setStrictlyCoLocatedWith(head);
-		fakeTailOutput.setStrictlyCoLocatedWith(reducer);
 
 		return jobGraph;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 8cf2c69..584bba4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -349,10 +349,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		return output;
 	}
 
-	private static AbstractJobVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
-		return JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
-	}
-
 	private static AbstractJobVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
 		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -391,7 +387,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
 
 		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		AbstractJobVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
 		AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// --------------- the tail (solution set join) ---------------
@@ -411,7 +406,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			tailConfig.setInputSerializer(serializer, 0);
 
 			// output
-			tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			tailConfig.setOutputSerializer(serializer);
 
 			// the driver
@@ -435,7 +429,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
 
 		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
@@ -447,11 +440,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		tail.setSlotSharingGroup(sharingGroup);
 		output.setSlotSharingGroup(sharingGroup);
 		sync.setSlotSharingGroup(sharingGroup);
-		fakeTail.setSlotSharingGroup(sharingGroup);
 		
 		intermediate.setStrictlyCoLocatedWith(head);
 		tail.setStrictlyCoLocatedWith(head);
-		fakeTail.setStrictlyCoLocatedWith(tail);
 
 		return jobGraph;
 	}
@@ -483,8 +474,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// output and auxiliaries
 		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		AbstractJobVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks);
-		AbstractJobVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks);
 		AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// ------------------ the intermediate (ss join) ----------------------
@@ -532,7 +521,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			ssTailConfig.setRelativeInputMaterializationMemory(0, MEM_FRAC_PER_CONSUMER);
 
 			// output
-			ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			ssTailConfig.setOutputSerializer(serializer);
 
 			// the driver
@@ -555,7 +543,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			wsTailConfig.setInputSerializer(serializer, 0);
 
 			// output
-			wsTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			wsTailConfig.setOutputSerializer(serializer);
 
 			// the driver
@@ -584,9 +571,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
-		JobGraphUtils.connect(ssTail, ssFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(wsTail, wsFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
 		SlotSharingGroup sharingGroup = new SlotSharingGroup();
@@ -599,15 +583,11 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		ssTail.setSlotSharingGroup(sharingGroup);
 		output.setSlotSharingGroup(sharingGroup);
 		sync.setSlotSharingGroup(sharingGroup);
-		wsFakeTail.setSlotSharingGroup(sharingGroup);
-		ssFakeTail.setSlotSharingGroup(sharingGroup);
 		
 		intermediate.setStrictlyCoLocatedWith(head);
 		ssJoinIntermediate.setStrictlyCoLocatedWith(head);
 		wsTail.setStrictlyCoLocatedWith(head);
 		ssTail.setStrictlyCoLocatedWith(head);
-		wsFakeTail.setStrictlyCoLocatedWith(wsTail);
-		ssFakeTail.setStrictlyCoLocatedWith(ssTail);
 
 		return jobGraph;
 	}
@@ -639,7 +619,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// output and auxiliaries
 		AbstractJobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		AbstractJobVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
 		AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// ------------------ the intermediate (ws update) ----------------------
@@ -685,7 +664,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			ssTailConfig.setInputSerializer(serializer, 0);
 
 			// output
-			ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			ssTailConfig.setOutputSerializer(serializer);
 
 			// the driver
@@ -712,8 +690,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
-		JobGraphUtils.connect(ssTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
 		SlotSharingGroup sharingGroup = new SlotSharingGroup();
@@ -725,12 +701,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		ssTail.setSlotSharingGroup(sharingGroup);
 		output.setSlotSharingGroup(sharingGroup);
 		sync.setSlotSharingGroup(sharingGroup);
-		fakeTail.setSlotSharingGroup(sharingGroup);
 
 		intermediate.setStrictlyCoLocatedWith(head);
 		wsUpdateIntermediate.setStrictlyCoLocatedWith(head);
 		ssTail.setStrictlyCoLocatedWith(head);
-		fakeTail.setStrictlyCoLocatedWith(ssTail);
 
 		return jobGraph;
 	}
@@ -764,7 +738,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		// output and auxiliaries
 		AbstractJobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		AbstractJobVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
 		AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
 
 		// ------------------ the intermediate (ss update) ----------------------
@@ -808,7 +781,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 			wsTailConfig.setInputSerializer(serializer, 0);
 
 			// output
-			wsTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			wsTailConfig.setOutputSerializer(serializer);
 
 			// the driver
@@ -834,8 +806,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 
 		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
-		JobGraphUtils.connect(wsTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
 		
@@ -848,12 +818,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
 		wsTail.setSlotSharingGroup(sharingGroup);
 		output.setSlotSharingGroup(sharingGroup);
 		sync.setSlotSharingGroup(sharingGroup);
-		fakeTail.setSlotSharingGroup(sharingGroup);
 
 		intermediate.setStrictlyCoLocatedWith(head);
 		ssJoinIntermediate.setStrictlyCoLocatedWith(head);
 		wsTail.setStrictlyCoLocatedWith(head);
-		fakeTail.setStrictlyCoLocatedWith(wsTail);
 
 		return jobGraph;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index aa939ed..bab6153 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -201,7 +201,6 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 			chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
 			chainedMapperConfig.setInputSerializer(serializer, 0);
 
-			chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			chainedMapperConfig.setOutputSerializer(serializer);
 
 			chainedMapperConfig.setIsWorksetUpdate();
@@ -220,9 +219,6 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 			outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
 		}
 
-		// - fake tail -------------------------------------------------------------------------------------------------
-		AbstractJobVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks);
-
 		// - sync ------------------------------------------------------------------------------------------------------
 		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -241,8 +237,6 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 
-		JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-
 		// --------------------------------------------------------------------------------------------------------------
 		// 3. INSTANCE SHARING
 		// --------------------------------------------------------------------------------------------------------------
@@ -252,12 +246,10 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
 		input.setSlotSharingGroup(sharingGroup);
 		head.setSlotSharingGroup(sharingGroup);
 		tail.setSlotSharingGroup(sharingGroup);
-		fakeTail.setSlotSharingGroup(sharingGroup);
 		output.setSlotSharingGroup(sharingGroup);
 		sync.setSlotSharingGroup(sharingGroup);
 		
 		tail.setStrictlyCoLocatedWith(head);
-		fakeTail.setStrictlyCoLocatedWith(tail);
 
 		return jobGraph;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
index 1734a15..9e858d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
@@ -28,7 +28,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.iterative.io.FakeOutputTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -110,15 +109,6 @@ public class JobGraphUtils {
 		return sync;
 	}
 
-	public static AbstractJobVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism) {
-		AbstractJobVertex outputVertex = new AbstractJobVertex(name);
-		jobGraph.addVertex(outputVertex);
-		
-		outputVertex.setInvokableClass(FakeOutputTask.class);
-		outputVertex.setParallelism(degreeOfParallelism);
-		return outputVertex;
-	}
-
 	public static OutputFormatVertex createFileOutput(JobGraph jobGraph, String name, int parallelism) {
 		OutputFormatVertex sinkVertex = new OutputFormatVertex(name);
 		jobGraph.addVertex(sinkVertex);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index 662805e..4d25591 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -253,7 +253,6 @@ public class CustomCompensatableDanglingPageRank {
 		tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
 		
 		// output
-		tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 		tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
 		
 		// the stub
@@ -274,8 +273,6 @@ public class CustomCompensatableDanglingPageRank {
 		outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
 		
 		// --------------- the auxiliaries ---------------------
-		
-		AbstractJobVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism);
 
 		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -299,7 +296,6 @@ public class CustomCompensatableDanglingPageRank {
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
 
 		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		
@@ -309,11 +305,9 @@ public class CustomCompensatableDanglingPageRank {
 		head.setSlotSharingGroup(sharingGroup);
 		intermediate.setSlotSharingGroup(sharingGroup);
 		tail.setSlotSharingGroup(sharingGroup);
-		fakeTailOutput.setSlotSharingGroup(sharingGroup);
 		output.setSlotSharingGroup(sharingGroup);
 		sync.setSlotSharingGroup(sharingGroup);
 		
-		fakeTailOutput.setStrictlyCoLocatedWith(tail);
 		tail.setStrictlyCoLocatedWith(head);
 		intermediate.setStrictlyCoLocatedWith(head);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 072db21..fe6c0e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -266,7 +266,6 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		tailConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
 		
 		// output
-		tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 		tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
 		
 		// the stub
@@ -287,9 +286,6 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
 		
 		// --------------- the auxiliaries ---------------------
-		
-		AbstractJobVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
-			degreeOfParallelism);
 
 		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -313,7 +309,6 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
 
 		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		
@@ -323,11 +318,9 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		head.setSlotSharingGroup(sharingGroup);
 		intermediate.setSlotSharingGroup(sharingGroup);
 		tail.setSlotSharingGroup(sharingGroup);
-		fakeTailOutput.setSlotSharingGroup(sharingGroup);
 		output.setSlotSharingGroup(sharingGroup);
 		sync.setSlotSharingGroup(sharingGroup);
-		
-		fakeTailOutput.setStrictlyCoLocatedWith(tail);
+
 		tail.setStrictlyCoLocatedWith(head);
 		intermediate.setStrictlyCoLocatedWith(head);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index 269378b..88120c3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -233,7 +233,6 @@ public class CompensatableDanglingPageRank {
 		tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
 		
 		// output
-		tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 		tailConfig.setOutputSerializer(recSerializer);
 		
 		// the stub
@@ -254,8 +253,6 @@ public class CompensatableDanglingPageRank {
 		outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
 		
 		// --------------- the auxiliaries ---------------------
-		
-		AbstractJobVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism);
 
 		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -279,7 +276,6 @@ public class CompensatableDanglingPageRank {
 		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
 
 		JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-		JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 
 		JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
 		
@@ -289,11 +285,9 @@ public class CompensatableDanglingPageRank {
 		head.setSlotSharingGroup(sharingGroup);
 		intermediate.setSlotSharingGroup(sharingGroup);
 		tail.setSlotSharingGroup(sharingGroup);
-		fakeTailOutput.setSlotSharingGroup(sharingGroup);
 		output.setSlotSharingGroup(sharingGroup);
 		sync.setSlotSharingGroup(sharingGroup);
 		
-		fakeTailOutput.setStrictlyCoLocatedWith(tail);
 		tail.setStrictlyCoLocatedWith(head);
 		intermediate.setStrictlyCoLocatedWith(head);