You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 19:28:44 UTC
git commit: [FLINK-953] Remove fake tail from iterations
Repository: incubator-flink
Updated Branches:
refs/heads/master 91cfbc5aa -> 786184917
[FLINK-953] Remove fake tail from iterations
This closes #124
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/78618491
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/78618491
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/78618491
Branch: refs/heads/master
Commit: 786184917d460fad014bb9125a07d7b07957e445
Parents: 91cfbc5
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 17 14:55:24 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Sep 21 18:41:49 2014 +0200
----------------------------------------------------------------------
.../plantranslate/NepheleJobGraphGenerator.java | 42 +----------------
.../runtime/iterative/io/FakeOutputTask.java | 47 --------------------
.../runtime/operators/RegularPactTask.java | 6 +--
.../KMeansIterativeNepheleITCase.java | 7 ---
.../ConnectedComponentsNepheleITCase.java | 32 -------------
.../IterationWithChainingNepheleITCase.java | 8 ----
.../test/iterative/nephele/JobGraphUtils.java | 10 -----
.../CustomCompensatableDanglingPageRank.java | 6 ---
...mpensatableDanglingPageRankWithCombiner.java | 9 +---
.../CompensatableDanglingPageRank.java | 6 ---
10 files changed, 5 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index a224324..5ed3432 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -56,7 +56,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
-import org.apache.flink.runtime.iterative.io.FakeOutputTask;
import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
@@ -1179,21 +1178,11 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
tailConfig.setIsWorksetUpdate();
// No following termination criterion
- if(rootOfStepFunction.getOutgoingChannels().isEmpty()) {
+ if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {
rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
- tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
- // create the fake output task
- AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
- fakeTail.setInvokableClass(FakeOutputTask.class);
- fakeTail.setParallelism(headVertex.getParallelism());
- this.auxVertices.add(fakeTail);
-
- // connect the fake tail
- fakeTail.connectNewDataSetAsInput(rootOfStepFunctionVertex, DistributionPattern.POINTWISE);
}
@@ -1222,15 +1211,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// Hack
tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
- tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
- AbstractJobVertex fakeTailTerminationCriterion = new AbstractJobVertex("Fake Tail for Termination Criterion");
- fakeTailTerminationCriterion.setInvokableClass(FakeOutputTask.class);
- fakeTailTerminationCriterion.setParallelism(headVertex.getParallelism());
- this.auxVertices.add(fakeTailTerminationCriterion);
-
- // connect the fake tail
- fakeTailTerminationCriterion.connectNewDataSetAsInput(rootOfTerminationCriterionVertex, DistributionPattern.POINTWISE);
// tell the head that it needs to wait for the solution set updates
headConfig.setWaitForSolutionSetUpdate();
@@ -1345,16 +1325,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
- worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
- // create the fake output task
- AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
- fakeTail.setInvokableClass(FakeOutputTask.class);
- fakeTail.setParallelism(headVertex.getParallelism());
- this.auxVertices.add(fakeTail);
-
- // connect the fake tail
- fakeTail.connectNewDataSetAsInput(nextWorksetVertex, DistributionPattern.POINTWISE);
}
}
{
@@ -1379,16 +1349,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
- solutionDeltaConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
- // create the fake output task
- AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
- fakeTail.setInvokableClass(FakeOutputTask.class);
- fakeTail.setParallelism(headVertex.getParallelism());
- this.auxVertices.add(fakeTail);
-
- // connect the fake tail
- fakeTail.connectNewDataSetAsInput(solutionDeltaVertex, DistributionPattern.POINTWISE);
// tell the head that it needs to wait for the solution set updates
headConfig.setWaitForSolutionSetUpdate();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/FakeOutputTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/FakeOutputTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/FakeOutputTask.java
deleted file mode 100644
index 6fff2d4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/FakeOutputTask.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.iterative.io;
-
-import org.apache.flink.runtime.io.network.api.MutableRecordReader;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.types.Record;
-
-/**
- * Output task for the iteration tail
- */
-public class FakeOutputTask extends AbstractInvokable {
-
- private MutableRecordReader<Record> reader;
-
- private final Record record = new Record();
-
- @Override
- public void registerInputOutput() {
- reader = new MutableRecordReader<Record>(this);
- }
-
- @Override
- public void invoke() throws Exception {
- // ensure that input is consumed, although this task should never see any records
- while (reader.next(record)) {
- throw new IllegalStateException("This task should not receive any data");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 62eafa4..aa48b0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -1241,10 +1241,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
* @return The OutputCollector that data produced in this task is submitted to.
*/
public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, List<BufferWriter> eventualOutputs, int numOutputs)
- throws Exception
+ throws Exception
{
- if (numOutputs <= 0) {
- throw new Exception("BUG: The task must have at least one output");
+ if (numOutputs == 0) {
+ return null;
}
// get the factory for the serializer
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index edc6467..3408557 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -245,7 +245,6 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
tailConfig.setSpillingThresholdInput(0, 0.9f);
// output
- tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
tailConfig.setOutputSerializer(outputSerializer);
// the udf
@@ -284,8 +283,6 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
AbstractJobVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer);
- AbstractJobVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
-
AbstractJobVertex sync = createSync(jobGraph, numIterations, numSubTasks);
OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
@@ -304,8 +301,6 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(mapper, reducer, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
new TaskConfig(reducer.getConfiguration()).setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
- JobGraphUtils.connect(reducer, fakeTailOutput, ChannelType.NETWORK, DistributionPattern.POINTWISE);
-
JobGraphUtils.connect(head, output, ChannelType.NETWORK, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
@@ -319,13 +314,11 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
head.setSlotSharingGroup(sharingGroup);
mapper.setSlotSharingGroup(sharingGroup);
reducer.setSlotSharingGroup(sharingGroup);
- fakeTailOutput.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
mapper.setStrictlyCoLocatedWith(head);
reducer.setStrictlyCoLocatedWith(head);
- fakeTailOutput.setStrictlyCoLocatedWith(reducer);
return jobGraph;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
index 8cf2c69..584bba4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
@@ -349,10 +349,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
return output;
}
- private static AbstractJobVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
- return JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
- }
-
private static AbstractJobVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -391,7 +387,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- AbstractJobVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
// --------------- the tail (solution set join) ---------------
@@ -411,7 +406,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
tailConfig.setInputSerializer(serializer, 0);
// output
- tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
tailConfig.setOutputSerializer(serializer);
// the driver
@@ -435,7 +429,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
@@ -447,11 +440,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
tail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
- fakeTail.setSlotSharingGroup(sharingGroup);
intermediate.setStrictlyCoLocatedWith(head);
tail.setStrictlyCoLocatedWith(head);
- fakeTail.setStrictlyCoLocatedWith(tail);
return jobGraph;
}
@@ -483,8 +474,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// output and auxiliaries
OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- AbstractJobVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks);
- AbstractJobVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks);
AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
// ------------------ the intermediate (ss join) ----------------------
@@ -532,7 +521,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
ssTailConfig.setRelativeInputMaterializationMemory(0, MEM_FRAC_PER_CONSUMER);
// output
- ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
ssTailConfig.setOutputSerializer(serializer);
// the driver
@@ -555,7 +543,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
wsTailConfig.setInputSerializer(serializer, 0);
// output
- wsTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
wsTailConfig.setOutputSerializer(serializer);
// the driver
@@ -584,9 +571,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(ssTail, ssFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(wsTail, wsFakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
SlotSharingGroup sharingGroup = new SlotSharingGroup();
@@ -599,15 +583,11 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
ssTail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
- wsFakeTail.setSlotSharingGroup(sharingGroup);
- ssFakeTail.setSlotSharingGroup(sharingGroup);
intermediate.setStrictlyCoLocatedWith(head);
ssJoinIntermediate.setStrictlyCoLocatedWith(head);
wsTail.setStrictlyCoLocatedWith(head);
ssTail.setStrictlyCoLocatedWith(head);
- wsFakeTail.setStrictlyCoLocatedWith(wsTail);
- ssFakeTail.setStrictlyCoLocatedWith(ssTail);
return jobGraph;
}
@@ -639,7 +619,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// output and auxiliaries
AbstractJobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- AbstractJobVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
// ------------------ the intermediate (ws update) ----------------------
@@ -685,7 +664,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
ssTailConfig.setInputSerializer(serializer, 0);
// output
- ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
ssTailConfig.setOutputSerializer(serializer);
// the driver
@@ -712,8 +690,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(ssTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
SlotSharingGroup sharingGroup = new SlotSharingGroup();
@@ -725,12 +701,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
ssTail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
- fakeTail.setSlotSharingGroup(sharingGroup);
intermediate.setStrictlyCoLocatedWith(head);
wsUpdateIntermediate.setStrictlyCoLocatedWith(head);
ssTail.setStrictlyCoLocatedWith(head);
- fakeTail.setStrictlyCoLocatedWith(ssTail);
return jobGraph;
}
@@ -764,7 +738,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
// output and auxiliaries
AbstractJobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- AbstractJobVertex fakeTail = createFakeTail(jobGraph, numSubTasks);
AbstractJobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
// ------------------ the intermediate (ss update) ----------------------
@@ -808,7 +781,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
wsTailConfig.setInputSerializer(serializer, 0);
// output
- wsTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
wsTailConfig.setOutputSerializer(serializer);
// the driver
@@ -834,8 +806,6 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(wsTail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
@@ -848,12 +818,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
wsTail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
- fakeTail.setSlotSharingGroup(sharingGroup);
intermediate.setStrictlyCoLocatedWith(head);
ssJoinIntermediate.setStrictlyCoLocatedWith(head);
wsTail.setStrictlyCoLocatedWith(head);
- fakeTail.setStrictlyCoLocatedWith(wsTail);
return jobGraph;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
index aa939ed..bab6153 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
@@ -201,7 +201,6 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
chainedMapperConfig.setInputSerializer(serializer, 0);
- chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
chainedMapperConfig.setOutputSerializer(serializer);
chainedMapperConfig.setIsWorksetUpdate();
@@ -220,9 +219,6 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
}
- // - fake tail -------------------------------------------------------------------------------------------------
- AbstractJobVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks);
-
// - sync ------------------------------------------------------------------------------------------------------
AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -241,8 +237,6 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-
// --------------------------------------------------------------------------------------------------------------
// 3. INSTANCE SHARING
// --------------------------------------------------------------------------------------------------------------
@@ -252,12 +246,10 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
input.setSlotSharingGroup(sharingGroup);
head.setSlotSharingGroup(sharingGroup);
tail.setSlotSharingGroup(sharingGroup);
- fakeTail.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
tail.setStrictlyCoLocatedWith(head);
- fakeTail.setStrictlyCoLocatedWith(tail);
return jobGraph;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
index 1734a15..9e858d3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
@@ -28,7 +28,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.iterative.io.FakeOutputTask;
import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -110,15 +109,6 @@ public class JobGraphUtils {
return sync;
}
- public static AbstractJobVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism) {
- AbstractJobVertex outputVertex = new AbstractJobVertex(name);
- jobGraph.addVertex(outputVertex);
-
- outputVertex.setInvokableClass(FakeOutputTask.class);
- outputVertex.setParallelism(degreeOfParallelism);
- return outputVertex;
- }
-
public static OutputFormatVertex createFileOutput(JobGraph jobGraph, String name, int parallelism) {
OutputFormatVertex sinkVertex = new OutputFormatVertex(name);
jobGraph.addVertex(sinkVertex);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
index 662805e..4d25591 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java
@@ -253,7 +253,6 @@ public class CustomCompensatableDanglingPageRank {
tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
// output
- tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
// the stub
@@ -274,8 +273,6 @@ public class CustomCompensatableDanglingPageRank {
outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
// --------------- the auxiliaries ---------------------
-
- AbstractJobVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism);
AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -299,7 +296,6 @@ public class CustomCompensatableDanglingPageRank {
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
@@ -309,11 +305,9 @@ public class CustomCompensatableDanglingPageRank {
head.setSlotSharingGroup(sharingGroup);
intermediate.setSlotSharingGroup(sharingGroup);
tail.setSlotSharingGroup(sharingGroup);
- fakeTailOutput.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
- fakeTailOutput.setStrictlyCoLocatedWith(tail);
tail.setStrictlyCoLocatedWith(head);
intermediate.setStrictlyCoLocatedWith(head);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 072db21..fe6c0e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -266,7 +266,6 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
tailConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
// output
- tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
// the stub
@@ -287,9 +286,6 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
// --------------- the auxiliaries ---------------------
-
- AbstractJobVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
- degreeOfParallelism);
AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -313,7 +309,6 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
@@ -323,11 +318,9 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
head.setSlotSharingGroup(sharingGroup);
intermediate.setSlotSharingGroup(sharingGroup);
tail.setSlotSharingGroup(sharingGroup);
- fakeTailOutput.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
-
- fakeTailOutput.setStrictlyCoLocatedWith(tail);
+
tail.setStrictlyCoLocatedWith(head);
intermediate.setStrictlyCoLocatedWith(head);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/78618491/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
index 269378b..88120c3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/danglingpagerank/CompensatableDanglingPageRank.java
@@ -233,7 +233,6 @@ public class CompensatableDanglingPageRank {
tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
// output
- tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
tailConfig.setOutputSerializer(recSerializer);
// the stub
@@ -254,8 +253,6 @@ public class CompensatableDanglingPageRank {
outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
// --------------- the auxiliaries ---------------------
-
- AbstractJobVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism);
AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -279,7 +276,6 @@ public class CompensatableDanglingPageRank {
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
JobGraphUtils.connect(head, output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
- JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
@@ -289,11 +285,9 @@ public class CompensatableDanglingPageRank {
head.setSlotSharingGroup(sharingGroup);
intermediate.setSlotSharingGroup(sharingGroup);
tail.setSlotSharingGroup(sharingGroup);
- fakeTailOutput.setSlotSharingGroup(sharingGroup);
output.setSlotSharingGroup(sharingGroup);
sync.setSlotSharingGroup(sharingGroup);
- fakeTailOutput.setStrictlyCoLocatedWith(tail);
tail.setStrictlyCoLocatedWith(head);
intermediate.setStrictlyCoLocatedWith(head);