Posted to commits@flink.apache.org by se...@apache.org on 2015/08/30 23:44:40 UTC
[5/5] flink git commit: [FLINK-1681] [tests] Remove outdated 'nephele' iteration tests.
[FLINK-1681] [tests] Remove outdated 'nephele' iteration tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7a57ebe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7a57ebe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7a57ebe
Branch: refs/heads/master
Commit: a7a57ebea6d8f60abba4fe2559af05d316112ca4
Parents: 0ba5355
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 30 18:29:12 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 30 22:39:17 2015 +0200
----------------------------------------------------------------------
.../apache/flink/test/util/TestBaseUtils.java | 45 +
.../BulkIterationWithAllReducerITCase.java | 6 +-
.../test/iterative/DanglingPageRankITCase.java | 392 ++++++++-
.../flink/test/iterative/PageRankITCase.java | 54 --
.../test/iterative/nephele/ConfigUtils.java | 64 --
.../ConnectedComponentsNepheleITCase.java | 837 -------------------
.../nephele/DanglingPageRankNepheleITCase.java | 75 --
...nglingPageRankWithCombinerNepheleITCase.java | 63 --
.../IterationWithChainingNepheleITCase.java | 296 -------
.../test/iterative/nephele/JobGraphUtils.java | 106 ---
.../CustomCompensatableDanglingPageRank.java | 315 -------
...mpensatableDanglingPageRankWithCombiner.java | 329 --------
.../CustomCompensatableDotProductCoGroup.java | 130 ---
.../CustomCompensatableDotProductMatch.java | 80 --
.../CustomCompensatingMap.java | 82 --
.../CustomImprovedAdjacencyListInputFormat.java | 66 --
...stomImprovedDanglingPageRankInputFormat.java | 66 --
.../CustomPageWithRankOutFormat.java | 45 -
.../CustomRankCombiner.java | 57 --
.../types/VertexWithAdjacencyList.java | 83 --
.../VertexWithAdjacencyListComparator.java | 148 ----
...ertexWithAdjacencyListComparatorFactory.java | 39 -
.../VertexWithAdjacencyListSerializer.java | 112 ---
...ertexWithAdjacencyListSerializerFactory.java | 56 --
.../types/VertexWithRank.java | 65 --
.../types/VertexWithRankAndDangling.java | 76 --
.../VertexWithRankAndDanglingComparator.java | 153 ----
...texWithRankAndDanglingComparatorFactory.java | 39 -
.../VertexWithRankAndDanglingSerializer.java | 84 --
...texWithRankAndDanglingSerializerFactory.java | 56 --
.../types/VertexWithRankComparator.java | 151 ----
.../types/VertexWithRankComparatorFactory.java | 39 -
...xWithAdjacencyListPairComparatorFactory.java | 91 --
...ngToVertexWithRankPairComparatorFactory.java | 91 --
.../types/VertexWithRankSerializer.java | 81 --
.../types/VertexWithRankSerializerFactory.java | 56 --
.../danglingpagerank/AsciiLongArrayView.java | 166 ----
.../nephele/danglingpagerank/BooleanValue.java | 57 --
.../CompensatableDanglingPageRank.java | 295 -------
.../CompensatableDotProductCoGroup.java | 137 ---
.../CompensatableDotProductMatch.java | 102 ---
.../danglingpagerank/CompensatingMap.java | 88 --
.../DanglingPageGenerateRankInputFormat.java | 62 --
.../DiffL1NormConvergenceCriterion.java | 44 -
.../ImprovedAdjacencyListInputFormat.java | 74 --
.../ImprovedDanglingPageRankInputFormat.java | 73 --
.../nephele/danglingpagerank/LongArrayView.java | 88 --
.../nephele/danglingpagerank/PageRankStats.java | 124 ---
.../PageRankStatsAggregator.java | 84 --
.../danglingpagerank/PageWithRankOutFormat.java | 47 --
.../test/recordJobs/graph/DanglingPageRank.java | 105 ---
.../test/recordJobs/graph/SimplePageRank.java | 194 -----
.../graph/pageRankUtil/AsciiLongArrayView.java | 163 ----
.../DanglingPageRankInputFormat.java | 78 --
.../DiffL1NormConvergenceCriterion.java | 44 -
.../graph/pageRankUtil/DotProductCoGroup.java | 129 ---
.../graph/pageRankUtil/DotProductMatch.java | 63 --
.../ImprovedAdjacencyListInputFormat.java | 76 --
.../graph/pageRankUtil/LongArrayView.java | 89 --
.../graph/pageRankUtil/PageRankStats.java | 105 ---
.../pageRankUtil/PageRankStatsAggregator.java | 73 --
.../pageRankUtil/PageWithRankOutFormat.java | 51 --
62 files changed, 412 insertions(+), 6727 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index ce02267..87fab25 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -588,4 +588,49 @@ public class TestBaseUtils extends TestLogger {
return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
}
+
+ public static class TupleComparator<T extends Tuple> implements Comparator<T> {
+
+ @Override
+ public int compare(T o1, T o2) {
+ if (o1 == null || o2 == null) {
+ throw new IllegalArgumentException("Cannot compare null tuples");
+ }
+ else if (o1.getArity() != o2.getArity()) {
+ return o1.getArity() - o2.getArity();
+ }
+ else {
+ for (int i = 0; i < o1.getArity(); i++) {
+ Object val1 = o1.getField(i);
+ Object val2 = o2.getField(i);
+
+ int cmp;
+ if (val1 != null && val2 != null) {
+ cmp = compareValues(val1, val2);
+ }
+ else {
+ cmp = val1 == null ? (val2 == null ? 0 : -1) : 1;
+ }
+
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return 0;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <X extends Comparable<X>> int compareValues(Object o1, Object o2) {
+ if (o1 instanceof Comparable && o2 instanceof Comparable) {
+ X c1 = (X) o1;
+ X c2 = (X) o2;
+ return c1.compareTo(c2);
+ }
+ else {
+ throw new IllegalArgumentException("Cannot compare tuples with non comparable elements");
+ }
+ }
+ }
}
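For illustration only, a minimal sketch of how the TupleComparator added above might be used to bring collected results into a deterministic order before asserting on them; the class name and tuple values below are made up and are not part of the commit.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.TestBaseUtils;

public class TupleComparatorUsage {

	public static void main(String[] args) {
		// hypothetical, unordered output of a parallel test job
		List<Tuple2<Long, Double>> result = Arrays.asList(
				new Tuple2<>(3L, 0.12),
				new Tuple2<>(1L, 0.31),
				new Tuple2<>(2L, 0.27));

		// sorts field by field, using the natural order of each comparable field
		Collections.sort(result, new TestBaseUtils.TupleComparator<Tuple2<Long, Double>>());

		System.out.println(result); // [(1,0.31), (2,0.27), (3,0.12)]
	}
}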
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
index f4f2c18..d55a63f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
@@ -64,10 +64,8 @@ public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
@Override
public void open(Configuration parameters) {
- Collection<Integer> bc = getRuntimeContext().getBroadcastVariable("bc");
- synchronized (bc) {
- this.bcValue = bc.isEmpty() ? null : bc.iterator().next();
- }
+ List<Integer> bc = getRuntimeContext().getBroadcastVariable("bc");
+ this.bcValue = bc.isEmpty() ? null : bc.get(0);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
index e2d095d..53496e2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
@@ -18,35 +18,373 @@
package org.apache.flink.test.iterative;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.iterative.nephele.DanglingPageRankNepheleITCase;
-import org.apache.flink.test.recordJobs.graph.DanglingPageRank;
-import org.apache.flink.test.util.RecordAPITestBase;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
-public class DanglingPageRankITCase extends RecordAPITestBase {
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
- protected String pagesPath;
- protected String edgesPath;
- protected String resultPath;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+@RunWith(Parameterized.class)
+@SuppressWarnings({"serial", "unchecked"})
+public class DanglingPageRankITCase extends MultipleProgramsTestBase {
+
+ private static final String AGGREGATOR_NAME = "pagerank.aggregator";
+
+
+ public DanglingPageRankITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testDanglingPageRank() {
+ try {
+ final int NUM_ITERATIONS = 25;
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Boolean>> vertices = env.fromElements(
+ new Tuple2<>(1L, false),
+ new Tuple2<>(2L, false),
+ new Tuple2<>(5L, false),
+ new Tuple2<>(3L, true),
+ new Tuple2<>(4L, false)
+ );
+
+ DataSet<PageWithLinks> edges = env.fromElements(
+ new PageWithLinks(2L, new long[] { 1 }),
+ new PageWithLinks(5L, new long[] { 2, 4 }),
+ new PageWithLinks(4L, new long[] { 3, 2 }),
+ new PageWithLinks(1L, new long[] { 4, 2, 3 })
+ );
+
+
+ final long numVertices = vertices.count();
+ final long numDanglingVertices = vertices
+ .filter(
+ new FilterFunction<Tuple2<Long, Boolean>>() {
+ @Override
+ public boolean filter(Tuple2<Long, Boolean> value) {
+ return value.f1;
+ }
+ })
+ .count();
+
+
+ DataSet<PageWithRankAndDangling> verticesWithInitialRank = vertices
+ .map(new MapFunction<Tuple2<Long, Boolean>, PageWithRankAndDangling>() {
+
+ @Override
+ public PageWithRankAndDangling map(Tuple2<Long, Boolean> value) {
+ return new PageWithRankAndDangling(value.f0, 1.0 / numVertices, value.f1);
+ }
+ });
+
+ IterativeDataSet<PageWithRankAndDangling> iteration = verticesWithInitialRank.iterate(NUM_ITERATIONS);
+
+ iteration.getAggregators().registerAggregationConvergenceCriterion(
+ AGGREGATOR_NAME,
+ new PageRankStatsAggregator(),
+ new DiffL1NormConvergenceCriterion());
+
+ DataSet<PageWithRank> partialRanks = iteration.join(edges).where("pageId").equalTo("pageId").with(
+ new FlatJoinFunction<PageWithRankAndDangling, PageWithLinks, PageWithRank>() {
+
+ @Override
+ public void join(PageWithRankAndDangling page,
+ PageWithLinks links,
+ Collector<PageWithRank> out) {
+
+ double rankToDistribute = page.rank / (double) links.targets.length;
+ PageWithRank output = new PageWithRank(0L, rankToDistribute);
+
+ for (long target : links.targets) {
+ output.pageId = target;
+ out.collect(output);
+ }
+ }
+ }
+ );
+
+ DataSet<PageWithRankAndDangling> newRanks =
+ iteration.coGroup(partialRanks).where("pageId").equalTo("pageId").with(
+ new RichCoGroupFunction<PageWithRankAndDangling, PageWithRank, PageWithRankAndDangling>() {
+
+ private static final double BETA = 0.85;
+
+ private final double randomJump = (1.0 - BETA) / numVertices;
+ private PageRankStatsAggregator aggregator;
+ private double danglingRankFactor;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ int currentIteration = getIterationRuntimeContext().getSuperstepNumber();
+
+ aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
+
+ if (currentIteration == 1) {
+ danglingRankFactor = BETA * (double) numDanglingVertices /
+ ((double) numVertices * (double) numVertices);
+ } else {
+ PageRankStats previousAggregate = getIterationRuntimeContext()
+ .getPreviousIterationAggregate(AGGREGATOR_NAME);
+ danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices;
+ }
+ }
+
+ @Override
+ public void coGroup(Iterable<PageWithRankAndDangling> currentPages,
+ Iterable<PageWithRank> partialRanks,
+ Collector<PageWithRankAndDangling> out) {
+
+ // compute the next rank
+ long edges = 0;
+ double summedRank = 0;
+ for (PageWithRank partial : partialRanks) {
+ summedRank += partial.rank;
+ edges++;
+ }
+ double rank = BETA * summedRank + randomJump + danglingRankFactor;
+
+ // current rank, for stats and convergence
+ PageWithRankAndDangling currentPage = currentPages.iterator().next();
+ double currentRank = currentPage.rank;
+ boolean isDangling = currentPage.dangling;
+
+ // maintain statistics to compensate for probability loss on dangling nodes
+ double danglingRankToAggregate = isDangling ? rank : 0;
+ long danglingVerticesToAggregate = isDangling ? 1 : 0;
+ double diff = Math.abs(currentRank - rank);
+ aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges);
+
+ currentPage.rank = rank;
+ out.collect(currentPage);
+ }
+ });
+
+ List<PageWithRankAndDangling> result = iteration.closeWith(newRanks).collect();
+
+ double totalRank = 0.0;
+ for (PageWithRankAndDangling r : result) {
+ totalRank += r.rank;
+ assertTrue(r.pageId >= 1 && r.pageId <= 5);
+ assertTrue(r.pageId != 3 || r.dangling);
+ }
+
+ assertEquals(1.0, totalRank, 0.001);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // custom types
+ // ------------------------------------------------------------------------
+
+ public static class PageWithRank {
+
+ public long pageId;
+ public double rank;
+
+ public PageWithRank() {}
+
+ public PageWithRank(long pageId, double rank) {
+ this.pageId = pageId;
+ this.rank = rank;
+ }
+ }
+
+ public static class PageWithRankAndDangling {
+
+ public long pageId;
+ public double rank;
+ public boolean dangling;
+
+ public PageWithRankAndDangling() {}
+
+ public PageWithRankAndDangling(long pageId, double rank, boolean dangling) {
+ this.pageId = pageId;
+ this.rank = rank;
+ this.dangling = dangling;
+ }
+
+ @Override
+ public String toString() {
+ return "PageWithRankAndDangling{" +
+ "pageId=" + pageId +
+ ", rank=" + rank +
+ ", dangling=" + dangling +
+ '}';
+ }
+ }
+
+ public static class PageWithLinks {
+
+ public long pageId;
+ public long[] targets;
+
+ public PageWithLinks() {}
+
+ public PageWithLinks(long pageId, long[] targets) {
+ this.pageId = pageId;
+ this.targets = targets;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // statistics
+ // ------------------------------------------------------------------------
+
+ public static class PageRankStats implements Value {
+
+ private double diff;
+ private double rank;
+ private double danglingRank;
+ private long numDanglingVertices;
+ private long numVertices;
+ private long edges;
+
+ public PageRankStats() {}
+
+ public PageRankStats(
+ double diff, double rank, double danglingRank,
+ long numDanglingVertices, long numVertices, long edges) {
+
+ this.diff = diff;
+ this.rank = rank;
+ this.danglingRank = danglingRank;
+ this.numDanglingVertices = numDanglingVertices;
+ this.numVertices = numVertices;
+ this.edges = edges;
+ }
+
+ public double diff() {
+ return diff;
+ }
+
+ public double rank() {
+ return rank;
+ }
+
+ public double danglingRank() {
+ return danglingRank;
+ }
+
+ public long numDanglingVertices() {
+ return numDanglingVertices;
+ }
+
+ public long numVertices() {
+ return numVertices;
+ }
+
+ public long edges() {
+ return edges;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeDouble(diff);
+ out.writeDouble(rank);
+ out.writeDouble(danglingRank);
+ out.writeLong(numDanglingVertices);
+ out.writeLong(numVertices);
+ out.writeLong(edges);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ diff = in.readDouble();
+ rank = in.readDouble();
+ danglingRank = in.readDouble();
+ numDanglingVertices = in.readLong();
+ numVertices = in.readLong();
+ edges = in.readLong();
+ }
+
+ @Override
+ public String toString() {
+ return "PageRankStats: diff [" + diff + "], rank [" + rank + "], danglingRank [" + danglingRank +
+ "], numDanglingVertices [" + numDanglingVertices + "], numVertices [" + numVertices + "], edges [" + edges +
+ "]";
+ }
+ }
- @Override
- protected void preSubmit() throws Exception {
- pagesPath = createTempFile("pages.txt", DanglingPageRankNepheleITCase.TEST_VERTICES);
- edgesPath = createTempFile("edges.txt", DanglingPageRankNepheleITCase.TEST_EDGES);
- resultPath = getTempFilePath("results");
- }
-
- @Override
- protected Plan getTestJob() {
- DanglingPageRank pr = new DanglingPageRank();
- Plan plan = pr.getPlan(
- String.valueOf(parallelism),
- pagesPath,
- edgesPath,
- resultPath,
- "25", // max iterations
- "5", // num vertices
- "1"); // num dangling vertices
- return plan;
+ public static class PageRankStatsAggregator implements Aggregator<PageRankStats> {
+
+ private double diff;
+ private double rank;
+ private double danglingRank;
+ private long numDanglingVertices;
+ private long numVertices;
+ private long edges;
+
+ @Override
+ public PageRankStats getAggregate() {
+ return new PageRankStats(diff, rank, danglingRank, numDanglingVertices, numVertices, edges);
+ }
+
+ public void aggregate(double diffDelta, double rankDelta, double danglingRankDelta, long danglingVerticesDelta,
+ long verticesDelta, long edgesDelta) {
+ diff += diffDelta;
+ rank += rankDelta;
+ danglingRank += danglingRankDelta;
+ numDanglingVertices += danglingVerticesDelta;
+ numVertices += verticesDelta;
+ edges += edgesDelta;
+ }
+
+ @Override
+ public void aggregate(PageRankStats pageRankStats) {
+ diff += pageRankStats.diff();
+ rank += pageRankStats.rank();
+ danglingRank += pageRankStats.danglingRank();
+ numDanglingVertices += pageRankStats.numDanglingVertices();
+ numVertices += pageRankStats.numVertices();
+ edges += pageRankStats.edges();
+ }
+
+ @Override
+ public void reset() {
+ diff = 0;
+ rank = 0;
+ danglingRank = 0;
+ numDanglingVertices = 0;
+ numVertices = 0;
+ edges = 0;
+ }
+ }
+
+ public static class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
+
+ private static final double EPSILON = 0.00005;
+
+ @Override
+ public boolean isConverged(int iteration, PageRankStats pageRankStats) {
+ return pageRankStats.diff() < EPSILON;
+ }
}
}
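As a spot check of the test above (not part of the commit): for the 5-vertex input with a single dangling vertex, the constants used in the first superstep evaluate as below; the wrapper class is hypothetical.

public class PageRankConstantsCheck {

	public static void main(String[] args) {
		final double BETA = 0.85;
		final long numVertices = 5;
		final long numDanglingVertices = 1;

		double initialRank = 1.0 / numVertices;                  // 0.2 per page
		double randomJump = (1.0 - BETA) / numVertices;          // (1 - 0.85) / 5 = 0.03
		double danglingRankFactor = BETA * (double) numDanglingVertices
				/ ((double) numVertices * (double) numVertices); // 0.85 * 1 / 25 = 0.034

		System.out.println(initialRank + " " + randomJump + " " + danglingRankFactor);
	}
}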
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
deleted file mode 100644
index 946d89b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.graph.SimplePageRank;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-public class PageRankITCase extends RecordAPITestBase {
-
- private static final String VERTICES = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n";
-
- private static final String EDGES = "1 2\n2 3\n3 4\n4 5\n5 6\n6 7\n7 8\n8 9\n9 10\n10 1\n";
-
- protected String pagesPath;
- protected String edgesPath;
- protected String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- pagesPath = createTempFile("pages.txt", VERTICES);
- edgesPath = createTempFile("edges.txt", EDGES);
- resultPath = getTempFilePath("results");
- }
-
- @Override
- protected Plan getTestJob() {
- SimplePageRank pr = new SimplePageRank();
- Plan plan = pr.getPlan(
- String.valueOf(parallelism),
- pagesPath,
- edgesPath,
- resultPath,
- "5", // max iterations
- "10"); // num vertices
- return plan;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConfigUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConfigUtils.java
deleted file mode 100644
index c9eea3d..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConfigUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.configuration.Configuration;
-
-public class ConfigUtils {
-
- private ConfigUtils() {
- }
-
- public static int asInteger(String key, Configuration parameters) {
- int value = parameters.getInteger(key, -1);
- if (value == -1) {
- throw new IllegalStateException();
- }
- return value;
- }
-
- public static double asDouble(String key, Configuration parameters) {
- double value = Double.parseDouble(parameters.getString(key, String.valueOf(Double.NaN)));
- if (Double.isNaN(value)) {
- throw new IllegalStateException();
- }
- return value;
- }
-
- public static long asLong(String key, Configuration parameters) {
- long value = parameters.getLong(key, Long.MIN_VALUE);
- if (value == Long.MIN_VALUE) {
- throw new IllegalStateException();
- }
- return value;
- }
-
- public static Set<Integer> asIntSet(String key, Configuration parameters) {
- String[] tokens = parameters.getString(key, "").split(",");
- Set<Integer> failingWorkers = new HashSet<Integer>(tokens.length);
- for (String token : tokens) {
- failingWorkers.add(Integer.parseInt(token));
- }
- return failingWorkers;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
deleted file mode 100644
index 7a3639b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ /dev/null
@@ -1,837 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import java.io.BufferedReader;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction;
-import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingClassReduceFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceDriver;
-import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
-import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.MinimumComponentIDReduce;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.NeighborWithComponentIDJoin;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.UpdateComponentIdMatch;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-/**
- * Tests the various variants of iteration state updates for workset iterations:
- * - unified solution set and workset tail update
- * - separate solution set and workset tail updates
- * - intermediate workset update and solution set tail
- * - intermediate solution set update and workset tail
- */
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
-
- private static final long SEED = 0xBADC0FFEEBEEFL;
-
- private static final int NUM_VERTICES = 1000;
-
- private static final int NUM_EDGES = 10000;
-
- private static final int ITERATION_ID = 1;
-
- private static final long MEM_PER_CONSUMER = 3;
-
- private static final int parallelism = 4;
-
- private static final double MEM_FRAC_PER_CONSUMER = (double)MEM_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*parallelism;
-
- protected String verticesPath;
-
- protected String edgesPath;
-
- protected String resultPath;
-
- public ConnectedComponentsNepheleITCase(Configuration config) {
- super(config);
- setTaskManagerNumSlots(parallelism);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() {
- Configuration config1 = new Configuration();
- config1.setInteger("testcase", 1);
-
- Configuration config2 = new Configuration();
- config2.setInteger("testcase", 2);
-
- Configuration config3 = new Configuration();
- config3.setInteger("testcase", 3);
-
- Configuration config4 = new Configuration();
- config4.setInteger("testcase", 4);
-
- return toParameterList(config1, config2, config3, config4);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
- edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
- resultPath = getTempFilePath("results");
- }
-
- @Override
- protected JobGraph getJobGraph() throws Exception {
- int maxIterations = 100;
-
- int type = config.getInteger("testcase", 0);
- switch (type) {
- case 1:
- return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, parallelism, maxIterations);
- case 2:
- return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, parallelism, maxIterations);
- case 3:
- return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, parallelism,
- maxIterations);
- case 4:
- return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, parallelism,
- maxIterations);
- default:
- throw new RuntimeException("Broken test configuration");
- }
- }
-
- @Override
- protected void postSubmit() throws Exception {
- for (BufferedReader reader : getResultReader(resultPath)) {
- ConnectedComponentsData.checkOddEvenResult(reader);
- }
- }
-
- public static final class IdDuplicator extends MapFunction {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- record.setField(1, record.getField(0, LongValue.class));
- out.collect(record);
- }
-
- }
-
- // -----------------------------------------------------------------------------------------------------------------
- // Invariant vertices across all variants
- // -----------------------------------------------------------------------------------------------------------------
-
- private static InputFormatVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks,
- TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator)
- {
- @SuppressWarnings("unchecked")
- CsvInputFormat verticesInFormat = new CsvInputFormat(' ', LongValue.class);
- InputFormatVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
- jobGraph, numSubTasks);
- TaskConfig verticesInputConfig = new TaskConfig(verticesInput.getConfiguration());
- {
- verticesInputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- verticesInputConfig.setOutputSerializer(serializer);
-
- // chained mapper that duplicates the id
- TaskConfig chainedMapperConfig = new TaskConfig(new Configuration());
- chainedMapperConfig.setStubWrapper(new UserCodeClassWrapper<IdDuplicator>(IdDuplicator.class));
- chainedMapperConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
- chainedMapperConfig.setInputSerializer(serializer, 0);
-
- chainedMapperConfig.setOutputSerializer(serializer);
- chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- chainedMapperConfig.setOutputComparator(comparator, 0);
- chainedMapperConfig.setOutputComparator(comparator, 1);
-
- verticesInputConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapperConfig, "ID Duplicator");
- }
-
- return verticesInput;
- }
-
- private static InputFormatVertex createEdgesInput(JobGraph jobGraph, String edgesPath, int numSubTasks,
- TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator)
- {
- // edges
- @SuppressWarnings("unchecked")
- CsvInputFormat edgesInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class);
- InputFormatVertex edgesInput = JobGraphUtils.createInput(edgesInFormat, edgesPath, "EdgesInput", jobGraph,
- numSubTasks);
- TaskConfig edgesInputConfig = new TaskConfig(edgesInput.getConfiguration());
- {
- edgesInputConfig.setOutputSerializer(serializer);
- edgesInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- edgesInputConfig.setOutputComparator(comparator, 0);
- }
-
- return edgesInput;
- }
-
- private static JobVertex createIterationHead(JobGraph jobGraph, int numSubTasks,
- TypeSerializerFactory<?> serializer,
- TypeComparatorFactory<?> comparator,
- TypePairComparatorFactory<?, ?> pairComparator) {
-
- JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Join With Edges (Iteration Head)", jobGraph, numSubTasks);
- TaskConfig headConfig = new TaskConfig(head.getConfiguration());
- {
- headConfig.setIterationId(ITERATION_ID);
-
- // initial input / workset
- headConfig.addInputToGroup(0);
- headConfig.setInputSerializer(serializer, 0);
- headConfig.setInputComparator(comparator, 0);
- headConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
- headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
-
- // regular plan input (second input to the join)
- headConfig.addInputToGroup(1);
- headConfig.setInputSerializer(serializer, 1);
- headConfig.setInputComparator(comparator, 1);
- headConfig.setInputLocalStrategy(1, LocalStrategy.NONE);
- headConfig.setInputCached(1, true);
- headConfig.setRelativeInputMaterializationMemory(1, MEM_FRAC_PER_CONSUMER);
-
- // initial solution set input
- headConfig.addInputToGroup(2);
- headConfig.setInputSerializer(serializer, 2);
- headConfig.setInputComparator(comparator, 2);
- headConfig.setInputLocalStrategy(2, LocalStrategy.NONE);
- headConfig.setIterationHeadSolutionSetInputIndex(2);
-
- headConfig.setSolutionSetSerializer(serializer);
- headConfig.setSolutionSetComparator(comparator);
-
- // back channel / iterations
- headConfig.setIsWorksetIteration();
- headConfig.setRelativeBackChannelMemory(MEM_FRAC_PER_CONSUMER);
- headConfig.setRelativeSolutionSetMemory(MEM_FRAC_PER_CONSUMER );
-
- // output into iteration
- headConfig.setOutputSerializer(serializer);
- headConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- headConfig.setOutputComparator(comparator, 0);
-
- // final output
- TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
- headFinalOutConfig.setOutputSerializer(serializer);
- headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-
- // the sync
- headConfig.setIterationHeadIndexOfSyncOutput(2);
-
- // the driver
- headConfig.setDriver(BuildSecondCachedJoinDriver.class);
- headConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- headConfig.setStubWrapper(
- new UserCodeClassWrapper<NeighborWithComponentIDJoin>(NeighborWithComponentIDJoin.class));
- headConfig.setDriverComparator(comparator, 0);
- headConfig.setDriverComparator(comparator, 1);
- headConfig.setDriverPairComparator(pairComparator);
- headConfig.setRelativeMemoryDriver(MEM_FRAC_PER_CONSUMER);
-
- headConfig.addIterationAggregator(
- WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
- }
-
- return head;
- }
-
- private static JobVertex createIterationIntermediate(JobGraph jobGraph, int numSubTasks,
- TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator)
- {
- // --------------- the intermediate (reduce to min id) ---------------
- JobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "Find Min Component-ID", jobGraph, numSubTasks);
- TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
- {
- intermediateConfig.setIterationId(ITERATION_ID);
-
- intermediateConfig.addInputToGroup(0);
- intermediateConfig.setInputSerializer(serializer, 0);
- intermediateConfig.setInputComparator(comparator, 0);
- intermediateConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
- intermediateConfig.setRelativeMemoryInput(0, MEM_FRAC_PER_CONSUMER);
- intermediateConfig.setFilehandlesInput(0, 64);
- intermediateConfig.setSpillingThresholdInput(0, 0.85f);
-
- intermediateConfig.setOutputSerializer(serializer);
- intermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
- intermediateConfig.setDriver(GroupReduceDriver.class);
- intermediateConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
- intermediateConfig.setDriverComparator(comparator, 0);
- intermediateConfig.setStubWrapper(
- new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingClassReduceFunction(MinimumComponentIDReduce.class)));
- }
-
- return intermediate;
- }
-
- private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
- TypeSerializerFactory<?> serializer) {
- OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
- TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
- {
-
- outputConfig.addInputToGroup(0);
- outputConfig.setInputSerializer(serializer, 0);
-
- outputConfig.setStubWrapper(new UserCodeClassWrapper<CsvOutputFormat>(CsvOutputFormat.class));
- outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, resultPath);
-
- Configuration outputUserConfig = outputConfig.getStubParameters();
- outputUserConfig.setString(CsvOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
- outputUserConfig.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, " ");
- outputUserConfig.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, LongValue.class);
- outputUserConfig.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0);
- outputUserConfig.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, LongValue.class);
- outputUserConfig.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1);
- outputUserConfig.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
- }
-
- return output;
- }
-
- private static JobVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
- JobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
- TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
- syncConfig.setNumberOfIterations(maxIterations);
- syncConfig.setIterationId(ITERATION_ID);
- syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
- new LongSumAggregator());
- syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
- new WorksetEmptyConvergenceCriterion());
-
- return sync;
- }
-
- // -----------------------------------------------------------------------------------------------------------------
- // Unified solution set and workset tail update
- // -----------------------------------------------------------------------------------------------------------------
-
- public JobGraph createJobGraphUnifiedTails(
- String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations)
- {
- // -- init -------------------------------------------------------------------------------------------------
- final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
- @SuppressWarnings("unchecked")
- final TypeComparatorFactory<?> comparator =
- new RecordComparatorFactory(new int[] { 0 }, new Class[] { LongValue.class }, new boolean[] { true });
- final TypePairComparatorFactory<?, ?> pairComparator = RecordPairComparatorFactory.get();
-
- JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
-
- // -- invariant vertices -----------------------------------------------------------------------------------
- InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
- InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
- JobVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
-
- JobVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
- TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-
- OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- JobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
-
- // --------------- the tail (solution set join) ---------------
- JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph, numSubTasks);
- TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
- {
- tailConfig.setIterationId(ITERATION_ID);
-
- tailConfig.setIsWorksetIteration();
- tailConfig.setIsWorksetUpdate();
-
- tailConfig.setIsSolutionSetUpdate();
- tailConfig.setIsSolutionSetUpdateWithoutReprobe();
-
- // inputs and driver
- tailConfig.addInputToGroup(0);
- tailConfig.setInputSerializer(serializer, 0);
-
- // output
- tailConfig.setOutputSerializer(serializer);
-
- // the driver
- tailConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
- tailConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- tailConfig.setDriverComparator(comparator, 0);
- tailConfig.setDriverPairComparator(pairComparator);
-
- tailConfig.setStubWrapper(new UserCodeClassWrapper<UpdateComponentIdMatch>(UpdateComponentIdMatch.class));
- }
-
- // -- edges ------------------------------------------------------------------------------------------------
- JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
- JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
- JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
- intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
- JobGraphUtils.connect(intermediate, tail, DistributionPattern.POINTWISE);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
- JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup();
- vertices.setSlotSharingGroup(sharingGroup);
- edges.setSlotSharingGroup(sharingGroup);
- head.setSlotSharingGroup(sharingGroup);
- intermediate.setSlotSharingGroup(sharingGroup);
- tail.setSlotSharingGroup(sharingGroup);
- output.setSlotSharingGroup(sharingGroup);
- sync.setSlotSharingGroup(sharingGroup);
-
- intermediate.setStrictlyCoLocatedWith(head);
- tail.setStrictlyCoLocatedWith(head);
-
- return jobGraph;
- }
-
- public JobGraph createJobGraphSeparateTails(
- String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations)
- {
- // -- init -------------------------------------------------------------------------------------------------
- final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
- @SuppressWarnings("unchecked")
- final TypeComparatorFactory<?> comparator =
- new RecordComparatorFactory(new int[] { 0 }, new Class[] { LongValue.class }, new boolean[] { true });
- final TypePairComparatorFactory<?, ?> pairComparator = RecordPairComparatorFactory.get();
-
- JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
-
- // input
- InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
- InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
-
- // head
- JobVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
- TaskConfig headConfig = new TaskConfig(head.getConfiguration());
- headConfig.setWaitForSolutionSetUpdate();
-
- // intermediate
- JobVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
- TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-
- // output and auxiliaries
- OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- JobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
-
- // ------------------ the intermediate (ss join) ----------------------
- JobVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "Solution Set Join", jobGraph, numSubTasks);
- TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
- {
- ssJoinIntermediateConfig.setIterationId(ITERATION_ID);
-
- // inputs
- ssJoinIntermediateConfig.addInputToGroup(0);
- ssJoinIntermediateConfig.setInputSerializer(serializer, 0);
-
- // output
- ssJoinIntermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- ssJoinIntermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- ssJoinIntermediateConfig.setOutputComparator(comparator, 0);
- ssJoinIntermediateConfig.setOutputComparator(comparator, 1);
-
- ssJoinIntermediateConfig.setOutputSerializer(serializer);
-
- // driver
- ssJoinIntermediateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
- ssJoinIntermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- ssJoinIntermediateConfig.setDriverComparator(comparator, 0);
- ssJoinIntermediateConfig.setDriverPairComparator(pairComparator);
-
- ssJoinIntermediateConfig.setStubWrapper(
- new UserCodeClassWrapper<UpdateComponentIdMatch>(UpdateComponentIdMatch.class));
- }
-
- // -------------------------- ss tail --------------------------------
- JobVertex ssTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail",
- jobGraph, numSubTasks);
- TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
- {
- ssTailConfig.setIterationId(ITERATION_ID);
- ssTailConfig.setIsSolutionSetUpdate();
- ssTailConfig.setIsWorksetIteration();
-
- // inputs and driver
- ssTailConfig.addInputToGroup(0);
- ssTailConfig.setInputSerializer(serializer, 0);
- ssTailConfig.setInputAsynchronouslyMaterialized(0, true);
- ssTailConfig.setRelativeInputMaterializationMemory(0, MEM_FRAC_PER_CONSUMER);
-
- // output
- ssTailConfig.setOutputSerializer(serializer);
-
- // the driver
- ssTailConfig.setDriver(CollectorMapDriver.class);
- ssTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- ssTailConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
- }
-
- // -------------------------- ws tail --------------------------------
- JobVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail",
- jobGraph, numSubTasks);
- TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
- {
- wsTailConfig.setIterationId(ITERATION_ID);
- wsTailConfig.setIsWorksetIteration();
- wsTailConfig.setIsWorksetUpdate();
-
- // inputs and driver
- wsTailConfig.addInputToGroup(0);
- wsTailConfig.setInputSerializer(serializer, 0);
-
- // output
- wsTailConfig.setOutputSerializer(serializer);
-
- // the driver
- wsTailConfig.setDriver(CollectorMapDriver.class);
- wsTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- wsTailConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
- }
-
- // --------------- the wiring ---------------------
-
- JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
- JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
- JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
- intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
- JobGraphUtils.connect(intermediate, ssJoinIntermediate, DistributionPattern.POINTWISE);
- ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(ssJoinIntermediate, ssTail, DistributionPattern.POINTWISE);
- ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(ssJoinIntermediate, wsTail, DistributionPattern.POINTWISE);
- wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
- JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup();
- vertices.setSlotSharingGroup(sharingGroup);
- edges.setSlotSharingGroup(sharingGroup);
- head.setSlotSharingGroup(sharingGroup);
- intermediate.setSlotSharingGroup(sharingGroup);
- ssJoinIntermediate.setSlotSharingGroup(sharingGroup);
- wsTail.setSlotSharingGroup(sharingGroup);
- ssTail.setSlotSharingGroup(sharingGroup);
- output.setSlotSharingGroup(sharingGroup);
- sync.setSlotSharingGroup(sharingGroup);
-
- intermediate.setStrictlyCoLocatedWith(head);
- ssJoinIntermediate.setStrictlyCoLocatedWith(head);
- wsTail.setStrictlyCoLocatedWith(head);
- ssTail.setStrictlyCoLocatedWith(head);
-
- return jobGraph;
- }
-
- public JobGraph createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(
- String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations)
- {
- // -- init -------------------------------------------------------------------------------------------------
- final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
- @SuppressWarnings("unchecked")
- final TypeComparatorFactory<?> comparator =
- new RecordComparatorFactory(new int[] { 0 }, new Class[] { LongValue.class }, new boolean[] { true });
- final TypePairComparatorFactory<?, ?> pairComparator = RecordPairComparatorFactory.get();
-
- JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Workset Update, Solution Set Tail)");
-
- // input
- InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
- InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
-
- // head
- JobVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
- TaskConfig headConfig = new TaskConfig(head.getConfiguration());
- headConfig.setWaitForSolutionSetUpdate();
-
- // intermediate
- JobVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
- TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-
- // output and auxiliaries
- JobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- JobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
-
- // ------------------ the intermediate (ws update) ----------------------
- JobVertex wsUpdateIntermediate =
- JobGraphUtils.createTask(IterationIntermediatePactTask.class, "WorksetUpdate", jobGraph, numSubTasks);
- TaskConfig wsUpdateConfig = new TaskConfig(wsUpdateIntermediate.getConfiguration());
- {
- wsUpdateConfig.setIterationId(ITERATION_ID);
- wsUpdateConfig.setIsWorksetIteration();
- wsUpdateConfig.setIsWorksetUpdate();
-
- // inputs
- wsUpdateConfig.addInputToGroup(0);
- wsUpdateConfig.setInputSerializer(serializer, 0);
-
- // output
- wsUpdateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- wsUpdateConfig.setOutputComparator(comparator, 0);
-
- wsUpdateConfig.setOutputSerializer(serializer);
-
- // driver
- wsUpdateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
- wsUpdateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- wsUpdateConfig.setDriverComparator(comparator, 0);
- wsUpdateConfig.setDriverPairComparator(pairComparator);
-
- wsUpdateConfig.setStubWrapper(new UserCodeClassWrapper<UpdateComponentIdMatch>(
- UpdateComponentIdMatch.class));
- }
-
- // -------------------------- ss tail --------------------------------
- JobVertex ssTail =
- JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph, numSubTasks);
- TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
- {
- ssTailConfig.setIterationId(ITERATION_ID);
- ssTailConfig.setIsSolutionSetUpdate();
- ssTailConfig.setIsWorksetIteration();
-
- // inputs and driver
- ssTailConfig.addInputToGroup(0);
- ssTailConfig.setInputSerializer(serializer, 0);
-
- // output
- ssTailConfig.setOutputSerializer(serializer);
-
- // the driver
- ssTailConfig.setDriver(CollectorMapDriver.class);
- ssTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- ssTailConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
- }
-
- // edges
-
- JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
- JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
- JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
- intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
- JobGraphUtils.connect(intermediate, wsUpdateIntermediate,
- DistributionPattern.POINTWISE);
- wsUpdateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(wsUpdateIntermediate, ssTail, DistributionPattern.POINTWISE);
- ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
- JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup();
- vertices.setSlotSharingGroup(sharingGroup);
- edges.setSlotSharingGroup(sharingGroup);
- head.setSlotSharingGroup(sharingGroup);
- intermediate.setSlotSharingGroup(sharingGroup);
- wsUpdateIntermediate.setSlotSharingGroup(sharingGroup);
- ssTail.setSlotSharingGroup(sharingGroup);
- output.setSlotSharingGroup(sharingGroup);
- sync.setSlotSharingGroup(sharingGroup);
-
- intermediate.setStrictlyCoLocatedWith(head);
- wsUpdateIntermediate.setStrictlyCoLocatedWith(head);
- ssTail.setStrictlyCoLocatedWith(head);
-
- return jobGraph;
- }
-
- // -----------------------------------------------------------------------------------------------------------------
- // Intermediate solution set update and workset tail
- // -----------------------------------------------------------------------------------------------------------------
-
- public JobGraph createJobGraphSolutionSetUpdateAndWorksetTail(
- String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations)
- {
- // -- init -------------------------------------------------------------------------------------------------
- final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
- @SuppressWarnings("unchecked")
- final TypeComparatorFactory<?> comparator =
- new RecordComparatorFactory(new int[] { 0 }, new Class[] { LongValue.class }, new boolean[] { true });
- final TypePairComparatorFactory<?, ?> pairComparator = RecordPairComparatorFactory.get();
-
- JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Solution Set Update, Workset Tail)");
-
- // input
- InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
- InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
-
- // head
- JobVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
-
- // intermediate
- JobVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
- TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-
- // output and auxiliaries
- JobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
- JobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
-
- // ------------------ the intermediate (ss update) ----------------------
- JobVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
- "Solution Set Update", jobGraph, numSubTasks);
- TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
- {
- ssJoinIntermediateConfig.setIterationId(ITERATION_ID);
- ssJoinIntermediateConfig.setIsSolutionSetUpdate();
- ssJoinIntermediateConfig.setIsSolutionSetUpdateWithoutReprobe();
-
- // inputs
- ssJoinIntermediateConfig.addInputToGroup(0);
- ssJoinIntermediateConfig.setInputSerializer(serializer, 0);
-
- // output
- ssJoinIntermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- ssJoinIntermediateConfig.setOutputComparator(comparator, 0);
-
- ssJoinIntermediateConfig.setOutputSerializer(serializer);
-
- // driver
- ssJoinIntermediateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
- ssJoinIntermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- ssJoinIntermediateConfig.setDriverComparator(comparator, 0);
- ssJoinIntermediateConfig.setDriverPairComparator(pairComparator);
-
- ssJoinIntermediateConfig.setStubWrapper(new UserCodeClassWrapper<UpdateComponentIdMatch>(UpdateComponentIdMatch.class));
- }
-
- // -------------------------- ws tail --------------------------------
- JobVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail", jobGraph, numSubTasks);
- TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
- {
- wsTailConfig.setIterationId(ITERATION_ID);
- wsTailConfig.setIsWorksetIteration();
- wsTailConfig.setIsWorksetUpdate();
-
- // inputs and driver
- wsTailConfig.addInputToGroup(0);
- wsTailConfig.setInputSerializer(serializer, 0);
-
- // output
- wsTailConfig.setOutputSerializer(serializer);
-
- // the driver
- wsTailConfig.setDriver(CollectorMapDriver.class);
- wsTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- wsTailConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
- }
-
- // --------------- the wiring ---------------------
-
- JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
- JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
- JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-
- JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
- intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
- JobGraphUtils.connect(intermediate, ssJoinIntermediate, DistributionPattern.POINTWISE);
- ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(ssJoinIntermediate, wsTail, DistributionPattern.POINTWISE);
- wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
- JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
- JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup();
- vertices.setSlotSharingGroup(sharingGroup);
- edges.setSlotSharingGroup(sharingGroup);
- head.setSlotSharingGroup(sharingGroup);
- intermediate.setSlotSharingGroup(sharingGroup);
- ssJoinIntermediate.setSlotSharingGroup(sharingGroup);
- wsTail.setSlotSharingGroup(sharingGroup);
- output.setSlotSharingGroup(sharingGroup);
- sync.setSlotSharingGroup(sharingGroup);
-
- intermediate.setStrictlyCoLocatedWith(head);
- ssJoinIntermediate.setStrictlyCoLocatedWith(head);
- wsTail.setStrictlyCoLocatedWith(head);
-
- return jobGraph;
- }
-
- public static final class DummyMapper extends MapFunction {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record rec, Collector<Record> out) {
- out.collect(rec);
- }
- }
-}
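
For context: the hand-wired workset-iteration job graphs removed above predate the DataSet API's delta iterations, which express the same Connected Components pattern through the optimizer instead of manual JobGraph wiring. A rough, illustrative sketch with that API follows; class names and sample data are invented for illustration and are not taken from the removed tests.

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ConnectedComponentsDeltaIterationSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (vertex id, component id) - every vertex starts in its own component
        DataSet<Tuple2<Long, Long>> initial = env.fromElements(
                new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L), new Tuple2<>(3L, 3L));
        // directed edges as (source, target); for an undirected graph the reversed
        // edges would be added as well
        DataSet<Tuple2<Long, Long>> edges = env.fromElements(
                new Tuple2<>(1L, 2L), new Tuple2<>(2L, 3L));

        // solution set and initial workset are identical; the key is the vertex id (field 0)
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                initial.iterateDelta(initial, 100, 0);

        // send each vertex's current component id to its neighbors, keep the minimum per neighbor
        DataSet<Tuple2<Long, Long>> candidates = iteration.getWorkset()
                .join(edges).where(0).equalTo(0)
                .with(new NeighborComponent())
                .groupBy(0).min(1);

        // only emit vertices whose component id actually shrank
        DataSet<Tuple2<Long, Long>> updates = candidates
                .join(iteration.getSolutionSet()).where(0).equalTo(0)
                .with(new ComponentUpdate());

        // the updates feed both the solution set delta and the next workset
        iteration.closeWith(updates, updates).print();
    }

    public static class NeighborComponent
            implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> join(Tuple2<Long, Long> vertex, Tuple2<Long, Long> edge) {
            // forward the vertex's component id to the edge's target
            return new Tuple2<>(edge.f1, vertex.f1);
        }
    }

    public static class ComponentUpdate
            implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        @Override
        public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> current,
                Collector<Tuple2<Long, Long>> out) {
            if (candidate.f1 < current.f1) {
                out.collect(candidate);
            }
        }
    }
}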
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
deleted file mode 100644
index 516309c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.CustomCompensatableDanglingPageRank;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
-
- public static final String TEST_VERTICES = "1\n" +
- "2\n" +
- "5\n" +
- "3 1\n" +
- "4";
-
- public static final String TEST_EDGES = "2 1\n" +
- "5 2 4\n" +
- "4 3 2\n" +
- "1 4 2 3";
-
- protected String pagesWithRankPath;
- protected String edgesPath;
- protected String resultPath;
-
- public DanglingPageRankNepheleITCase() {
- setTaskManagerNumSlots(parallelism);
- }
-
-
- @Override
- protected void preSubmit() throws Exception {
- this.pagesWithRankPath = createTempFile("pagesWithRank", TEST_VERTICES);
- this.edgesPath = createTempFile("edges", TEST_EDGES);
- this.resultPath = getTempDirPath("result");
- }
-
- @Override
- protected JobGraph getJobGraph() throws Exception {
- String[] parameters = new String[] {
- String.valueOf(parallelism),
- pagesWithRankPath,
- edgesPath,
- resultPath,
- "<none>",
- "2",
- "5",
- "5",
- "30",
- "5",
- "1",
- "0",
- "100",
- "0"
- };
-
- return CustomCompensatableDanglingPageRank.getJobGraph(parameters);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
deleted file mode 100644
index ba22ce5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.CustomCompensatableDanglingPageRankWithCombiner;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase {
-
- protected String pagesWithRankPath;
- protected String edgesPath;
- protected String resultPath;
-
- public DanglingPageRankWithCombinerNepheleITCase() {
- setTaskManagerNumSlots(parallelism);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- this.pagesWithRankPath = createTempFile("pagesWithRank", DanglingPageRankNepheleITCase.TEST_VERTICES);
- this.edgesPath = createTempFile("edges", DanglingPageRankNepheleITCase.TEST_EDGES);
- this.resultPath = getTempDirPath("result");
- }
-
- @Override
- protected JobGraph getJobGraph() throws Exception {
- String[] parameters = new String[] {
- String.valueOf(parallelism),
- pagesWithRankPath,
- edgesPath,
- resultPath,
- "<none>",
- "2",
- "5",
- "3",
- "30",
- "5",
- "1",
- "0",
- "100",
- "0"
- };
-
- return CustomCompensatableDanglingPageRankWithCombiner.getJobGraph(parameters);
- }
-}
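
Likewise, the two drivers removed above only launched the manually built compensatable dangling-PageRank job graphs. A plain rank iteration of that kind is expressed far more compactly as a DataSet API bulk iteration; the sketch below is illustrative only (invented names and sample data, and it deliberately leaves out the dangling-node and compensation handling of the removed programs).

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class SimplePageRankSketch {

    private static final double DAMPING = 0.85;
    private static final long NUM_PAGES = 2;

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (page, initial rank)
        DataSet<Tuple2<Long, Double>> ranks = env.fromElements(
                new Tuple2<>(1L, 1.0 / NUM_PAGES), new Tuple2<>(2L, 1.0 / NUM_PAGES));
        // (source, target, out-degree of source)
        DataSet<Tuple3<Long, Long, Long>> links = env.fromElements(
                new Tuple3<>(1L, 2L, 1L), new Tuple3<>(2L, 1L, 1L));

        IterativeDataSet<Tuple2<Long, Double>> iteration = ranks.iterate(30);

        DataSet<Tuple2<Long, Double>> newRanks = iteration
                .join(links).where(0).equalTo(0)
                .with(new DistributeRank())        // emit (target, rank / out-degree)
                .groupBy(0).sum(1)                 // sum the contributions per page
                .map(new ApplyDamping());

        iteration.closeWith(newRanks).print();
    }

    public static class DistributeRank implements
            FlatJoinFunction<Tuple2<Long, Double>, Tuple3<Long, Long, Long>, Tuple2<Long, Double>> {
        @Override
        public void join(Tuple2<Long, Double> rank, Tuple3<Long, Long, Long> link,
                Collector<Tuple2<Long, Double>> out) {
            out.collect(new Tuple2<>(link.f1, rank.f1 / link.f2));
        }
    }

    public static class ApplyDamping implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
        @Override
        public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
            value.f1 = DAMPING * value.f1 + (1 - DAMPING) / NUM_PAGES;
            return value;
        }
    }
}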
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
deleted file mode 100644
index 7a3135c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import java.util.Collection;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceDriver;
-import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.IterationWithChainingITCase;
-import org.apache.flink.test.recordJobs.kmeans.udfs.CoordVector;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Tests chained iteration tails.
- * <p>
- * GitHub issue #123 reports a problem with chaining of tasks to iteration tails. The initial fix worked around the
- * issue by having the compiler *not* chain tasks to an iteration tail. The existing IterationWithChainingITCase only
- * tests this compiler behavior. This test builds the JobGraph manually and bypasses the compiler to reproduce the
- * original chaining problem.
- * <p>
- * A chained mapper after the iteration tail (dummy reduce) increments the given input points in each iteration. The
- * final result will only be correct if the chained mapper is successfully executed.
- *
- * {@link IterationWithChainingITCase}
- */
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
-
- private static final String INPUT_STRING = "0|%d.25|\n" + "1|%d.25|\n";
-
- private String dataPath;
-
- private String resultPath;
-
- public IterationWithChainingNepheleITCase(Configuration config) {
- super(config);
- setTaskManagerNumSlots(parallelism);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- String initialInput = String.format(INPUT_STRING, 1, 2);
- dataPath = createTempFile("data_points.txt", initialInput);
- resultPath = getTempFilePath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- int maxIterations = config.getInteger("ChainedMapperNepheleITCase#MaxIterations", 1);
- String result = String.format(INPUT_STRING, 1 + maxIterations, 2 + maxIterations);
- compareResultsByLinesInMemory(result, resultPath);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> getConfigurations() {
- Configuration config = new Configuration();
- config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", parallelism);
- config.setInteger("ChainedMapperNepheleITCase#MaxIterations", 2);
- return toParameterList(config);
- }
-
- @Override
- protected JobGraph getJobGraph() throws Exception {
- int numSubTasks = config.getInteger("ChainedMapperNepheleITCase#NoSubtasks", 1);
- int maxIterations = config.getInteger("ChainedMapperNepheleITCase#MaxIterations", 1);
-
- return getTestJobGraph(dataPath, resultPath, numSubTasks, maxIterations);
- }
-
- private JobGraph getTestJobGraph(String inputPath, String outputPath, int numSubTasks, int maxIterations) {
-
- final JobGraph jobGraph = new JobGraph("Iteration Tail with Chaining");
-
- final TypeSerializerFactory<Record> serializer = RecordSerializerFactory.get();
-
- @SuppressWarnings("unchecked")
- final TypeComparatorFactory<Record> comparator =
- new RecordComparatorFactory(new int[] { 0 }, new Class[] { IntValue.class });
-
- final int ITERATION_ID = 1;
-
- // --------------------------------------------------------------------------------------------------------------
- // 1. VERTICES
- // --------------------------------------------------------------------------------------------------------------
-
- // - input -----------------------------------------------------------------------------------------------------
- InputFormatVertex input = JobGraphUtils.createInput(
- new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks);
- TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
- {
- inputConfig.setOutputSerializer(serializer);
- inputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- }
-
- // - head ------------------------------------------------------------------------------------------------------
- JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks);
- TaskConfig headConfig = new TaskConfig(head.getConfiguration());
- {
- headConfig.setIterationId(ITERATION_ID);
-
- // input to iteration head
- headConfig.addInputToGroup(0);
- headConfig.setInputSerializer(serializer, 0);
- headConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
- headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
-
- // output into iteration
- headConfig.setOutputSerializer(serializer);
- headConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
- headConfig.setOutputComparator(comparator, 0);
-
- // final output
- TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
- headFinalOutConfig.setOutputSerializer(serializer);
- headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-
- // the sync
- headConfig.setIterationHeadIndexOfSyncOutput(2);
-
- // driver
- headConfig.setDriver(CollectorMapDriver.class);
- headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- headConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
-
- // back channel
- headConfig.setRelativeBackChannelMemory(1.0);
- }
-
- // - tail ------------------------------------------------------------------------------------------------------
- JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Chained Iteration Tail", jobGraph, numSubTasks);
- TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
- {
- tailConfig.setIterationId(ITERATION_ID);
-
- // inputs and driver
- tailConfig.addInputToGroup(0);
- tailConfig.setInputSerializer(serializer, 0);
-
- // output
- tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
- tailConfig.setOutputSerializer(serializer);
-
- // the driver
- tailConfig.setDriver(GroupReduceDriver.class);
- tailConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
- tailConfig.setDriverComparator(comparator, 0);
- tailConfig.setStubWrapper(new UserCodeClassWrapper<DummyReducer>(DummyReducer.class));
-
- // chained mapper
- TaskConfig chainedMapperConfig = new TaskConfig(new Configuration());
- chainedMapperConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
- chainedMapperConfig.setStubWrapper(new UserCodeClassWrapper<IncrementCoordinatesMapper>(
- IncrementCoordinatesMapper.class));
-
- chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
- chainedMapperConfig.setInputSerializer(serializer, 0);
-
- chainedMapperConfig.setOutputSerializer(serializer);
-
- chainedMapperConfig.setIsWorksetUpdate();
-
- tailConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapperConfig, "Chained ID Mapper");
- }
-
- // - output ----------------------------------------------------------------------------------------------------
- OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
- TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
- {
- outputConfig.addInputToGroup(0);
- outputConfig.setInputSerializer(serializer, 0);
-
- outputConfig.setStubWrapper(new UserCodeClassWrapper<PointOutFormat>(PointOutFormat.class));
- outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
- }
-
- // - sync ------------------------------------------------------------------------------------------------------
- JobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
- TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
- syncConfig.setNumberOfIterations(maxIterations);
- syncConfig.setIterationId(ITERATION_ID);
-
- // --------------------------------------------------------------------------------------------------------------
- // 2. EDGES
- // --------------------------------------------------------------------------------------------------------------
- JobGraphUtils.connect(input, head, DistributionPattern.POINTWISE);
-
- JobGraphUtils.connect(head, tail, DistributionPattern.ALL_TO_ALL);
- tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
- JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
- JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
- // --------------------------------------------------------------------------------------------------------------
- // 3. INSTANCE SHARING
- // --------------------------------------------------------------------------------------------------------------
-
- SlotSharingGroup sharingGroup = new SlotSharingGroup();
-
- input.setSlotSharingGroup(sharingGroup);
- head.setSlotSharingGroup(sharingGroup);
- tail.setSlotSharingGroup(sharingGroup);
- output.setSlotSharingGroup(sharingGroup);
- sync.setSlotSharingGroup(sharingGroup);
-
- tail.setStrictlyCoLocatedWith(head);
-
- return jobGraph;
- }
-
- public static final class DummyMapper extends MapFunction {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record rec, Collector<Record> out) {
- out.collect(rec);
- }
- }
-
- public static final class DummyReducer implements GroupReduceFunction<Record, Record> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reduce(Iterable<Record> it, Collector<Record> out) {
- for (Record r : it) {
- out.collect(r);
- }
- }
- }
-
- public static final class IncrementCoordinatesMapper extends MapFunction {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record rec, Collector<Record> out) {
- CoordVector coord = rec.getField(1, CoordVector.class);
-
- double[] vector = coord.getCoordinates();
- for (int i = 0; i < vector.length; i++) {
- vector[i]++;
- }
-
- rec.setField(1, coord);
- out.collect(rec);
- }
- }
-}
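
For reference, the chaining scenario exercised above, a mapper chained behind the reduce that forms the iteration step, can also be reproduced against the DataSet API by ending the step function of a bulk iteration with a group reduce followed by a map, and letting the optimizer chain the two. The sketch below is illustrative only; names and data are invented and not taken from the removed test.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.util.Collector;

public class ChainedMapAfterIterationStepSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> points = env.fromElements(1L, 2L);

        IterativeDataSet<Long> loop = points.iterate(2);

        // identity group reduce followed by an incrementing map; the optimizer may chain
        // the map to the reduce task, so the map must run in every superstep for the
        // final values to be correct
        DataSet<Long> step = loop
                .groupBy(new IdKey())
                .reduceGroup(new IdentityReduce())
                .map(new Increment());

        loop.closeWith(step).print();
    }

    public static class IdKey implements KeySelector<Long, Long> {
        @Override
        public Long getKey(Long value) {
            return value;
        }
    }

    public static class IdentityReduce implements GroupReduceFunction<Long, Long> {
        @Override
        public void reduce(Iterable<Long> values, Collector<Long> out) {
            for (Long v : values) {
                out.collect(v);
            }
        }
    }

    public static class Increment implements MapFunction<Long, Long> {
        @Override
        public Long map(Long value) {
            return value + 1;
        }
    }
}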
http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
deleted file mode 100644
index 4edc83e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.operators.DataSinkTask;
-import org.apache.flink.runtime.operators.DataSourceTask;
-import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-
-public class JobGraphUtils {
-
- public static final long MEGABYTE = 1024L * 1024L;
-
- private JobGraphUtils() {}
-
- public static <T extends FileInputFormat<?>> InputFormatVertex createInput(T stub, String path, String name, JobGraph graph,
- int parallelism)
- {
- stub.setFilePath(path);
- return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, parallelism);
- }
-
- private static <T extends InputFormat<?,?>> InputFormatVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
- int parallelism)
- {
- InputFormatVertex inputVertex = new InputFormatVertex(name);
- graph.addVertex(inputVertex);
-
- inputVertex.setInvokableClass(DataSourceTask.class);
- inputVertex.setParallelism(parallelism);
-
- TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
- inputConfig.setStubWrapper(stub);
-
- return inputVertex;
- }
-
-// public static void connect(AbstractJobVertex source, AbstractJobVertex target, ChannelType channelType,
-// DistributionPattern distributionPattern, ShipStrategyType shipStrategy) throws JobGraphDefinitionException
-// {
-// source.connectTo(target, channelType, CompressionLevel.NO_COMPRESSION, distributionPattern);
-// new TaskConfig(source.getConfiguration()).addOutputShipStrategy(shipStrategy);
-// }
-
- public static void connect(JobVertex source, JobVertex target, DistributionPattern distributionPattern) {
- target.connectNewDataSetAsInput(source, distributionPattern);
- }
-
- @SuppressWarnings("rawtypes")
- public static JobVertex createTask(Class<? extends RegularPactTask> task, String name, JobGraph graph, int parallelism)
- {
- JobVertex taskVertex = new JobVertex(name);
- graph.addVertex(taskVertex);
-
- taskVertex.setInvokableClass(task);
- taskVertex.setParallelism(parallelism);
- return taskVertex;
- }
-
- public static JobVertex createSync(JobGraph jobGraph, int parallelism) {
- JobVertex sync = new JobVertex("BulkIterationSync");
- jobGraph.addVertex(sync);
-
- sync.setInvokableClass(IterationSynchronizationSinkTask.class);
- sync.setParallelism(1);
-
- TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
- syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, parallelism);
- return sync;
- }
-
- public static OutputFormatVertex createFileOutput(JobGraph jobGraph, String name, int parallelism) {
- OutputFormatVertex sinkVertex = new OutputFormatVertex(name);
- jobGraph.addVertex(sinkVertex);
-
- sinkVertex.setInvokableClass(DataSinkTask.class);
- sinkVertex.setParallelism(parallelism);
- return sinkVertex;
- }
-}